This guide explores advanced MCP protocol features that go beyond basic tool and resource handling. Understanding these features helps you build more robust, user-friendly, and production-ready MCP servers.
1. Progress Notifications - Report progress for long-running operations
2. Request Cancellation - Allow clients to cancel in-flight requests
3. Resource Templates - Dynamic resource URIs with parameters
4. Server Lifecycle Events - Proper initialization and shutdown
5. Logging Control - Server-side logging configuration
6. Error Handling Patterns - Consistent error responses
---
For operations that take time (data processing, file downloads, API calls), progress notifications keep users informed.
sequenceDiagram
participant Client
participant Server
Client->>Server: tools/call (long operation)
Server-->>Client: notification: progress 10%
Server-->>Client: notification: progress 50%
Server-->>Client: notification: progress 90%
Server->>Client: result (complete)
from mcp.server import Server, NotificationOptions
from mcp.types import ProgressNotification
import asyncio
app = Server("progress-server")
@app.tool()
async def process_large_file(file_path: str, ctx) -> str:
    """Process a file in 8192-byte chunks, notifying progress after each chunk.

    Args:
        file_path: Path of the file to read.
        ctx: Request context; must provide ``request_id`` and
            ``send_notification``.

    Returns:
        A summary string containing the file size reported by the OS.
    """
    # Imported locally: the snippet's import block does not bring in ``os``.
    import os

    # Get file size for progress calculation
    file_size = os.path.getsize(file_path)
    processed = 0

    with open(file_path, 'rb') as f:
        while chunk := f.read(8192):
            await process_chunk(chunk)
            processed += len(chunk)

            # Some files (e.g. under /proc) report size 0 yet still yield
            # data, so guard the division instead of raising ZeroDivisionError.
            progress = (processed / file_size) * 100 if file_size else 0.0
            await ctx.send_notification(
                ProgressNotification(
                    progressToken=ctx.request_id,
                    progress=progress,
                    total=100,
                    message=f"Processing: {progress:.1f}%"
                )
            )

    return f"Processed {file_size} bytes"
@app.tool()
async def batch_operation(items: list[str], ctx) -> str:
    """Run ``process_item`` on every entry, notifying the client after each one."""
    total = len(items)
    results = []

    for done, item in enumerate(items, start=1):
        results.append(await process_item(item))

        # One notification per finished item: ``done`` out of ``total``.
        update = ProgressNotification(
            progressToken=ctx.request_id,
            progress=done,
            total=total,
            message=f"Processed {done}/{total}: {item}",
        )
        await ctx.send_notification(update)

    return f"Completed {total} items"
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
// Handles tool calls; "process_data" reports progress after each item.
server.setRequestHandler(CallToolSchema, async (request, extra) => {
  const { name, arguments: args } = request.params;

  if (name === "process_data") {
    const items = args.items as string[];
    const results = [];

    // Progress notifications must echo the token the client supplied in
    // `_meta.progressToken`. Without a token the client did not ask for
    // progress, so nothing is sent.
    const progressToken = request.params._meta?.progressToken;

    for (let i = 0; i < items.length; i++) {
      const result = await processItem(items[i]);
      results.push(result);

      if (progressToken !== undefined) {
        // Send progress notification
        await extra.sendNotification({
          method: "notifications/progress",
          params: {
            progressToken,
            progress: i + 1,
            total: items.length,
            message: `Processing item ${i + 1}/${items.length}`
          }
        });
      }
    }

    return { content: [{ type: "text", text: JSON.stringify(results) }] };
  }

  // Fail loudly instead of resolving with `undefined` for unknown tools.
  throw new Error(`Unknown tool: ${name}`);
});
async def handle_progress(notification):
    """Print one progress line for a ``notifications/progress`` message."""
    info = notification.params
    line = f"Progress: {info.progress}/{info.total} - {info.message}"
    print(line)
# Register handler
# NOTE(review): `session` is not defined in this snippet; presumably an
# already-connected client session created elsewhere — confirm.
session.on_notification("notifications/progress", handle_progress)
# Call tool (progress updates will arrive via handler)
result = await session.call_tool("process_large_file", {"file_path": "/data/large.csv"})
---
Allow clients to cancel requests that are no longer needed or taking too long.
from mcp.server import Server
from mcp.types import CancelledError
import asyncio
app = Server("cancellable-server")
@app.tool()
async def long_running_search(query: str, ctx) -> str:
    """Search up to 100 pages, returning a partial count if cancelled."""
    found = []

    try:
        for page_number in range(100):
            # Poll the cancellation flag before starting each page.
            if ctx.is_cancelled:
                raise CancelledError("Search cancelled by user")

            found.extend(await search_page(query, page_number))

            # Brief pause between pages.
            await asyncio.sleep(0.1)
    except CancelledError:
        # Report how much was gathered before the cancellation.
        return f"Cancelled. Found {len(found)} results before cancellation."
    else:
        return f"Found {len(found)} total results"
