5.16 Protocol Features Deep Dive

MCP Protocol Features Deep Dive

This guide explores advanced MCP protocol features that go beyond basic tool and resource handling. Understanding these features helps you build more robust, user-friendly, and production-ready MCP servers.

Features Covered

1. Progress Notifications - Report progress for long-running operations

2. Request Cancellation - Allow clients to cancel in-flight requests

3. Resource Templates - Dynamic resource URIs with parameters

4. Server Lifecycle Events - Proper initialization and shutdown

5. Logging Control - Server-side logging configuration

6. Error Handling Patterns - Consistent error responses

---

1. Progress Notifications

For operations that take time (data processing, file downloads, API calls), progress notifications keep users informed.

How It Works


sequenceDiagram

    participant Client

    participant Server

    

    Client->>Server: tools/call (long operation)

    Server-->>Client: notification: progress 10%

    Server-->>Client: notification: progress 50%

    Server-->>Client: notification: progress 90%

    Server->>Client: result (complete)
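
On the wire, a client opts in to progress updates by attaching a `progressToken` to the request's `_meta` field, and the server echoes that token in each `notifications/progress` message. The sketch below builds those JSON-RPC messages as plain dictionaries so the exchange is visible without any SDK. The helper names (`make_tool_call`, `make_progress`) and the token value are assumptions made for illustration, not SDK functions.

```python
import json
from typing import Any, Optional


def make_tool_call(request_id: int, name: str, arguments: dict,
                   progress_token: Optional[str] = None) -> dict:
    """Build a tools/call request; the progress token is optional (hypothetical helper)."""
    params: dict[str, Any] = {"name": name, "arguments": arguments}
    if progress_token is not None:
        params["_meta"] = {"progressToken": progress_token}
    return {"jsonrpc": "2.0", "id": request_id, "method": "tools/call", "params": params}


def make_progress(request: dict, progress: float, total: Optional[float] = None,
                  message: Optional[str] = None) -> Optional[dict]:
    """Build a notifications/progress message, or None if the client sent no token."""
    token = request["params"].get("_meta", {}).get("progressToken")
    if token is None:
        return None
    params = {"progressToken": token, "progress": progress}
    if total is not None:
        params["total"] = total
    if message is not None:
        params["message"] = message
    # Notifications carry no "id", so the receiver never replies to them.
    return {"jsonrpc": "2.0", "method": "notifications/progress", "params": params}


request = make_tool_call(1, "process_large_file", {"file_path": "/data/large.csv"},
                         progress_token="tok-1")
print(json.dumps(make_progress(request, 50, 100, "Halfway"), indent=2))
```

Returning `None` when no token was supplied mirrors the opt-in nature of the feature: a server that was not asked for progress has nothing to send.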

Python Implementation


import os

from mcp.server.fastmcp import Context, FastMCP

app = FastMCP("progress-server")


@app.tool()
async def process_large_file(file_path: str, ctx: Context) -> str:
    """Process a large file with progress updates."""
    # Get file size for progress calculation
    file_size = os.path.getsize(file_path)
    processed = 0

    with open(file_path, "rb") as f:
        while chunk := f.read(8192):
            # process_chunk is application code defined elsewhere
            await process_chunk(chunk)
            processed += len(chunk)

            # Report bytes done out of bytes total; the SDK attaches the
            # progress token that the client sent with the request
            await ctx.report_progress(
                progress=processed,
                total=file_size,
                message=f"Processing: {processed / file_size:.1%}",
            )

    return f"Processed {file_size} bytes"


@app.tool()
async def batch_operation(items: list[str], ctx: Context) -> str:
    """Process multiple items with progress."""
    results = []
    total = len(items)

    for i, item in enumerate(items):
        # process_item is application code defined elsewhere
        result = await process_item(item)
        results.append(result)

        # Report progress after each item
        await ctx.report_progress(
            progress=i + 1,
            total=total,
            message=f"Processed {i + 1}/{total}: {item}",
        )

    return f"Completed {total} items"

TypeScript Implementation


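import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { CallToolRequestSchema } from "@modelcontextprotocol/sdk/types.js";

// `server` is a Server instance created elsewhere; `processItem` is application code.
server.setRequestHandler(CallToolRequestSchema, async (request, extra) => {
  const { name, arguments: args } = request.params;
  // The client opts in to progress updates by sending a token in _meta
  const progressToken = request.params._meta?.progressToken;

  if (name === "process_data") {
    const items = (args?.items ?? []) as string[];
    const results = [];

    for (let i = 0; i < items.length; i++) {
      const result = await processItem(items[i]);
      results.push(result);

      // Send a progress notification only if the client asked for one
      if (progressToken !== undefined) {
        await extra.sendNotification({
          method: "notifications/progress",
          params: {
            progressToken,
            progress: i + 1,
            total: items.length,
            message: `Processing item ${i + 1}/${items.length}`
          }
        });
      }
    }

    return { content: [{ type: "text", text: JSON.stringify(results) }] };
  }

  throw new Error(`Unknown tool: ${name}`);
});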

Client Handling (Python)


async def handle_progress(progress: float, total: float | None, message: str | None) -> None:
    """Handle progress notifications from server."""
    print(f"Progress: {progress}/{total} - {message}")


# Pass the handler with the call; the SDK attaches a progress token to the
# request and routes matching notifications to the callback
result = await session.call_tool(
    "process_large_file",
    {"file_path": "/data/large.csv"},
    progress_callback=handle_progress,
)

---

2. Request Cancellation

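Cancellation lets a client abandon a request that is no longer needed or is taking too long. The client sends a notifications/cancelled notification that names the request ID; the server should stop the work and does not need to send a response for that request.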
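
To act on a cancellation, the receiving side needs a way to find the running work for a given request ID. One possible shape for that bookkeeping is sketched below with plain asyncio tasks. The `InFlightRequests` class and the `slow_search` coroutine are hypothetical names used only for this illustration; an SDK may already do this for you.

```python
import asyncio


class InFlightRequests:
    """Maps request IDs to running tasks so they can be cancelled (illustrative)."""

    def __init__(self) -> None:
        self._tasks: dict[int, asyncio.Task] = {}

    def start(self, request_id: int, coro) -> asyncio.Task:
        task = asyncio.ensure_future(coro)
        self._tasks[request_id] = task
        # Forget the task once it finishes, however it finishes.
        task.add_done_callback(lambda _: self._tasks.pop(request_id, None))
        return task

    def handle_cancelled(self, params: dict) -> bool:
        """Handle a notifications/cancelled payload; unknown IDs are ignored."""
        task = self._tasks.get(params["requestId"])
        if task is None:
            return False  # already finished or never existed
        task.cancel()
        return True


async def slow_search(found: list) -> str:
    for page in range(100):
        await asyncio.sleep(0.01)  # each await is a point where cancellation can land
        found.append(page)
    return "done"


async def demo() -> tuple:
    registry = InFlightRequests()
    found: list = []
    task = registry.start(7, slow_search(found))
    await asyncio.sleep(0.05)
    accepted = registry.handle_cancelled({"requestId": 7, "reason": "Timeout"})
    try:
        await task
    except asyncio.CancelledError:
        pass
    return accepted, task.cancelled(), len(found) < 100


print(asyncio.run(demo()))
```

Ignoring unknown request IDs matters because a cancellation can race with completion: the result may already be on its way when the notification arrives.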

Python Implementation


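import asyncio

import aiohttp
from mcp.server.fastmcp import FastMCP


class CancelledError(Exception):
    """Raised when a request is cancelled (defined by this example, not by the SDK)."""


app = FastMCP("cancellable-server")

# In the tools below, ctx is the CancellableContext described in the next section.
# Wiring it to incoming notifications/cancelled messages is left to the server.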



@app.tool()

async def long_running_search(query: str, ctx) -> str:

    """Search that can be cancelled."""

    

    results = []

    

    try:

        for page in range(100):  # Search through many pages

            # Check if cancellation was requested

            if ctx.is_cancelled:

                raise CancelledError("Search cancelled by user")

            

            # Simulate page search

            page_results = await search_page(query, page)

            results.extend(page_results)

            

            # Small delay allows cancellation checks

            await asyncio.sleep(0.1)

            

    except CancelledError:

        # Return partial results

        return f"Cancelled. Found {len(results)} results before cancellation."

    

    return f"Found {len(results)} total results"



@app.tool()

async def download_file(url: str, ctx) -> str:

    """Download with cancellation support."""

    

    async with aiohttp.ClientSession() as session:

        async with session.get(url) as response:

            total_size = int(response.headers.get('content-length', 0))

            downloaded = 0

            chunks = []

            

            async for chunk in response.content.iter_chunked(8192):

                if ctx.is_cancelled:

                    return f"Download cancelled at {downloaded}/{total_size} bytes"

                

                chunks.append(chunk)

                downloaded += len(chunk)

            

            return f"Downloaded {downloaded} bytes"

Implementing Cancellation Context


class CancellableContext:

    """Context object that tracks cancellation state."""

    

    def __init__(self, request_id: str):

        self.request_id = request_id

        self._cancelled = asyncio.Event()

        self._cancel_reason = None

    

    @property

    def is_cancelled(self) -> bool:

        return self._cancelled.is_set()

    

    def cancel(self, reason: str = "Cancelled"):

        self._cancel_reason = reason

        self._cancelled.set()

    

    async def check_cancelled(self):

        """Raise if cancelled, otherwise continue."""

        if self.is_cancelled:

            raise CancelledError(self._cancel_reason)

    

    async def sleep_or_cancel(self, seconds: float):

        """Sleep that can be interrupted by cancellation."""

        try:

            await asyncio.wait_for(

                self._cancelled.wait(),

                timeout=seconds

            )

            raise CancelledError(self._cancel_reason)

        except asyncio.TimeoutError:

            pass  # Normal timeout, continue

Client-Side Cancellation


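import asyncio


async def search_with_timeout(session, query, timeout=30):
    """Search with automatic cancellation on timeout."""
    task = asyncio.create_task(
        session.call_tool("long_running_search", {"query": query})
    )

    try:
        return await asyncio.wait_for(task, timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the local task, so the client stops
        # waiting. The server only stops if it is told to: it needs a
        # notifications/cancelled message carrying the JSON-RPC id of the
        # original request, for example
        #   {"method": "notifications/cancelled",
        #    "params": {"requestId": <id>, "reason": "Timeout"}}
        # An asyncio task has no request_id attribute; how the id is exposed
        # depends on the client library in use.
        return "Search timed out"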

---

3. Resource Templates

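Resource templates describe a family of resources with a parameterized URI (an RFC 6570 URI template) instead of listing every URI up front. They are useful for APIs and databases, where the set of possible resources is large or open-ended.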
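
Matching an incoming URI against a template can be done generically rather than with one `startswith` check per scheme. The following is a minimal sketch that handles only simple `{name}` placeholders, each standing for one path segment; it is not a full RFC 6570 implementation, and `compile_template` and `match_uri` are names invented for this example.

```python
import re
from typing import Optional


def compile_template(uri_template: str) -> re.Pattern:
    """Turn "db://users/{user_id}" into a regex with one named group per placeholder."""
    pattern = ""
    # re.split with a capturing group alternates literal text and placeholder names
    for i, part in enumerate(re.split(r"\{(\w+)\}", uri_template)):
        if i % 2 == 0:
            pattern += re.escape(part)
        else:
            pattern += f"(?P<{part}>[^/]+)"  # one path segment, no slashes
    return re.compile(pattern)


def match_uri(uri_template: str, uri: str) -> Optional[dict]:
    """Return extracted parameters, or None when the URI does not fit the template."""
    m = compile_template(uri_template).fullmatch(uri)
    return m.groupdict() if m else None


print(match_uri("db://users/{user_id}", "db://users/42"))
# {'user_id': '42'}
print(match_uri("api://weather/{city}/{date}", "api://weather/paris/2024-06-01"))
# {'city': 'paris', 'date': '2024-06-01'}
print(match_uri("api://weather/{city}/{date}", "api://weather/paris"))
# None
```

Using `fullmatch` rejects URIs with missing or extra segments, which avoids the index errors that ad hoc string splitting can raise on malformed input.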

Defining Templates


from mcp.server import Server

from mcp.types import ResourceTemplate



app = Server("template-server")



@app.list_resource_templates()

async def list_templates() -> list[ResourceTemplate]:

    """Return available resource templates."""

    return [

        ResourceTemplate(

            uriTemplate="db://users/{user_id}",

            name="User Profile",

            description="Fetch user profile by ID",

            mimeType="application/json"

        ),

        ResourceTemplate(

            uriTemplate="api://weather/{city}/{date}",

            name="Weather Data",

            description="Historical weather for city and date",

            mimeType="application/json"

        ),

        ResourceTemplate(

            uriTemplate="file://{path}",

            name="File Content",

            description="Read file at given path",

            mimeType="text/plain"

        )

    ]



@app.read_resource()

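async def read_resource(uri) -> str:
    """Read a resource by matching its URI against the templates above."""
    # Recent SDK versions pass a pydantic AnyUrl here, so normalise to str first
    uri = str(uri)

    # Parse the URI to extract parameters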

    if uri.startswith("db://users/"):

        user_id = uri.split("/")[-1]

        return await fetch_user(user_id)

    

    elif uri.startswith("api://weather/"):

        parts = uri.replace("api://weather/", "").split("/")

        city, date = parts[0], parts[1]

        return await fetch_weather(city, date)

    

    elif uri.startswith("file://"):

        path = uri.replace("file://", "")

        return await read_file(path)

    

    raise ValueError(f"Unknown resource URI: {uri}")

TypeScript Implementation


server.setRequestHandler(ListResourceTemplatesRequestSchema, async () => {

  return {

    resourceTemplates: [

      {

        uriTemplate: "github://repos/{owner}/{repo}/issues/{issue_number}",

        name: "GitHub Issue",

        description: "Fetch a specific GitHub issue",

        mimeType: "application/json"

      },

      {

        uriTemplate: "db://tables/{table}/rows/{id}",

        name: "Database Row",

        description: "Fetch a row from a database table",

        mimeType: "application/json"

      }

    ]

  };

});



server.setRequestHandler(ReadResourceRequestSchema, async (request) => {

  const uri = request.params.uri;

  

  // Parse GitHub issue URI

  const githubMatch = uri.match(/^github:\/\/repos\/([^/]+)\/([^/]+)\/issues\/(\d+)$/);

  if (githubMatch) {

    const [_, owner, repo, issueNumber] = githubMatch;

    const issue = await fetchGitHubIssue(owner, repo, parseInt(issueNumber));

    return {

      contents: [{

        uri,

        mimeType: "application/json",

        text: JSON.stringify(issue, null, 2)

      }]

    };

  }

  

  throw new Error(`Unknown resource URI: ${uri}`);

});

---

4. Server Lifecycle Events

Proper initialization and shutdown handling ensures clean resource management.
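
The core of lifecycle management is a guarantee that cleanup runs even when the server stops because of an error. A small, SDK-free sketch of that pattern follows; `FakeConnection` stands in for a real database client and `serve` for the server's main loop, both hypothetical.

```python
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass, field


@dataclass
class FakeConnection:
    """Stand-in for a real database client (hypothetical)."""
    log: list = field(default_factory=list)
    closed: bool = False

    async def close(self) -> None:
        self.closed = True
        self.log.append("closed")


@asynccontextmanager
async def lifespan(log: list):
    conn = FakeConnection(log)
    log.append("opened")
    try:
        yield {"db": conn}  # handlers receive this instead of reading globals
    finally:
        # Runs on normal shutdown and when the serving code raises
        await conn.close()


async def serve(log: list, fail: bool) -> None:
    async with lifespan(log) as state:
        assert not state["db"].closed
        log.append("serving")
        if fail:
            raise RuntimeError("transport dropped")


async def demo() -> list:
    log: list = []
    await serve(log, fail=False)
    try:
        await serve(log, fail=True)
    except RuntimeError:
        log.append("error propagated")
    return log


print(asyncio.run(demo()))
```

Yielding a state object from the context manager, rather than assigning module-level globals, keeps the resources scoped to the server's lifetime and makes handlers easier to test.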

Python Lifecycle Management


import sys
from contextlib import asynccontextmanager

from mcp.server.fastmcp import FastMCP

# Shared state
db_connection = None
cache = None


@asynccontextmanager
async def lifespan(server: FastMCP):
    """Manage server lifecycle."""
    global db_connection, cache

    # Startup. Log to stderr: on the stdio transport, stdout carries protocol messages
    print("🚀 Server starting...", file=sys.stderr)
    db_connection = await create_database_connection()
    cache = await create_cache_client()
    print("✅ Resources initialized", file=sys.stderr)

    try:
        yield  # Server runs here
    finally:
        # Shutdown, also reached when the server exits with an error
        print("🛑 Server shutting down...", file=sys.stderr)
        await db_connection.close()
        await cache.close()
        print("✅ Resources cleaned up", file=sys.stderr)


app = FastMCP("lifecycle-server", lifespan=lifespan)


@app.tool()
async def query_database(sql: str) -> str:
    """Use the shared database connection."""
    result = await db_connection.execute(sql)
    return str(result)

TypeScript Lifecycle


import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { CallToolRequestSchema } from "@modelcontextprotocol/sdk/types.js";

// DatabaseConnection and createDatabaseConnection are application code defined elsewhere.
class ManagedServer {
  private server: Server;
  private dbConnection: DatabaseConnection | null = null;

  constructor() {
    this.server = new Server(
      { name: "lifecycle-server", version: "1.0.0" },
      { capabilities: { tools: {} } }
    );

    this.setupHandlers();
  }

  async start() {
    // Initialize resources. Log with console.error: on stdio, stdout carries protocol messages
    console.error("🚀 Server starting...");
    this.dbConnection = await createDatabaseConnection();
    console.error("✅ Database connected");

    // Start server
    await this.server.connect(new StdioServerTransport());
  }

  async stop() {
    // Cleanup resources
    console.error("🛑 Server shutting down...");
    if (this.dbConnection) {
      await this.dbConnection.close();
      this.dbConnection = null;
    }
    await this.server.close();
    console.error("✅ Cleanup complete");
  }

  private setupHandlers() {
    this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
      // Use this.dbConnection safely
      // ...
    });
  }
}

// Usage with graceful shutdown
const server = new ManagedServer();

process.on("SIGINT", async () => {
  await server.stop();
  process.exit(0);
});

await server.start();

---

5. Logging Control

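MCP lets the client control how verbose the server's log output is. The client sends a logging/setLevel request with a minimum level, and the server then delivers log entries at or above that level as notifications/message notifications.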
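
MCP uses the eight syslog severity names, which is more than Python's `logging` module defines, so a mapping has to fold some of them together. The sketch below shows one way to do that and to apply the client's threshold. The folding choices (for example `notice` to `INFO`) and the helper names are assumptions made for this example.

```python
import logging

# The eight MCP log levels, lowest severity first (same names as syslog, RFC 5424)
MCP_LEVELS = ["debug", "info", "notice", "warning", "error", "critical", "alert", "emergency"]

# Python's logging module has no NOTICE, ALERT or EMERGENCY; fold them into neighbours
TO_PYTHON = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "notice": logging.INFO,
    "warning": logging.WARNING,
    "error": logging.ERROR,
    "critical": logging.CRITICAL,
    "alert": logging.CRITICAL,
    "emergency": logging.CRITICAL,
}


def should_send(message_level: str, minimum_level: str) -> bool:
    """True when a message is at or above the threshold the client requested."""
    return MCP_LEVELS.index(message_level) >= MCP_LEVELS.index(minimum_level)


def make_log_notification(level: str, data, logger: str = "server") -> dict:
    """Build a notifications/message payload (hypothetical helper)."""
    return {
        "jsonrpc": "2.0",
        "method": "notifications/message",
        "params": {"level": level, "logger": logger, "data": data},
    }


minimum = "warning"  # e.g. set by a logging/setLevel request
for level in ("debug", "notice", "error"):
    if should_send(level, minimum):
        print(make_log_notification(level, f"{level} message"))
```

With the threshold at `warning`, only the `error` entry is emitted; `debug` and `notice` are filtered out before anything is sent to the client.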

Implementing Logging Levels


from mcp.server import Server

from mcp.types import LoggingLevel

import logging



app = Server("logging-server")



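# Map MCP levels to Python logging levels. LoggingLevel is a string literal type,
# so the keys are plain strings; Python has no NOTICE, ALERT or EMERGENCY level
LEVEL_MAP = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "notice": logging.INFO,
    "warning": logging.WARNING,
    "error": logging.ERROR,
    "critical": logging.CRITICAL,
    "alert": logging.CRITICAL,
    "emergency": logging.CRITICAL,
}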



logger = logging.getLogger("mcp-server")



@app.set_logging_level()

async def set_logging_level(level: LoggingLevel) -> None:

    """Handle client request to change logging level."""

    python_level = LEVEL_MAP.get(level, logging.INFO)

    logger.setLevel(python_level)

    logger.info(f"Logging level set to {level}")



@app.tool()

async def debug_operation(data: str) -> str:

    """Tool with various logging levels."""

    logger.debug(f"Processing data: {data}")

    

    try:

        result = process(data)

        logger.info(f"Successfully processed: {result}")

        return result

    except Exception as e:

        logger.error(f"Processing failed: {e}")

        raise

Sending Log Messages to Client


from mcp.server.fastmcp import Context

# Assumes app is a FastMCP instance


@app.tool()
async def complex_operation(input: str, ctx: Context) -> str:
    """Operation that logs to client."""
    # Send log notification to client
    await ctx.info(f"Starting complex operation with input: {input}")

    # Do work...
    result = await do_work(input)

    await ctx.debug(f"Operation complete, result size: {len(result)}")

    return result

---

6. Error Handling Patterns

Consistent error handling improves debugging and user experience.
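
MCP separates two kinds of failure: protocol errors, returned as a JSON-RPC `error` object, and tool execution errors, returned as a normal result with `isError` set so the model can see what went wrong. The sketch below shows both shapes as plain dictionaries. The `call_tool` function and its division example are hypothetical and exist only to trigger each path.

```python
# Standard JSON-RPC 2.0 error codes
PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERNAL_ERROR = -32603


def protocol_error(request_id, code: int, message: str, data=None) -> dict:
    """JSON-RPC error response: the request itself could not be handled."""
    error = {"code": code, "message": message}
    if data is not None:
        error["data"] = data
    return {"jsonrpc": "2.0", "id": request_id, "error": error}


def tool_result(request_id, text: str, is_error: bool = False) -> dict:
    """Normal JSON-RPC response; is_error flags a failure inside the tool."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "result": {"content": [{"type": "text", "text": text}], "isError": is_error},
    }


def call_tool(request_id, params: dict) -> dict:
    """Toy dispatcher that divides 10 by the given divisor (hypothetical tool)."""
    arguments = params.get("arguments")
    if not isinstance(arguments, dict):
        return protocol_error(request_id, INVALID_PARAMS, "arguments must be an object")
    try:
        value = 10 / arguments.get("divisor", 0)
    except ZeroDivisionError as exc:
        return tool_result(request_id, f"Tool failed: {exc}", is_error=True)
    return tool_result(request_id, str(value))


print(call_tool(1, {"name": "divide"}))
print(call_tool(2, {"name": "divide", "arguments": {"divisor": 0}}))
print(call_tool(3, {"name": "divide", "arguments": {"divisor": 4}}))
```

A malformed request gets an `error` response, while a well-formed request whose tool fails still gets a `result`, which keeps the failure visible to whatever is consuming the tool output.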

MCP Error Codes


from mcp.shared.exceptions import McpError
from mcp.types import INTERNAL_ERROR, INVALID_PARAMS, INVALID_REQUEST, ErrorData


class ToolError(McpError):
    """Base class for tool errors."""

    def __init__(self, code: int, message: str):
        # McpError wraps an ErrorData carrying the JSON-RPC code and message
        super().__init__(ErrorData(code=code, message=message))


class ValidationError(ToolError):
    """Invalid input parameters."""

    def __init__(self, message: str):
        super().__init__(INVALID_PARAMS, message)


class NotFoundError(ToolError):
    """Requested resource not found."""

    def __init__(self, resource: str):
        super().__init__(INVALID_REQUEST, f"Not found: {resource}")


class PermissionError(ToolError):
    """Access denied."""

    def __init__(self, action: str):
        super().__init__(INVALID_REQUEST, f"Permission denied: {action}")


class InternalError(ToolError):
    """Internal server error."""

    def __init__(self, message: str):
        super().__init__(INTERNAL_ERROR, message)

Structured Error Responses


@app.tool()

async def safe_operation(input: str) -> str:

    """Tool with comprehensive error handling."""

    

    # Validate input

    if not input:

        raise ValidationError("Input cannot be empty")

    

    if len(input) > 10000:

        raise ValidationError(f"Input too large: {len(input)} chars (max 10000)")

    

    try:

        # Check permissions

        if not await check_permission(input):

            raise PermissionError(f"read {input}")

        

        # Perform operation

        result = await perform_operation(input)

        

        if result is None:

            raise NotFoundError(input)

        

        return result

        

    except ToolError:
        # Already a structured error (permission denied, not found); pass it through
        raise
    except ConnectionError as e:
        raise InternalError(f"Database connection failed: {e}") from e
    except TimeoutError as e:
        raise InternalError(f"Operation timed out: {e}") from e
    except Exception as e:
        # Log unexpected errors
        logger.exception("Unexpected error in safe_operation")
        raise InternalError(f"Unexpected error: {type(e).__name__}") from e

Error Handling in TypeScript


import { CallToolRequestSchema, ErrorCode, McpError } from "@modelcontextprotocol/sdk/types.js";



function validateInput(data: unknown): asserts data is ValidInput {

  if (typeof data !== "object" || data === null) {

    throw new McpError(

      ErrorCode.InvalidParams,

      "Input must be an object"

    );

  }

  // More validation...

}



server.setRequestHandler(CallToolRequestSchema, async (request) => {

  try {

    validateInput(request.params.arguments);

    

    const result = await performOperation(request.params.arguments);

    

    return {

      content: [{ type: "text", text: JSON.stringify(result) }]

    };

    

  } catch (error) {

    if (error instanceof McpError) {

      throw error;  // Already an MCP error

    }

    

    // Convert other errors

    if (error instanceof NotFoundError) {

      throw new McpError(ErrorCode.InvalidRequest, error.message);

    }

    

    // Unknown error

    console.error("Unexpected error:", error);

    throw new McpError(

      ErrorCode.InternalError,

      "An unexpected error occurred"

    );

  }

});

---

Experimental Features (MCP 2025-11-25)

Tasks are marked as experimental in the specification. Tool annotations are advisory hints, and clients should not rely on them for security decisions:

Tasks (Long-Running Operations)


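# Tasks allow tracking long-running operations with state.
# Illustrative pseudocode: the @app.task() decorator and ctx.report_status()
# are assumed names, not a confirmed SDK API; check your SDK's task support.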

@app.task()

async def training_task(model_id: str, data_path: str, ctx) -> str:

    """Long-running ML training task."""

    

    # Report task started

    await ctx.report_status("running", "Initializing training...")

    

    # Training loop

    for epoch in range(100):

        await train_epoch(model_id, data_path, epoch)

        await ctx.report_status(

            "running",

            f"Training epoch {epoch + 1}/100",

            progress=epoch + 1,

            total=100

        )

    

    await ctx.report_status("completed", "Training finished")

    return f"Model {model_id} trained successfully"

Tool Annotations


from mcp.types import ToolAnnotations

# Annotations provide metadata about tool behavior (assumes app is a FastMCP instance)
@app.tool(
    annotations=ToolAnnotations(
        readOnlyHint=True,      # Does not modify data
        destructiveHint=False,  # Performs no destructive updates
        idempotentHint=True,    # Safe to retry
        openWorldHint=False,    # Works only on the server's own database
    )
)

async def safe_query(query: str) -> str:

    """A read-only database query tool."""

    return await execute_read_query(query)

---

What's Next

  • Module 8 - Best Practices
  • 5.14 - Context Engineering
  • MCP Specification Changelog

---

Additional Resources

  • MCP Specification 2025-11-25
  • JSON-RPC 2.0 Error Codes
  • Python SDK Examples
  • TypeScript SDK Examples
  • MCP ํ”„๋กœํ† ์ฝœ ๊ธฐ๋Šฅ ์‹ฌ์ธต ๋ถ„์„

    ์ด ๊ฐ€์ด๋“œ๋Š” ๊ธฐ๋ณธ ๋„๊ตฌ ๋ฐ ๋ฆฌ์†Œ์Šค ์ฒ˜๋ฆฌ ์ด์ƒ์˜ ๊ณ ๊ธ‰ MCP ํ”„๋กœํ† ์ฝœ ๊ธฐ๋Šฅ์„ ํƒ๊ตฌํ•ฉ๋‹ˆ๋‹ค. ์ด๋Ÿฌํ•œ ๊ธฐ๋Šฅ์„ ์ดํ•ดํ•˜๋ฉด ๋ณด๋‹ค ๊ฒฌ๊ณ ํ•˜๊ณ  ์‚ฌ์šฉ์ž ์นœํ™”์ ์ด๋ฉฐ ์ƒ์‚ฐ ์ค€๋น„๊ฐ€ ๋œ MCP ์„œ๋ฒ„๋ฅผ ๊ตฌ์ถ•ํ•˜๋Š” ๋ฐ ๋„์›€์ด ๋ฉ๋‹ˆ๋‹ค.

    ๋‹ค๋ฃจ๋Š” ๊ธฐ๋Šฅ

    1. ์ง„ํ–‰ ์•Œ๋ฆผ - ์žฅ์‹œ๊ฐ„ ์‹คํ–‰๋˜๋Š” ์ž‘์—…์˜ ์ง„ํ–‰ ์ƒํ™ฉ ๋ณด๊ณ 

    2. ์š”์ฒญ ์ทจ์†Œ - ํด๋ผ์ด์–ธํŠธ๊ฐ€ ์ง„ํ–‰ ์ค‘์ธ ์š”์ฒญ์„ ์ทจ์†Œํ•  ์ˆ˜ ์žˆ๋„๋ก ํ—ˆ์šฉ

    3. ๋ฆฌ์†Œ์Šค ํ…œํ”Œ๋ฆฟ - ๋งค๊ฐœ๋ณ€์ˆ˜๊ฐ€ ์žˆ๋Š” ๋™์  ๋ฆฌ์†Œ์Šค URI

    4. ์„œ๋ฒ„ ๋ผ์ดํ”„์‚ฌ์ดํด ์ด๋ฒคํŠธ - ์ ์ ˆํ•œ ์ดˆ๊ธฐํ™” ๋ฐ ์ข…๋ฃŒ

    5. ๋กœ๊น… ์ œ์–ด - ์„œ๋ฒ„ ์ธก ๋กœ๊น… ๊ตฌ์„ฑ

    6. ์˜ค๋ฅ˜ ์ฒ˜๋ฆฌ ํŒจํ„ด - ์ผ๊ด€๋œ ์˜ค๋ฅ˜ ์‘๋‹ต

    ---

    1. ์ง„ํ–‰ ์•Œ๋ฆผ

    ์‹œ๊ฐ„์ด ๊ฑธ๋ฆฌ๋Š” ์ž‘์—…(๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ, ํŒŒ์ผ ๋‹ค์šด๋กœ๋“œ, API ํ˜ธ์ถœ ๋“ฑ)์˜ ๊ฒฝ์šฐ, ์ง„ํ–‰ ์•Œ๋ฆผ์€ ์‚ฌ์šฉ์ž๊ฐ€ ์ƒํ™ฉ์„ ์•Œ ์ˆ˜ ์žˆ๋„๋ก ๋„์™€์ค๋‹ˆ๋‹ค.

    ์ž‘๋™ ๋ฐฉ์‹

    
    sequenceDiagram
    
        participant Client
    
        participant Server
    
        
    
        Client->>Server: tools/call (๊ธด ์ž‘์—…)
    
        Server-->>Client: ์•Œ๋ฆผ: ์ง„ํ–‰๋ฅ  10%
    
        Server-->>Client: ์•Œ๋ฆผ: ์ง„ํ–‰๋ฅ  50%
    
        Server-->>Client: ์•Œ๋ฆผ: ์ง„ํ–‰๋ฅ  90%
    
        Server->>Client: ๊ฒฐ๊ณผ (์™„๋ฃŒ)
    
    

    Python ๊ตฌํ˜„

    
    from mcp.server import Server, NotificationOptions
    
    from mcp.types import ProgressNotification
    
    import asyncio
    
    
    
    app = Server("progress-server")
    
    
    
    @app.tool()
    
    async def process_large_file(file_path: str, ctx) -> str:
    
        """Process a large file with progress updates."""
    
        
    
        # ์ง„ํ–‰ ์ƒํ™ฉ ๊ณ„์‚ฐ์„ ์œ„ํ•œ ํŒŒ์ผ ํฌ๊ธฐ ๊ฐ€์ ธ์˜ค๊ธฐ
    
        file_size = os.path.getsize(file_path)
    
        processed = 0
    
        
    
        with open(file_path, 'rb') as f:
    
            while chunk := f.read(8192):
    
                # ์ฒญํฌ ์ฒ˜๋ฆฌ
    
                await process_chunk(chunk)
    
                processed += len(chunk)
    
                
    
                # ์ง„ํ–‰ ์ƒํ™ฉ ์•Œ๋ฆผ ๋ณด๋‚ด๊ธฐ
    
                progress = (processed / file_size) * 100
    
                await ctx.send_notification(
    
                    ProgressNotification(
    
                        progressToken=ctx.request_id,
    
                        progress=progress,
    
                        total=100,
    
                        message=f"Processing: {progress:.1f}%"
    
                    )
    
                )
    
        
    
        return f"Processed {file_size} bytes"
    
    
    
    @app.tool()
    
    async def batch_operation(items: list[str], ctx) -> str:
    
        """Process multiple items with progress."""
    
        
    
        results = []
    
        total = len(items)
    
        
    
        for i, item in enumerate(items):
    
            result = await process_item(item)
    
            results.append(result)
    
            
    
            # ๊ฐ ํ•ญ๋ชฉ ํ›„ ์ง„ํ–‰ ์ƒํ™ฉ ๋ณด๊ณ ํ•˜๊ธฐ
    
            await ctx.send_notification(
    
                ProgressNotification(
    
                    progressToken=ctx.request_id,
    
                    progress=i + 1,
    
                    total=total,
    
                    message=f"Processed {i + 1}/{total}: {item}"
    
                )
    
            )
    
        
    
        return f"Completed {total} items"
    
    

    TypeScript ๊ตฌํ˜„

    
    import { Server } from "@modelcontextprotocol/sdk/server/index.js";
    
    
    
    server.setRequestHandler(CallToolSchema, async (request, extra) => {
    
      const { name, arguments: args } = request.params;
    
      
    
      if (name === "process_data") {
    
        const items = args.items as string[];
    
        const results = [];
    
        
    
        for (let i = 0; i < items.length; i++) {
    
          const result = await processItem(items[i]);
    
          results.push(result);
    
          
    
          // ์ง„ํ–‰ ์•Œ๋ฆผ ๋ณด๋‚ด๊ธฐ
    
          await extra.sendNotification({
    
            method: "notifications/progress",
    
            params: {
    
              progressToken: request.id,
    
              progress: i + 1,
    
              total: items.length,
    
              message: `Processing item ${i + 1}/${items.length}`
    
            }
    
          });
    
        }
    
        
    
        return { content: [{ type: "text", text: JSON.stringify(results) }] };
    
      }
    
    });
    
    

    ํด๋ผ์ด์–ธํŠธ ์ฒ˜๋ฆฌ (Python)

    
    async def handle_progress(notification):
    
        """Handle progress notifications from server."""
    
        params = notification.params
    
        print(f"Progress: {params.progress}/{params.total} - {params.message}")
    
    
    
    # ํ•ธ๋“ค๋Ÿฌ ๋“ฑ๋ก
    
    session.on_notification("notifications/progress", handle_progress)
    
    
    
    # ๋„๊ตฌ ํ˜ธ์ถœ (์ง„ํ–‰ ์ƒํ™ฉ ์—…๋ฐ์ดํŠธ๋Š” ํ•ธ๋“ค๋Ÿฌ๋ฅผ ํ†ตํ•ด ๋„์ฐฉํ•ฉ๋‹ˆ๋‹ค)
    
    result = await session.call_tool("process_large_file", {"file_path": "/data/large.csv"})
    
    

    ---

    2. ์š”์ฒญ ์ทจ์†Œ

    ๋” ์ด์ƒ ํ•„์š”ํ•˜์ง€ ์•Š๊ฑฐ๋‚˜ ๋„ˆ๋ฌด ์˜ค๋ž˜ ๊ฑธ๋ฆฌ๋Š” ์š”์ฒญ์„ ํด๋ผ์ด์–ธํŠธ๊ฐ€ ์ทจ์†Œํ•  ์ˆ˜ ์žˆ๋„๋ก ํ—ˆ์šฉํ•ฉ๋‹ˆ๋‹ค.

    Python ๊ตฌํ˜„

    
    from mcp.server import Server
    
    from mcp.types import CancelledError
    
    import asyncio
    
    
    
    app = Server("cancellable-server")
    
    
    
    @app.tool()
    
    async def long_running_search(query: str, ctx) -> str:
    
        """Search that can be cancelled."""
    
        
    
        results = []
    
        
    
        try:
    
            for page in range(100):  # ์—ฌ๋Ÿฌ ํŽ˜์ด์ง€๋ฅผ ๊ฒ€์ƒ‰ํ•ฉ๋‹ˆ๋‹ค
    
                # ์ทจ์†Œ ์š”์ฒญ์ด ์žˆ์—ˆ๋Š”์ง€ ํ™•์ธํ•ฉ๋‹ˆ๋‹ค
    
                if ctx.is_cancelled:
    
                    raise CancelledError("Search cancelled by user")
    
                
    
                # ํŽ˜์ด์ง€ ๊ฒ€์ƒ‰์„ ์‹œ๋ฎฌ๋ ˆ์ด์…˜ํ•ฉ๋‹ˆ๋‹ค
    
                page_results = await search_page(query, page)
    
                results.extend(page_results)
    
                
    
                # ์งง์€ ์ง€์—ฐ์œผ๋กœ ์ทจ์†Œ ํ™•์ธ์ด ๊ฐ€๋Šฅํ•ฉ๋‹ˆ๋‹ค
    
                await asyncio.sleep(0.1)
    
                
    
        except CancelledError:
    
            # ๋ถ€๋ถ„ ๊ฒฐ๊ณผ๋ฅผ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค
    
            return f"Cancelled. Found {len(results)} results before cancellation."
    
        
    
        return f"Found {len(results)} total results"
    
    
    
    @app.tool()
    
    async def download_file(url: str, ctx) -> str:
    
        """Download with cancellation support."""
    
        
    
        async with aiohttp.ClientSession() as session:
    
            async with session.get(url) as response:
    
                total_size = int(response.headers.get('content-length', 0))
    
                downloaded = 0
    
                chunks = []
    
                
    
                async for chunk in response.content.iter_chunked(8192):
    
                    if ctx.is_cancelled:
    
                        return f"Download cancelled at {downloaded}/{total_size} bytes"
    
                    
    
                    chunks.append(chunk)
    
                    downloaded += len(chunk)
    
                
    
                return f"Downloaded {downloaded} bytes"
    
    

    ์ทจ์†Œ ์ปจํ…์ŠคํŠธ ๊ตฌํ˜„

    
    class CancellableContext:
    
        """Context object that tracks cancellation state."""
    
        
    
        def __init__(self, request_id: str):
    
            self.request_id = request_id
    
            self._cancelled = asyncio.Event()
    
            self._cancel_reason = None
    
        
    
        @property
    
        def is_cancelled(self) -> bool:
    
            return self._cancelled.is_set()
    
        
    
        def cancel(self, reason: str = "Cancelled"):
    
            self._cancel_reason = reason
    
            self._cancelled.set()
    
        
    
        async def check_cancelled(self):
    
            """Raise if cancelled, otherwise continue."""
    
            if self.is_cancelled:
    
                raise CancelledError(self._cancel_reason)
    
        
    
        async def sleep_or_cancel(self, seconds: float):
    
            """Sleep that can be interrupted by cancellation."""
    
            try:
    
                await asyncio.wait_for(
    
                    self._cancelled.wait(),
    
                    timeout=seconds
    
                )
    
                raise CancelledError(self._cancel_reason)
    
            except asyncio.TimeoutError:
    
                pass  # ์ •์ƒ ์‹œ๊ฐ„ ์ดˆ๊ณผ, ๊ณ„์† ์ง„ํ–‰
    
    

    ํด๋ผ์ด์–ธํŠธ ์ธก ์ทจ์†Œ

    
    import asyncio
    
    
    
    async def search_with_timeout(session, query, timeout=30):
    
        """Search with automatic cancellation on timeout."""
    
        
    
        task = asyncio.create_task(
    
            session.call_tool("long_running_search", {"query": query})
    
        )
    
        
    
        try:
    
            result = await asyncio.wait_for(task, timeout=timeout)
    
            return result
    
        except asyncio.TimeoutError:
    
            # ์š”์ฒญ ์ทจ์†Œ
    
            await session.send_notification({
    
                "method": "notifications/cancelled",
    
                "params": {"requestId": task.request_id, "reason": "Timeout"}
    
            })
    
            return "Search timed out"
    
    

    ---

    3. ๋ฆฌ์†Œ์Šค ํ…œํ”Œ๋ฆฟ

    ๋ฆฌ์†Œ์Šค ํ…œํ”Œ๋ฆฟ์€ ๋งค๊ฐœ๋ณ€์ˆ˜๋ฅผ ์‚ฌ์šฉํ•œ ๋™์  URI ๊ตฌ์„ฑ์ด ๊ฐ€๋Šฅํ•˜๋ฉฐ, API ๋ฐ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ์œ ์šฉํ•ฉ๋‹ˆ๋‹ค.

    ํ…œํ”Œ๋ฆฟ ์ •์˜

    
    from mcp.server import Server
    
    from mcp.types import ResourceTemplate
    
    
    
    app = Server("template-server")
    
    
    
    @app.list_resource_templates()
    
    async def list_templates() -> list[ResourceTemplate]:
    
        """Return available resource templates."""
    
        return [
    
            ResourceTemplate(
    
                uriTemplate="db://users/{user_id}",
    
                name="User Profile",
    
                description="Fetch user profile by ID",
    
                mimeType="application/json"
    
            ),
    
            ResourceTemplate(
    
                uriTemplate="api://weather/{city}/{date}",
    
                name="Weather Data",
    
                description="Historical weather for city and date",
    
                mimeType="application/json"
    
            ),
    
            ResourceTemplate(
    
                uriTemplate="file://{path}",
    
                name="File Content",
    
                description="Read file at given path",
    
                mimeType="text/plain"
    
            )
    
        ]
    
    
    
    @app.read_resource()
    
    async def read_resource(uri: str) -> str:
    
        """Read resource, expanding template parameters."""
    
        
    
        # URI๋ฅผ ๊ตฌ๋ฌธ ๋ถ„์„ํ•˜์—ฌ ๋งค๊ฐœ๋ณ€์ˆ˜๋ฅผ ์ถ”์ถœํ•ฉ๋‹ˆ๋‹ค
    
        if uri.startswith("db://users/"):
    
            user_id = uri.split("/")[-1]
    
            return await fetch_user(user_id)
    
        
    
        elif uri.startswith("api://weather/"):
    
            parts = uri.replace("api://weather/", "").split("/")
    
            city, date = parts[0], parts[1]
    
            return await fetch_weather(city, date)
    
        
    
        elif uri.startswith("file://"):
    
            path = uri.replace("file://", "")
    
            return await read_file(path)
    
        
    
        raise ValueError(f"Unknown resource URI: {uri}")
    
    

    TypeScript ๊ตฌํ˜„

    
    import {
      ListResourceTemplatesRequestSchema,
      ReadResourceRequestSchema
    } from "@modelcontextprotocol/sdk/types.js";

    server.setRequestHandler(ListResourceTemplatesRequestSchema, async () => {
      return {
        resourceTemplates: [
          {
            uriTemplate: "github://repos/{owner}/{repo}/issues/{issue_number}",
            name: "GitHub Issue",
            description: "Fetch a specific GitHub issue",
            mimeType: "application/json"
          },
          {
            uriTemplate: "db://tables/{table}/rows/{id}",
            name: "Database Row",
            description: "Fetch a row from a database table",
            mimeType: "application/json"
          }
        ]
      };
    });

    server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
      const uri = request.params.uri;

      // Parse GitHub issue URI
      const githubMatch = uri.match(/^github:\/\/repos\/([^/]+)\/([^/]+)\/issues\/(\d+)$/);
      if (githubMatch) {
        const [, owner, repo, issueNumber] = githubMatch;
        const issue = await fetchGitHubIssue(owner, repo, parseInt(issueNumber, 10));
        return {
          contents: [{
            uri,
            mimeType: "application/json",
            text: JSON.stringify(issue, null, 2)
          }]
        };
      }

      // db://tables/{table}/rows/{id} URIs would be matched here in the same way

      throw new Error(`Unknown resource URI: ${uri}`);
    });
    
    

    ---

    4. Server Lifecycle Events

    Proper initialization and shutdown handling ensures clean resource management.

    Python Lifecycle Management

    
    import sys
    from contextlib import asynccontextmanager

    from mcp.server import Server

    # Shared state
    db_connection = None
    cache = None

    @asynccontextmanager
    async def lifespan(server: Server):
        """Manage server lifecycle."""
        global db_connection, cache

        # Startup
        # Log to stderr: under the stdio transport, stdout carries protocol messages
        print("🚀 Server starting...", file=sys.stderr)
        db_connection = await create_database_connection()
        cache = await create_cache_client()
        print("✅ Resources initialized", file=sys.stderr)

        try:
            yield  # Server runs here
        finally:
            # Shutdown: runs even if the server exits with an error
            print("🛑 Server shutting down...", file=sys.stderr)
            await db_connection.close()
            await cache.close()
            print("✅ Resources cleaned up", file=sys.stderr)

    app = Server("lifecycle-server", lifespan=lifespan)

    @app.tool()
    async def query_database(sql: str) -> str:
        """Use the shared database connection."""
        result = await db_connection.execute(sql)
        return str(result)
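The lifespan pattern can be tried without an MCP server. The following self-contained sketch uses a hypothetical `FakeConnection` class in place of real database and cache clients, and simulates a crash while serving to show that cleanup placed in a `finally` block still runs.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict, List


class FakeConnection:
    """Hypothetical stand-in for a database or cache client."""

    def __init__(self, name: str, events: List[str]) -> None:
        self.name = name
        self.events = events
        self.events.append(f"open:{name}")

    async def close(self) -> None:
        self.events.append(f"close:{self.name}")


@asynccontextmanager
async def lifespan(events: List[str]) -> AsyncIterator[Dict[str, FakeConnection]]:
    """Open resources on entry and close them in reverse order on exit."""
    db = FakeConnection("db", events)
    cache = FakeConnection("cache", events)
    try:
        yield {"db": db, "cache": cache}
    finally:
        # Runs on normal shutdown and when the body raises
        await cache.close()
        await db.close()


async def run_demo() -> List[str]:
    """Simulate a server that crashes while serving and record what happened."""
    events: List[str] = []
    try:
        async with lifespan(events):
            events.append("serving")
            raise RuntimeError("simulated crash while serving")
    except RuntimeError:
        events.append("crashed")
    return events


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

Running the demo records both `close:` events before `crashed`, so the connections are released even though the body raised. Without the `try`/`finally`, the code after `yield` would be skipped in that case.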
    
    

    TypeScript Lifecycle

    
    import { Server } from "@modelcontextprotocol/sdk/server/index.js";
    import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
    import { CallToolRequestSchema } from "@modelcontextprotocol/sdk/types.js";

    class ManagedServer {
      private server: Server;
      private dbConnection: DatabaseConnection | null = null;

      constructor() {
        this.server = new Server(
          { name: "lifecycle-server", version: "1.0.0" },
          // Declare the tools capability so the tools/call handler can be registered
          { capabilities: { tools: {} } }
        );

        this.setupHandlers();
      }

      async start() {
        // Initialize resources
        // Log to stderr: under the stdio transport, stdout carries protocol messages
        console.error("🚀 Server starting...");
        this.dbConnection = await createDatabaseConnection();
        console.error("✅ Database connected");

        // Start server (assumes the stdio transport; swap in another as needed)
        const transport = new StdioServerTransport();
        await this.server.connect(transport);
      }

      async stop() {
        // Cleanup resources
        console.error("🛑 Server shutting down...");
        if (this.dbConnection) {
          await this.dbConnection.close();
        }
        await this.server.close();
        console.error("✅ Cleanup complete");
      }

      private setupHandlers() {
        this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
          // Use this.dbConnection safely
          // ...
        });
      }
    }

    // Usage with graceful shutdown
    const server = new ManagedServer();

    process.on('SIGINT', async () => {
      await server.stop();
      process.exit(0);
    });

    await server.start();
    
    

    ---

    5. Logging Control

    MCP supports server-side logging levels that clients can control.

    Implementing Logging Levels

    
    from mcp.server import Server
    from mcp.types import LoggingLevel
    import logging

    app = Server("logging-server")

    # Map MCP levels to Python logging levels.
    # LoggingLevel is a string literal type covering eight syslog-style levels.
    # Python has no direct equivalent for notice, alert, or emergency, so those
    # map to the nearest built-in level.
    LEVEL_MAP: dict[LoggingLevel, int] = {
        "debug": logging.DEBUG,
        "info": logging.INFO,
        "notice": logging.INFO,
        "warning": logging.WARNING,
        "error": logging.ERROR,
        "critical": logging.CRITICAL,
        "alert": logging.CRITICAL,
        "emergency": logging.CRITICAL,
    }

    logger = logging.getLogger("mcp-server")

    @app.set_logging_level()
    async def set_logging_level(level: LoggingLevel) -> None:
        """Handle client request to change logging level."""
        python_level = LEVEL_MAP.get(level, logging.INFO)
        logger.setLevel(python_level)
        logger.info("Logging level set to %s", level)

    @app.tool()
    async def debug_operation(data: str) -> str:
        """Tool with various logging levels."""
        logger.debug(f"Processing data: {data}")

        try:
            result = process(data)
            logger.info(f"Successfully processed: {result}")
            return result
        except Exception as e:
            logger.error(f"Processing failed: {e}")
            raise

    Sending Log Messages to Clients

    
    @app.tool()
    async def complex_operation(input: str, ctx) -> str:
        """Operation that logs to client."""

        # Send log notification to client
        await ctx.send_log(
            level="info",
            message=f"Starting complex operation with input: {input}"
        )

        # Do work...
        result = await do_work(input)

        await ctx.send_log(
            level="debug",
            message=f"Operation complete, result size: {len(result)}"
        )

        return result
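The MCP specification defines eight log levels that follow syslog severities, and a client sets a minimum level with `logging/setLevel`. The server is then expected to send only messages at that level or more severe. A minimal sketch of that threshold check follows. `LevelFilter` is a hypothetical helper, not an SDK class, and its default of `"info"` is an assumption.

```python
# The eight MCP log levels, ordered from least to most severe
MCP_LEVELS = [
    "debug", "info", "notice", "warning",
    "error", "critical", "alert", "emergency",
]
SEVERITY = {name: rank for rank, name in enumerate(MCP_LEVELS)}


class LevelFilter:
    """Tracks the client's minimum level and decides which messages to forward."""

    def __init__(self, minimum: str = "info") -> None:
        self.set_level(minimum)

    def set_level(self, level: str) -> None:
        """Record the minimum level requested by the client."""
        if level not in SEVERITY:
            raise ValueError(f"Unknown log level: {level}")
        self.minimum = level

    def should_send(self, level: str) -> bool:
        """Return True if a message at this level meets the current minimum."""
        if level not in SEVERITY:
            raise ValueError(f"Unknown log level: {level}")
        return SEVERITY[level] >= SEVERITY[self.minimum]


if __name__ == "__main__":
    log_filter = LevelFilter("warning")
    print(log_filter.should_send("error"), log_filter.should_send("info"))
```

A tool could call `should_send` before emitting each log notification, which avoids building and sending messages the client has asked not to receive.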
    
    

    ---

    6. Error Handling Patterns

    Consistent error handling improves debugging and user experience.

    MCP Error Codes

    
    from mcp.shared.exceptions import McpError
    from mcp.types import ErrorData, INVALID_PARAMS, INVALID_REQUEST, INTERNAL_ERROR

    class ToolError(McpError):
        """Base class for tool errors."""
        def __init__(self, code: int, message: str):
            # McpError wraps an ErrorData payload (JSON-RPC code plus message)
            super().__init__(ErrorData(code=code, message=message))

    class ValidationError(ToolError):
        """Invalid input parameters."""
        def __init__(self, message: str):
            super().__init__(INVALID_PARAMS, message)

    class NotFoundError(ToolError):
        """Requested resource not found."""
        def __init__(self, resource: str):
            super().__init__(INVALID_REQUEST, f"Not found: {resource}")

    class PermissionError(ToolError):  # Note: shadows the built-in PermissionError
        """Access denied."""
        def __init__(self, action: str):
            super().__init__(INVALID_REQUEST, f"Permission denied: {action}")

    class InternalError(ToolError):
        """Internal server error."""
        def __init__(self, message: str):
            super().__init__(INTERNAL_ERROR, message)
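These error classes map onto the standard JSON-RPC 2.0 error codes. As a rough, SDK-independent sketch of what a client ultimately receives, the following builds the JSON-RPC error envelope by hand. `make_error_response` is a hypothetical helper introduced only for illustration.

```python
import json
from typing import Any, Dict, Optional, Union

# Standard JSON-RPC 2.0 error codes
PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERNAL_ERROR = -32603


def make_error_response(
    request_id: Union[int, str, None],
    code: int,
    message: str,
    data: Optional[Any] = None,
) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 error response for the given request id."""
    error: Dict[str, Any] = {"code": code, "message": message}
    if data is not None:
        error["data"] = data  # optional, application-defined detail
    return {"jsonrpc": "2.0", "id": request_id, "error": error}


if __name__ == "__main__":
    response = make_error_response(7, INVALID_PARAMS, "Input cannot be empty")
    print(json.dumps(response, indent=2))
```

Keeping to the standard codes lets generic JSON-RPC clients classify failures without knowing anything about a particular server.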
    
    

    Structured Error Responses

    
    @app.tool()
    async def safe_operation(input: str) -> str:
        """Tool with comprehensive error handling."""

        # Validate input
        if not input:
            raise ValidationError("Input cannot be empty")

        if len(input) > 10000:
            raise ValidationError(f"Input too large: {len(input)} chars (max 10000)")

        try:
            # Check permissions
            if not await check_permission(input):
                raise PermissionError(f"read {input}")

            # Perform operation
            result = await perform_operation(input)

            if result is None:
                raise NotFoundError(input)

            return result

        except ToolError:
            # Already a structured error; re-raise so the broad handler below
            # does not convert it into InternalError
            raise
        except ConnectionError as e:
            raise InternalError(f"Database connection failed: {e}") from e
        except TimeoutError as e:
            raise InternalError(f"Operation timed out: {e}") from e
        except Exception as e:
            # Log unexpected errors
            logger.exception("Unexpected error in safe_operation")
            raise InternalError(f"Unexpected error: {type(e).__name__}") from e
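When many tools need the same treatment, the conversion of unexpected exceptions can be factored out. The sketch below is one possible approach, using a hypothetical `wrap_errors` decorator and a simplified stand-in `ToolError` class so that it runs without the SDK. It lets structured errors pass through unchanged and converts everything else into an internal error.

```python
import asyncio
import functools
import logging

logger = logging.getLogger("mcp-server")

INVALID_PARAMS = -32602   # JSON-RPC 2.0 "Invalid params"
INTERNAL_ERROR = -32603   # JSON-RPC 2.0 "Internal error"


class ToolError(Exception):
    """Simplified stand-in for a structured tool error (assumption, not the SDK class)."""

    def __init__(self, code: int, message: str) -> None:
        super().__init__(message)
        self.code = code
        self.message = message


def wrap_errors(func):
    """Let ToolError pass through; convert anything else into an internal error."""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except ToolError:
            raise
        except Exception as exc:
            logger.exception("Unexpected error in %s", func.__name__)
            # Expose only the exception type, not its message, to the client
            raise ToolError(INTERNAL_ERROR, f"Unexpected error: {type(exc).__name__}") from exc

    return wrapper


@wrap_errors
async def parse_number(text: str) -> int:
    """Hypothetical tool body used to exercise the decorator."""
    if not text:
        raise ToolError(INVALID_PARAMS, "text must not be empty")
    return int(text)  # raises ValueError for non-numeric input


if __name__ == "__main__":
    print(asyncio.run(parse_number("7")))
```

Reporting only the exception type keeps internal details such as file paths or SQL fragments out of client-visible messages, while the full traceback still reaches the server log.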
    
    

    Error Handling in TypeScript

    
    import {
      CallToolRequestSchema,
      ErrorCode,
      McpError
    } from "@modelcontextprotocol/sdk/types.js";

    function validateInput(data: unknown): asserts data is ValidInput {
      if (typeof data !== "object" || data === null) {
        throw new McpError(
          ErrorCode.InvalidParams,
          "Input must be an object"
        );
      }
      // More validation...
    }

    server.setRequestHandler(CallToolRequestSchema, async (request) => {
      try {
        validateInput(request.params.arguments);

        const result = await performOperation(request.params.arguments);

        return {
          content: [{ type: "text", text: JSON.stringify(result) }]
        };

      } catch (error) {
        if (error instanceof McpError) {
          throw error;  // Already an MCP error
        }

        // Convert other errors
        if (error instanceof NotFoundError) {
          throw new McpError(ErrorCode.InvalidRequest, error.message);
        }

        // Unknown error
        console.error("Unexpected error:", error);
        throw new McpError(
          ErrorCode.InternalError,
          "An unexpected error occurred"
        );
      }
    });
    
    

    ---

    Experimental Features (MCP 2025-11-25)

    These features are marked as experimental in the specification:

    Tasks (Long-Running Operations)

    
    # Tasks allow tracking long-running operations with state.
    # Illustrative only: the decorator and ctx.report_status() shown here may
    # differ from the SDK's experimental task API.
    @app.task()
    async def training_task(model_id: str, data_path: str, ctx) -> str:
        """Long-running ML training task."""

        # Report task started
        await ctx.report_status("running", "Initializing training...")

        # Training loop
        for epoch in range(100):
            await train_epoch(model_id, data_path, epoch)
            await ctx.report_status(
                "running",
                f"Training epoch {epoch + 1}/100",
                progress=epoch + 1,
                total=100
            )

        await ctx.report_status("completed", "Training finished")
        return f"Model {model_id} trained successfully"

    Tool Annotations

    
    # Annotations provide metadata about tool behavior
    @app.tool(
        annotations={
            "readOnlyHint": True,       # Does not modify data
            "destructiveHint": False,   # Performs no destructive updates
            "idempotentHint": True,     # Safe to retry
            # The two keys below are not defined by the MCP specification;
            # treat them as custom metadata that clients may ignore
            "timeout_seconds": 30,      # Expected max duration
            "requires_approval": False  # No user approval needed
        }
    )
    async def safe_query(query: str) -> str:
        """A read-only database query tool."""
        return await execute_read_query(query)
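Annotations are hints, and each client decides how to act on them. The sketch below shows one possible client-side policy. It uses the hint names and defaults from the MCP `ToolAnnotations` schema (`readOnlyHint` defaults to false, `destructiveHint` defaults to true), while the `needs_confirmation` policy itself is a hypothetical example rather than anything the specification mandates.

```python
from typing import Any, Mapping


def needs_confirmation(annotations: Mapping[str, Any]) -> bool:
    """Hypothetical policy: ask the user before running a possibly destructive tool."""
    # Spec defaults: a tool is assumed not read-only and potentially destructive
    read_only = annotations.get("readOnlyHint", False)
    destructive = annotations.get("destructiveHint", True)
    if read_only:
        # destructiveHint is only meaningful when the tool is not read-only
        return False
    return bool(destructive)


if __name__ == "__main__":
    print(needs_confirmation({"readOnlyHint": True}))
    print(needs_confirmation({}))
```

Because the defaults are conservative, a tool with no annotations at all is treated as potentially destructive. A real client would also weigh whether the server supplying the hints is trusted.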
    
    

    ---

    What's Next

  • Module 8 - Best Practices
  • 5.14 - Context Engineering
  • MCP Specification Changelog

    ---

    Additional Resources

  • MCP Specification 2025-11-25
  • JSON-RPC 2.0 Error Codes
  • Python SDK Examples
  • TypeScript SDK Examples

    ---

    Disclaimer:

    This document has been translated using the AI translation service Co-op Translator. While we strive for accuracy, please be aware that automated translations may contain errors or inaccuracies. The original document in its native language should be considered the authoritative source. For critical information, professional human translation is recommended. We are not liable for any misunderstandings or misinterpretations arising from the use of this translation.

    MCP Academy — microsoft/mcp-for-beginners