@app.tool()
async def download_file(url: str, ctx) -> str:
    """Stream ``url`` in 8192-byte chunks, stopping early if cancelled.

    Args:
        url: Address to download.
        ctx: Request context exposing ``is_cancelled``.

    Returns:
        The number of bytes downloaded, or a "downloaded/total" message when
        ``ctx.is_cancelled`` becomes true mid-transfer.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # A missing Content-Length header is reported as a total of 0.
            total_size = int(response.headers.get('content-length', 0))
            downloaded = 0

            # Only the byte count is reported, so chunks are counted rather
            # than retained; buffering them would hold the whole body in
            # memory for no purpose.
            async for chunk in response.content.iter_chunked(8192):
                if ctx.is_cancelled:
                    return f"Download cancelled at {downloaded}/{total_size} bytes"
                downloaded += len(chunk)

    return f"Downloaded {downloaded} bytes"
class CancellableContext:
    """Per-request holder for a cancellation flag and the reason it was set."""

    def __init__(self, request_id: str):
        self.request_id = request_id
        self._cancel_reason = None
        self._cancelled = asyncio.Event()

    @property
    def is_cancelled(self) -> bool:
        """Whether ``cancel`` has been called on this context."""
        return self._cancelled.is_set()

    def cancel(self, reason: str = "Cancelled"):
        """Record ``reason`` and set the cancellation flag."""
        self._cancel_reason = reason
        self._cancelled.set()

    async def check_cancelled(self):
        """Raise ``CancelledError`` with the stored reason if cancelled."""
        if not self.is_cancelled:
            return
        raise CancelledError(self._cancel_reason)

    async def sleep_or_cancel(self, seconds: float):
        """Wait up to ``seconds``; raise ``CancelledError`` if cancelled meanwhile."""
        waiter = self._cancelled.wait()
        try:
            await asyncio.wait_for(waiter, timeout=seconds)
        except asyncio.TimeoutError:
            # The full duration elapsed without a cancellation.
            return
        raise CancelledError(self._cancel_reason)
import asyncio
async def search_with_timeout(session, query, timeout=30):
    """Call ``long_running_search`` and give up after ``timeout`` seconds."""
    call = session.call_tool("long_running_search", {"query": query})
    task = asyncio.create_task(call)

    try:
        return await asyncio.wait_for(task, timeout=timeout)
    except asyncio.TimeoutError:
        # Tell the server the request is no longer wanted.
        # NOTE(review): a plain asyncio.Task has no `request_id` attribute —
        # confirm how the session exposes the id of the in-flight request.
        cancellation = {
            "method": "notifications/cancelled",
            "params": {"requestId": task.request_id, "reason": "Timeout"},
        }
        await session.send_notification(cancellation)
        return "Search timed out"
---
Resource templates allow dynamic URI construction with parameters, useful for APIs and databases.
from mcp.server import Server
from mcp.types import ResourceTemplate
app = Server("template-server")
@app.list_resource_templates()
async def list_templates() -> list[ResourceTemplate]:
    """List the URI templates this server advertises."""
    # (uriTemplate, name, description, mimeType) for each template.
    specs = [
        (
            "db://users/{user_id}",
            "User Profile",
            "Fetch user profile by ID",
            "application/json",
        ),
        (
            "api://weather/{city}/{date}",
            "Weather Data",
            "Historical weather for city and date",
            "application/json",
        ),
        (
            "file://{path}",
            "File Content",
            "Read file at given path",
            "text/plain",
        ),
    ]
    return [
        ResourceTemplate(
            uriTemplate=uri_template,
            name=name,
            description=description,
            mimeType=mime_type,
        )
        for uri_template, name, description, mime_type in specs
    ]
@app.read_resource()
async def read_resource(uri: str) -> str:
    """Resolve a templated resource URI and return its content.

    Supported forms are ``db://users/{user_id}``,
    ``api://weather/{city}/{date}`` and ``file://{path}``.

    Args:
        uri: The concrete URI requested by the client.

    Raises:
        ValueError: If the URI matches no template or carries the wrong
            number of path parameters.
    """
    user_prefix = "db://users/"
    weather_prefix = "api://weather/"
    file_prefix = "file://"

    if uri.startswith(user_prefix):
        # Strip only the leading prefix and require exactly one segment, so
        # "db://users/1/extra" is rejected instead of resolving to "extra".
        user_id = uri[len(user_prefix):]
        if not user_id or "/" in user_id:
            raise ValueError(f"Unknown resource URI: {uri}")
        return await fetch_user(user_id)

    if uri.startswith(weather_prefix):
        parts = uri[len(weather_prefix):].split("/")
        # Exactly {city}/{date}; a missing date previously raised IndexError.
        if len(parts) != 2 or not all(parts):
            raise ValueError(f"Unknown resource URI: {uri}")
        city, date = parts
        return await fetch_weather(city, date)

    if uri.startswith(file_prefix):
        # Slice rather than str.replace so a later "file://" inside the path
        # is left intact.
        # NOTE(review): the path comes straight from the client; confine it
        # to an allowed directory before exposing this handler.
        path = uri[len(file_prefix):]
        return await read_file(path)

    raise ValueError(f"Unknown resource URI: {uri}")
// Advertises the resource URI templates this server can resolve.
server.setRequestHandler(ListResourceTemplatesSchema, async () => {
  const resourceTemplates = [
    {
      uriTemplate: "github://repos/{owner}/{repo}/issues/{issue_number}",
      name: "GitHub Issue",
      description: "Fetch a specific GitHub issue",
      mimeType: "application/json"
    },
    {
      uriTemplate: "db://tables/{table}/rows/{id}",
      name: "Database Row",
      description: "Fetch a row from a database table",
      mimeType: "application/json"
    }
  ];

  return { resourceTemplates };
});
// Resolves a concrete resource URI; only the GitHub issue form is handled here.
server.setRequestHandler(ReadResourceSchema, async (request) => {
  const uri = request.params.uri;

  // github://repos/{owner}/{repo}/issues/{issue_number}
  const parsed = uri.match(/^github:\/\/repos\/([^/]+)\/([^/]+)\/issues\/(\d+)$/);
  if (!parsed) {
    throw new Error(`Unknown resource URI: ${uri}`);
  }

  const [, owner, repo, issueNumber] = parsed;
  const issue = await fetchGitHubIssue(owner, repo, parseInt(issueNumber));
  const text = JSON.stringify(issue, null, 2);

  return {
    contents: [{ uri, mimeType: "application/json", text }]
  };
});
---
Proper initialization and shutdown handling ensures clean resource management.
from mcp.server import Server
from contextlib import asynccontextmanager
app = Server("lifecycle-server")
# Shared state
db_connection = None
cache = None
@asynccontextmanager
async def lifespan(server: Server):
    """Open the shared database and cache clients for the server's lifetime.

    Assigns the module-level ``db_connection`` and ``cache`` before yielding
    and closes whichever of them is set once the server stops, including when
    startup or the server itself raises.
    """
    global db_connection, cache

    # Startup
    print("🚀 Server starting...")
    try:
        db_connection = await create_database_connection()
        cache = await create_cache_client()
        print("✅ Resources initialized")

        yield  # Server runs here
    finally:
        # Shutdown
        print("🛑 Server shutting down...")
        try:
            if db_connection is not None:
                await db_connection.close()
        finally:
            # Close the cache even if closing the database failed.
            if cache is not None:
                await cache.close()
        print("✅ Resources cleaned up")
app = Server("lifecycle-server", lifespan=lifespan)
@app.tool()
async def query_database(sql: str) -> str:
    """Execute ``sql`` on the shared ``db_connection`` and return the result as text."""
    return str(await db_connection.execute(sql))
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
/** Wraps an MCP server together with the database connection it depends on. */
class ManagedServer {
  private server: Server;
  private dbConnection: DatabaseConnection | null = null;

  constructor() {
    this.server = new Server({
      name: "lifecycle-server",
      version: "1.0.0"
    });

    this.setupHandlers();
  }

  /** Opens the database connection, then connects the server to its transport. */
  async start() {
    // Initialize resources
    console.log("🚀 Server starting...");
    this.dbConnection = await createDatabaseConnection();
    console.log("✅ Database connected");

    // Start server
    // NOTE(review): `transport` is not defined in this snippet — confirm
    // where it is created.
    await this.server.connect(transport);
  }

  /** Closes the database connection (if open) and then the server. */
  async stop() {
    // Cleanup resources
    console.log("🛑 Server shutting down...");
    if (this.dbConnection) {
      await this.dbConnection.close();
      // Drop the reference so a second stop() does not close it again.
      this.dbConnection = null;
    }
    await this.server.close();
    console.log("✅ Cleanup complete");
  }

  private setupHandlers() {
    this.server.setRequestHandler(CallToolSchema, async (request) => {
      // Use this.dbConnection safely
      // ...
    });
  }
}
// Usage with graceful shutdown
const server = new ManagedServer();
// On SIGINT, release resources via stop() before exiting with status 0.
process.on('SIGINT', async () => {
await server.stop();
process.exit(0);
});
await server.start();
---
MCP supports server-side logging levels that clients can control.
from mcp.server import Server
from mcp.types import LoggingLevel
import logging
app = Server("logging-server")
# Map MCP log levels (the eight syslog-style severities, as lower-case names)
# to Python logging levels. Python has no NOTICE/ALERT/EMERGENCY, so those map
# to the nearest standard level instead of silently falling back to INFO.
LEVEL_MAP = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "notice": logging.INFO,
    "warning": logging.WARNING,
    "error": logging.ERROR,
    "critical": logging.CRITICAL,
    "alert": logging.CRITICAL,
    "emergency": logging.CRITICAL,
}

logger = logging.getLogger("mcp-server")


@app.set_logging_level()
async def set_logging_level(level: LoggingLevel) -> None:
    """Handle client request to change logging level.

    Args:
        level: Requested MCP level, given either as its string name or as an
            enum-like object whose ``value`` is that name.
    """
    # Normalise to the lower-case level name so both plain strings and enum
    # members resolve; unknown names fall back to INFO.
    level_name = str(getattr(level, "value", level)).lower()
    python_level = LEVEL_MAP.get(level_name, logging.INFO)
    logger.setLevel(python_level)
    logger.info(f"Logging level set to {level}")
@app.tool()
async def debug_operation(data: str) -> str:
    """Run ``process`` on ``data``, logging the input, the outcome and any failure."""
    logger.debug(f"Processing data: {data}")

    try:
        outcome = process(data)
        logger.info(f"Successfully processed: {outcome}")
    except Exception as exc:
        # Record the failure, then let the caller see the original exception.
        logger.error(f"Processing failed: {exc}")
        raise

    return outcome
@app.tool()
async def complex_operation(input: str, ctx) -> str:
    """Run ``do_work`` on ``input``, sending a client log message before and after."""
    # Announce the start to the client at "info" level.
    start_message = f"Starting complex operation with input: {input}"
    await ctx.send_log(level="info", message=start_message)

    result = await do_work(input)

    # Report the size of the result at "debug" level.
    done_message = f"Operation complete, result size: {len(result)}"
    await ctx.send_log(level="debug", message=done_message)

    return result
---
Consistent error handling improves debugging and user experience.
from mcp.types import McpError, ErrorCode
class ToolError(McpError):
    """Common ancestor of the tool-specific errors defined in this snippet."""


class ValidationError(ToolError):
    """Raised when a tool receives invalid input parameters."""

    def __init__(self, message: str):
        super().__init__(ErrorCode.INVALID_PARAMS, message)


class NotFoundError(ToolError):
    """Raised when the requested resource does not exist."""

    def __init__(self, resource: str):
        detail = f"Not found: {resource}"
        super().__init__(ErrorCode.INVALID_REQUEST, detail)


class PermissionError(ToolError):
    """Raised when access is denied.

    Note: within this module the name shadows the built-in ``PermissionError``.
    """

    def __init__(self, action: str):
        detail = f"Permission denied: {action}"
        super().__init__(ErrorCode.INVALID_REQUEST, detail)


class InternalError(ToolError):
    """Raised for failures inside the server itself."""

    def __init__(self, message: str):
        super().__init__(ErrorCode.INTERNAL_ERROR, message)
@app.tool()
async def safe_operation(input: str) -> str:
    """Validate ``input``, check permission, run the operation, and map failures.

    Raises:
        ValidationError: If ``input`` is empty or longer than 10000 characters.
        PermissionError: If ``check_permission`` returns a falsy value.
        NotFoundError: If ``perform_operation`` returns ``None``.
        InternalError: For connection errors, timeouts and any other
            unexpected exception.
    """
    # Validate input
    if not input:
        raise ValidationError("Input cannot be empty")

    if len(input) > 10000:
        raise ValidationError(f"Input too large: {len(input)} chars (max 10000)")

    try:
        # Check permissions
        if not await check_permission(input):
            raise PermissionError(f"read {input}")

        # Perform operation
        result = await perform_operation(input)

        if result is None:
            raise NotFoundError(input)

        return result
    except ToolError:
        # Already-classified errors (permission denied, not found) must reach
        # the client unchanged; without this clause the catch-all below
        # rewrote them as "Unexpected error" internal errors.
        raise
    except ConnectionError as e:
        raise InternalError(f"Database connection failed: {e}") from e
    except TimeoutError as e:
        raise InternalError(f"Operation timed out: {e}") from e
    except Exception as e:
        # Log unexpected errors
        logger.exception("Unexpected error in safe_operation")
        raise InternalError(f"Unexpected error: {type(e).__name__}") from e
import { McpError, ErrorCode } from "@modelcontextprotocol/sdk/types.js";
/** Throws an InvalidParams McpError unless `data` is a non-null object. */
function validateInput(data: unknown): asserts data is ValidInput {
  const isObject = typeof data === "object" && data !== null;
  if (!isObject) {
    throw new McpError(ErrorCode.InvalidParams, "Input must be an object");
  }
  // More validation...
}
// Validates arguments, runs the operation, and converts failures to McpError.
server.setRequestHandler(CallToolSchema, async (request) => {
  try {
    validateInput(request.params.arguments);
    const result = await performOperation(request.params.arguments);
    const text = JSON.stringify(result);

    return { content: [{ type: "text", text }] };
  } catch (error) {
    // MCP errors pass through untouched.
    if (error instanceof McpError) {
      throw error;
    }

    // Known domain error: translate to an MCP error.
    if (error instanceof NotFoundError) {
      throw new McpError(ErrorCode.InvalidRequest, error.message);
    }

    // Anything else is logged and reported with a generic message.
    console.error("Unexpected error:", error);
    throw new McpError(ErrorCode.InternalError, "An unexpected error occurred");
  }
});
---
These features are marked as experimental in the specification:
# Tasks allow tracking long-running operations with state
@app.task()
async def training_task(model_id: str, data_path: str, ctx) -> str:
    """Train ``model_id`` on ``data_path`` for 100 epochs, reporting status throughout."""
    total_epochs = 100

    # Announce that the task has started.
    await ctx.report_status("running", "Initializing training...")

    for epoch in range(total_epochs):
        await train_epoch(model_id, data_path, epoch)

        finished = epoch + 1
        await ctx.report_status(
            "running",
            f"Training epoch {finished}/{total_epochs}",
            progress=finished,
            total=total_epochs,
        )

    await ctx.report_status("completed", "Training finished")
    return f"Model {model_id} trained successfully"
# Annotations provide metadata about tool behavior
@app.tool(
    annotations=dict(
        destructive=False,        # Does not modify data
        idempotent=True,          # Safe to retry
        timeout_seconds=30,       # Expected max duration
        requires_approval=False,  # No user approval needed
    )
)
async def safe_query(query: str) -> str:
    """Run ``query`` through ``execute_read_query`` and return its result."""
    return await execute_read_query(query)
---
---
이 가이드는 기본 도구 및 리소스 처리 이상의 고급 MCP 프로토콜 기능을 탐구합니다. 이러한 기능을 이해하면 보다 견고하고 사용자 친화적이며 생산 준비가 된 MCP 서버를 구축하는 데 도움이 됩니다.
1. 진행 알림 - 장시간 실행되는 작업의 진행 상황 보고
2. 요청 취소 - 클라이언트가 진행 중인 요청을 취소할 수 있도록 허용
3. 리소스 템플릿 - 매개변수가 있는 동적 리소스 URI
4. 서버 라이프사이클 이벤트 - 적절한 초기화 및 종료
5. 로깅 제어 - 서버 측 로깅 구성
6. 오류 처리 패턴 - 일관된 오류 응답
---
시간이 걸리는 작업(데이터 처리, 파일 다운로드, API 호출 등)의 경우, 진행 알림은 사용자가 상황을 알 수 있도록 도와줍니다.
sequenceDiagram
participant Client
participant Server
Client->>Server: tools/call (긴 작업)
Server-->>Client: 알림: 진행률 10%
Server-->>Client: 알림: 진행률 50%
Server-->>Client: 알림: 진행률 90%
Server->>Client: 결과 (완료)
from mcp.server import Server, NotificationOptions
from mcp.types import ProgressNotification
import asyncio
app = Server("progress-server")
@app.tool()
async def process_large_file(file_path: str, ctx) -> str:
    """Process a file in 8192-byte chunks, notifying progress after each chunk.

    Args:
        file_path: Path of the file to read.
        ctx: Request context; must provide ``request_id`` and
            ``send_notification``.

    Returns:
        A summary string containing the file size reported by the OS.
    """
    # Imported locally: the snippet's import block does not bring in ``os``.
    import os

    # Get file size for progress calculation
    file_size = os.path.getsize(file_path)
    processed = 0

    with open(file_path, 'rb') as f:
        while chunk := f.read(8192):
            await process_chunk(chunk)
            processed += len(chunk)

            # Some files (e.g. under /proc) report size 0 yet still yield
            # data, so guard the division instead of raising ZeroDivisionError.
            progress = (processed / file_size) * 100 if file_size else 0.0
            await ctx.send_notification(
                ProgressNotification(
                    progressToken=ctx.request_id,
                    progress=progress,
                    total=100,
                    message=f"Processing: {progress:.1f}%"
                )
            )

    return f"Processed {file_size} bytes"
@app.tool()
async def batch_operation(items: list[str], ctx) -> str:
    """Run ``process_item`` on every entry, notifying the client after each one."""
    total = len(items)
    results = []

    for done, item in enumerate(items, start=1):
        results.append(await process_item(item))

        # One notification per finished item: ``done`` out of ``total``.
        update = ProgressNotification(
            progressToken=ctx.request_id,
            progress=done,
            total=total,
            message=f"Processed {done}/{total}: {item}",
        )
        await ctx.send_notification(update)

    return f"Completed {total} items"
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
// Handles tool calls; "process_data" reports progress after each item.
server.setRequestHandler(CallToolSchema, async (request, extra) => {
  const { name, arguments: args } = request.params;

  if (name === "process_data") {
    const items = args.items as string[];
    const results = [];

    // Progress notifications must echo the token the client supplied in
    // `_meta.progressToken`. Without a token the client did not ask for
    // progress, so nothing is sent.
    const progressToken = request.params._meta?.progressToken;

    for (let i = 0; i < items.length; i++) {
      const result = await processItem(items[i]);
      results.push(result);

      if (progressToken !== undefined) {
        // Send progress notification
        await extra.sendNotification({
          method: "notifications/progress",
          params: {
            progressToken,
            progress: i + 1,
            total: items.length,
            message: `Processing item ${i + 1}/${items.length}`
          }
        });
      }
    }

    return { content: [{ type: "text", text: JSON.stringify(results) }] };
  }

  // Fail loudly instead of resolving with `undefined` for unknown tools.
  throw new Error(`Unknown tool: ${name}`);
});
async def handle_progress(notification):
    """Print one progress line for a ``notifications/progress`` message."""
    info = notification.params
    line = f"Progress: {info.progress}/{info.total} - {info.message}"
    print(line)
# ํธ๋ค๋ฌ ๋ฑ๋ก
session.on_notification("notifications/progress", handle_progress)
# ๋๊ตฌ ํธ์ถ (์งํ ์ํฉ ์
๋ฐ์ดํธ๋ ํธ๋ค๋ฌ๋ฅผ ํตํด ๋์ฐฉํฉ๋๋ค)
result = await session.call_tool("process_large_file", {"file_path": "/data/large.csv"})
---
더 이상 필요하지 않거나 너무 오래 걸리는 요청을 클라이언트가 취소할 수 있도록 허용합니다.
from mcp.server import Server
from mcp.types import CancelledError
import asyncio
app = Server("cancellable-server")
@app.tool()
async def long_running_search(query: str, ctx) -> str:
    """Search up to 100 pages, returning a partial count if cancelled.

    Args:
        query: Text passed to ``search_page`` for every page.
        ctx: Request context exposing ``is_cancelled``.

    Returns:
        A message with the number of results found, prefixed with
        "Cancelled." when the search stopped early.
    """
    results = []

    try:
        for page in range(100):  # Search through many pages
            # Check if cancellation was requested
            if ctx.is_cancelled:
                raise CancelledError("Search cancelled by user")

            # Simulate page search
            page_results = await search_page(query, page)
            results.extend(page_results)

            # Small delay allows cancellation checks
            await asyncio.sleep(0.1)
    except CancelledError:
        # Return partial results
        return f"Cancelled. Found {len(results)} results before cancellation."

    return f"Found {len(results)} total results"
@app.tool()
async def download_file(url: str, ctx) -> str:
    """Stream ``url`` in 8192-byte chunks, stopping early if cancelled.

    Args:
        url: Address to download.
        ctx: Request context exposing ``is_cancelled``.

    Returns:
        The number of bytes downloaded, or a "downloaded/total" message when
        ``ctx.is_cancelled`` becomes true mid-transfer.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # A missing Content-Length header is reported as a total of 0.
            total_size = int(response.headers.get('content-length', 0))
            downloaded = 0

            # Only the byte count is reported, so chunks are counted rather
            # than retained; buffering them would hold the whole body in
            # memory for no purpose.
            async for chunk in response.content.iter_chunked(8192):
                if ctx.is_cancelled:
                    return f"Download cancelled at {downloaded}/{total_size} bytes"
                downloaded += len(chunk)

    return f"Downloaded {downloaded} bytes"
class CancellableContext:
    """Per-request holder for a cancellation flag and the reason it was set."""

    def __init__(self, request_id: str):
        self.request_id = request_id
        self._cancel_reason = None
        self._cancelled = asyncio.Event()

    @property
    def is_cancelled(self) -> bool:
        """Whether ``cancel`` has been called on this context."""
        return self._cancelled.is_set()

    def cancel(self, reason: str = "Cancelled"):
        """Record ``reason`` and set the cancellation flag."""
        self._cancel_reason = reason
        self._cancelled.set()

    async def check_cancelled(self):
        """Raise ``CancelledError`` with the stored reason if cancelled."""
        if not self.is_cancelled:
            return
        raise CancelledError(self._cancel_reason)

    async def sleep_or_cancel(self, seconds: float):
        """Wait up to ``seconds``; raise ``CancelledError`` if cancelled meanwhile."""
        waiter = self._cancelled.wait()
        try:
            await asyncio.wait_for(waiter, timeout=seconds)
        except asyncio.TimeoutError:
            # The full duration elapsed without a cancellation.
            return
        raise CancelledError(self._cancel_reason)
import asyncio
async def search_with_timeout(session, query, timeout=30):
    """Call ``long_running_search`` and give up after ``timeout`` seconds."""
    call = session.call_tool("long_running_search", {"query": query})
    task = asyncio.create_task(call)

    try:
        return await asyncio.wait_for(task, timeout=timeout)
    except asyncio.TimeoutError:
        # Tell the server the request is no longer wanted.
        # NOTE(review): a plain asyncio.Task has no `request_id` attribute —
        # confirm how the session exposes the id of the in-flight request.
        cancellation = {
            "method": "notifications/cancelled",
            "params": {"requestId": task.request_id, "reason": "Timeout"},
        }
        await session.send_notification(cancellation)
        return "Search timed out"
---
리소스 템플릿은 매개변수를 사용한 동적 URI 구성이 가능하며, API 및 데이터베이스에 유용합니다.
from mcp.server import Server
from mcp.types import ResourceTemplate
app = Server("template-server")
@app.list_resource_templates()
async def list_templates() -> list[ResourceTemplate]:
    """List the URI templates this server advertises."""
    # (uriTemplate, name, description, mimeType) for each template.
    specs = [
        (
            "db://users/{user_id}",
            "User Profile",
            "Fetch user profile by ID",
            "application/json",
        ),
        (
            "api://weather/{city}/{date}",
            "Weather Data",
            "Historical weather for city and date",
            "application/json",
        ),
        (
            "file://{path}",
            "File Content",
            "Read file at given path",
            "text/plain",
        ),
    ]
    return [
        ResourceTemplate(
            uriTemplate=uri_template,
            name=name,
            description=description,
            mimeType=mime_type,
        )
        for uri_template, name, description, mime_type in specs
    ]
@app.read_resource()
async def read_resource(uri: str) -> str:
    """Resolve a templated resource URI and return its content.

    Supported forms are ``db://users/{user_id}``,
    ``api://weather/{city}/{date}`` and ``file://{path}``.

    Args:
        uri: The concrete URI requested by the client.

    Raises:
        ValueError: If the URI matches no template or carries the wrong
            number of path parameters.
    """
    user_prefix = "db://users/"
    weather_prefix = "api://weather/"
    file_prefix = "file://"

    if uri.startswith(user_prefix):
        # Strip only the leading prefix and require exactly one segment, so
        # "db://users/1/extra" is rejected instead of resolving to "extra".
        user_id = uri[len(user_prefix):]
        if not user_id or "/" in user_id:
            raise ValueError(f"Unknown resource URI: {uri}")
        return await fetch_user(user_id)

    if uri.startswith(weather_prefix):
        parts = uri[len(weather_prefix):].split("/")
        # Exactly {city}/{date}; a missing date previously raised IndexError.
        if len(parts) != 2 or not all(parts):
            raise ValueError(f"Unknown resource URI: {uri}")
        city, date = parts
        return await fetch_weather(city, date)

    if uri.startswith(file_prefix):
        # Slice rather than str.replace so a later "file://" inside the path
        # is left intact.
        # NOTE(review): the path comes straight from the client; confine it
        # to an allowed directory before exposing this handler.
        path = uri[len(file_prefix):]
        return await read_file(path)

    raise ValueError(f"Unknown resource URI: {uri}")
// Advertises the resource URI templates this server can resolve.
server.setRequestHandler(ListResourceTemplatesSchema, async () => {
  const resourceTemplates = [
    {
      uriTemplate: "github://repos/{owner}/{repo}/issues/{issue_number}",
      name: "GitHub Issue",
      description: "Fetch a specific GitHub issue",
      mimeType: "application/json"
    },
    {
      uriTemplate: "db://tables/{table}/rows/{id}",
      name: "Database Row",
      description: "Fetch a row from a database table",
      mimeType: "application/json"
    }
  ];

  return { resourceTemplates };
});
// Resolves a concrete resource URI; only the GitHub issue form is handled here.
server.setRequestHandler(ReadResourceSchema, async (request) => {
  const uri = request.params.uri;

  // github://repos/{owner}/{repo}/issues/{issue_number}
  const parsed = uri.match(/^github:\/\/repos\/([^/]+)\/([^/]+)\/issues\/(\d+)$/);
  if (!parsed) {
    throw new Error(`Unknown resource URI: ${uri}`);
  }

  const [, owner, repo, issueNumber] = parsed;
  const issue = await fetchGitHubIssue(owner, repo, parseInt(issueNumber));
  const text = JSON.stringify(issue, null, 2);

  return {
    contents: [{ uri, mimeType: "application/json", text }]
  };
});
---
적절한 초기화 및 종료 처리는 리소스 관리를 깨끗하게 유지합니다.
from mcp.server import Server
from contextlib import asynccontextmanager
app = Server("lifecycle-server")
# ๊ณต์ ์ํ
db_connection = None
cache = None
@asynccontextmanager
async def lifespan(server: Server):
    """Open the shared database and cache clients for the server's lifetime.

    Assigns the module-level ``db_connection`` and ``cache`` before yielding
    and closes whichever of them is set once the server stops, including when
    startup or the server itself raises.
    """
    global db_connection, cache

    # Startup
    print("🚀 Server starting...")
    try:
        db_connection = await create_database_connection()
        cache = await create_cache_client()
        print("✅ Resources initialized")

        yield  # Server runs here
    finally:
        # Shutdown
        print("🛑 Server shutting down...")
        try:
            if db_connection is not None:
                await db_connection.close()
        finally:
            # Close the cache even if closing the database failed.
            if cache is not None:
                await cache.close()
        print("✅ Resources cleaned up")
app = Server("lifecycle-server", lifespan=lifespan)
@app.tool()
async def query_database(sql: str) -> str:
    """Execute ``sql`` on the shared ``db_connection`` and return the result as text."""
    return str(await db_connection.execute(sql))
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
/** Wraps an MCP server together with the database connection it depends on. */
class ManagedServer {
  private server: Server;
  private dbConnection: DatabaseConnection | null = null;

  constructor() {
    this.server = new Server({
      name: "lifecycle-server",
      version: "1.0.0"
    });

    this.setupHandlers();
  }

  /** Opens the database connection, then connects the server to its transport. */
  async start() {
    // Initialize resources
    console.log("🚀 Server starting...");
    this.dbConnection = await createDatabaseConnection();
    console.log("✅ Database connected");

    // Start server
    // NOTE(review): `transport` is not defined in this snippet — confirm
    // where it is created.
    await this.server.connect(transport);
  }

  /** Closes the database connection (if open) and then the server. */
  async stop() {
    // Cleanup resources
    console.log("🛑 Server shutting down...");
    if (this.dbConnection) {
      await this.dbConnection.close();
      // Drop the reference so a second stop() does not close it again.
      this.dbConnection = null;
    }
    await this.server.close();
    console.log("✅ Cleanup complete");
  }

  private setupHandlers() {
    this.server.setRequestHandler(CallToolSchema, async (request) => {
      // Use this.dbConnection safely
      // ...
    });
  }
}
// Usage with graceful shutdown
const server = new ManagedServer();

// On SIGINT, release resources via stop() before exiting with status 0.
process.on('SIGINT', async () => {
  await server.stop();
  process.exit(0);
});

await server.start();
---
MCP๋ ํด๋ผ์ด์ธํธ๊ฐ ์ ์ดํ ์ ์๋ ์๋ฒ ์ธก ๋ก๊น ๋ ๋ฒจ์ ์ง์ํฉ๋๋ค.
from mcp.server import Server
from mcp.types import LoggingLevel
import logging
app = Server("logging-server")
# Map MCP log levels (the eight syslog-style severities, as lower-case names)
# to Python logging levels. Python has no NOTICE/ALERT/EMERGENCY, so those map
# to the nearest standard level instead of silently falling back to INFO.
LEVEL_MAP = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "notice": logging.INFO,
    "warning": logging.WARNING,
    "error": logging.ERROR,
    "critical": logging.CRITICAL,
    "alert": logging.CRITICAL,
    "emergency": logging.CRITICAL,
}

logger = logging.getLogger("mcp-server")


@app.set_logging_level()
async def set_logging_level(level: LoggingLevel) -> None:
    """Handle client request to change logging level.

    Args:
        level: Requested MCP level, given either as its string name or as an
            enum-like object whose ``value`` is that name.
    """
    # Normalise to the lower-case level name so both plain strings and enum
    # members resolve; unknown names fall back to INFO.
    level_name = str(getattr(level, "value", level)).lower()
    python_level = LEVEL_MAP.get(level_name, logging.INFO)
    logger.setLevel(python_level)
    logger.info(f"Logging level set to {level}")
@app.tool()
async def debug_operation(data: str) -> str:
    """Run ``process`` on ``data``, logging the input, the outcome and any failure."""
    logger.debug(f"Processing data: {data}")

    try:
        outcome = process(data)
        logger.info(f"Successfully processed: {outcome}")
    except Exception as exc:
        # Record the failure, then let the caller see the original exception.
        logger.error(f"Processing failed: {exc}")
        raise

    return outcome
@app.tool()
async def complex_operation(input: str, ctx) -> str:
    """Run ``do_work`` on ``input``, sending a client log message before and after.

    Args:
        input: Value handed to ``do_work``.
        ctx: Request context exposing ``send_log``.

    Returns:
        Whatever ``do_work`` returns.
    """
    # Send log notification to client
    await ctx.send_log(
        level="info",
        message=f"Starting complex operation with input: {input}"
    )

    # Do work...
    result = await do_work(input)

    await ctx.send_log(
        level="debug",
        message=f"Operation complete, result size: {len(result)}"
    )

    return result
---
일관된 오류 처리는 디버깅과 사용자 경험을 개선합니다.
from mcp.types import McpError, ErrorCode
class ToolError(McpError):
    """Common ancestor of the tool-specific errors defined in this snippet."""


class ValidationError(ToolError):
    """Raised when a tool receives invalid input parameters."""

    def __init__(self, message: str):
        super().__init__(ErrorCode.INVALID_PARAMS, message)


class NotFoundError(ToolError):
    """Raised when the requested resource does not exist."""

    def __init__(self, resource: str):
        detail = f"Not found: {resource}"
        super().__init__(ErrorCode.INVALID_REQUEST, detail)


class PermissionError(ToolError):
    """Raised when access is denied.

    Note: within this module the name shadows the built-in ``PermissionError``.
    """

    def __init__(self, action: str):
        detail = f"Permission denied: {action}"
        super().__init__(ErrorCode.INVALID_REQUEST, detail)


class InternalError(ToolError):
    """Raised for failures inside the server itself."""

    def __init__(self, message: str):
        super().__init__(ErrorCode.INTERNAL_ERROR, message)
@app.tool()
async def safe_operation(input: str) -> str:
    """Validate ``input``, check permission, run the operation, and map failures.

    Raises:
        ValidationError: If ``input`` is empty or longer than 10000 characters.
        PermissionError: If ``check_permission`` returns a falsy value.
        NotFoundError: If ``perform_operation`` returns ``None``.
        InternalError: For connection errors, timeouts and any other
            unexpected exception.
    """
    # Validate input
    if not input:
        raise ValidationError("Input cannot be empty")

    if len(input) > 10000:
        raise ValidationError(f"Input too large: {len(input)} chars (max 10000)")

    try:
        # Check permissions
        if not await check_permission(input):
            raise PermissionError(f"read {input}")

        # Perform operation
        result = await perform_operation(input)

        if result is None:
            raise NotFoundError(input)

        return result
    except ToolError:
        # Already-classified errors (permission denied, not found) must reach
        # the client unchanged; without this clause the catch-all below
        # rewrote them as "Unexpected error" internal errors.
        raise
    except ConnectionError as e:
        raise InternalError(f"Database connection failed: {e}") from e
    except TimeoutError as e:
        raise InternalError(f"Operation timed out: {e}") from e
    except Exception as e:
        # Log unexpected errors
        logger.exception("Unexpected error in safe_operation")
        raise InternalError(f"Unexpected error: {type(e).__name__}") from e
import { McpError, ErrorCode } from "@modelcontextprotocol/sdk/types.js";
/** Throws an InvalidParams McpError unless `data` is a non-null object. */
function validateInput(data: unknown): asserts data is ValidInput {
  const isObject = typeof data === "object" && data !== null;
  if (!isObject) {
    throw new McpError(ErrorCode.InvalidParams, "Input must be an object");
  }
  // More validation...
}
// Validates arguments, runs the operation, and converts failures to McpError.
server.setRequestHandler(CallToolSchema, async (request) => {
  try {
    validateInput(request.params.arguments);
    const result = await performOperation(request.params.arguments);

    return {
      content: [{ type: "text", text: JSON.stringify(result) }]
    };
  } catch (error) {
    if (error instanceof McpError) {
      throw error; // Already an MCP error
    }

    // Convert other errors
    if (error instanceof NotFoundError) {
      throw new McpError(ErrorCode.InvalidRequest, error.message);
    }

    // Unknown error
    console.error("Unexpected error:", error);
    throw new McpError(
      ErrorCode.InternalError,
      "An unexpected error occurred"
    );
  }
});
---
이러한 기능은 명세서에서 실험적 기능으로 표시됩니다:
# Tasks allow tracking long-running operations with state
@app.task()
async def training_task(model_id: str, data_path: str, ctx) -> str:
    """Train ``model_id`` on ``data_path`` for 100 epochs, reporting status throughout.

    Args:
        model_id: Identifier passed to ``train_epoch`` and echoed in the result.
        data_path: Training data location passed to ``train_epoch``.
        ctx: Task context exposing ``report_status``.

    Returns:
        A confirmation message naming the trained model.
    """
    # Report task started
    await ctx.report_status("running", "Initializing training...")

    # Training loop
    for epoch in range(100):
        await train_epoch(model_id, data_path, epoch)

        await ctx.report_status(
            "running",
            f"Training epoch {epoch + 1}/100",
            progress=epoch + 1,
            total=100
        )

    await ctx.report_status("completed", "Training finished")
    return f"Model {model_id} trained successfully"
# Annotations provide metadata about tool behavior
@app.tool(
    annotations=dict(
        destructive=False,        # Does not modify data
        idempotent=True,          # Safe to retry
        timeout_seconds=30,       # Expected max duration
        requires_approval=False,  # No user approval needed
    )
)
async def safe_query(query: str) -> str:
    """Run ``query`` through ``execute_read_query`` and return its result."""
    return await execute_read_query(query)
---
---
---
๋ฉด์ฑ ์กฐํญ:
์ด ๋ฌธ์๋ AI ๋ฒ์ญ ์๋น์ค Co-op Translator๋ฅผ ์ฌ์ฉํ์ฌ ๋ฒ์ญ๋์์ต๋๋ค.
์ ํ์ฑ์ ์ํด ๋ ธ๋ ฅํ๊ณ ์์ผ๋, ์๋ ๋ฒ์ญ์๋ ์ค๋ฅ๋ ๋ถ์ ํ์ฑ์ด ์์ ์ ์์์ ์์งํด ์ฃผ์๊ธฐ ๋ฐ๋๋๋ค.
์๋ฌธ์ ํด๋น ์ธ์ด์ ์๋ณธ ๋ฌธ์๊ฐ ๊ถ์ ์๋ ์ถ์ฒ๋ก ๊ฐ์ฃผ๋์ด์ผ ํฉ๋๋ค.
์ค์ํ ์ ๋ณด์ ๊ฒฝ์ฐ ์ ๋ฌธ ์ธ๊ฐ ๋ฒ์ญ์ ๊ถ์ฅํฉ๋๋ค.
๋ณธ ๋ฒ์ญ ์ฌ์ฉ์ผ๋ก ์ธํ ์คํด๋ ์๋ชป๋ ํด์์ ๋ํด ๋น์ฌ๋ ์ฑ ์์ง์ง ์์ต๋๋ค.