This chapter provides a comprehensive guide to implementing secure, scalable, and real-time streaming with the Model Context Protocol (MCP) using HTTPS.
It covers the motivation for streaming, the available transport mechanisms, how to implement streamable HTTP in MCP, security best practices, migration from SSE, and practical guidance for building your own streaming MCP applications.
This section explores the different transport mechanisms available in MCP and their role in enabling streaming capabilities for real-time communication between clients and servers.
A transport mechanism defines how data is exchanged between the client and server. MCP supports multiple transport types to suit different environments and requirements:
Have a look at the comparison table below to understand the differences between these transport mechanisms:
| Transport | Real-time Updates | Streaming | Scalability | Use Case |
|-------------------|------------------|-----------|-------------|-------------------------|
| stdio | No | No | Low | Local CLI tools |
| SSE | Yes | Yes | Medium | Web, real-time updates |
| Streamable HTTP | Yes | Yes | High | Cloud, multi-client |
> Tip: Choosing the right transport impacts performance, scalability, and user experience. Streamable HTTP is recommended for modern, scalable, and cloud-ready applications.
Note the transports stdio and SSE that you were shown in the previous chapters and how streamable HTTP is the transport covered in this chapter.
Understanding the fundamental concepts and motivations behind streaming is essential for implementing effective real-time communication systems.
Streaming is a technique in network programming that allows data to be sent and received in small, manageable chunks or as a sequence of events, rather than waiting for an entire response to be ready. This is especially useful for:
Here's what you need to know about streaming at high level:
The reasons for using streaming are the following:
Here's a simple example of how streaming can be implemented:
Server (Python, using FastAPI and StreamingResponse):
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import time
app = FastAPI()
async def event_stream():
for i in range(1, 6):
yield f"data: Message {i}\n\n"
time.sleep(1)
@app.get("/stream")
def stream():
return StreamingResponse(event_stream(), media_type="text/event-stream")
Client (Python, using requests):
import requests
with requests.get("http://localhost:8000/stream", stream=True) as r:
for line in r.iter_lines():
if line:
print(line.decode())
This example demonstrates a server sending a series of messages to the client as they become available, rather than waiting for all messages to be ready.
How it works:
Requirements:
StreamingResponse in FastAPI).stream=True in requests).text/event-stream or application/octet-stream.Server (Java, using Spring Boot and Server-Sent Events):
@RestController
public class CalculatorController {
@GetMapping(value = "/calculate", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> calculate(@RequestParam double a,
@RequestParam double b,
@RequestParam String op) {
double result;
switch (op) {
case "add": result = a + b; break;
case "sub": result = a - b; break;
case "mul": result = a * b; break;
case "div": result = b != 0 ? a / b : Double.NaN; break;
default: result = Double.NaN;
}
return Flux.<ServerSentEvent<String>>just(
ServerSentEvent.<String>builder()
.event("info")
.data("Calculating: " + a + " " + op + " " + b)
.build(),
ServerSentEvent.<String>builder()
.event("result")
.data(String.valueOf(result))
.build()
)
.delayElements(Duration.ofSeconds(1));
}
}
Client (Java, using Spring WebFlux WebClient):
@SpringBootApplication
public class CalculatorClientApplication implements CommandLineRunner {
private final WebClient client = WebClient.builder()
.baseUrl("http://localhost:8080")
.build();
@Override
public void run(String... args) {
client.get()
.uri(uriBuilder -> uriBuilder
.path("/calculate")
.queryParam("a", 7)
.queryParam("b", 5)
.queryParam("op", "mul")
.build())
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class)
.doOnNext(System.out::println)
.blockLast();
}
}
Java Implementation Notes:
Flux for streamingServerSentEvent provides structured event streaming with event typesWebClient with bodyToFlux() enables reactive streaming consumptiondelayElements() simulates processing time between eventsinfo, result) for better client handlingThe differences between how streaming works in a "classical" manner versus how it works in MCP can be depicted like so:
| Feature | Classic HTTP Streaming | MCP Streaming (Notifications) |
|------------------------|-------------------------------|-------------------------------------|
| Main response | Chunked | Single, at end |
| Progress updates | Sent as data chunks | Sent as notifications |
| Client requirements | Must process stream | Must implement message handler |
| Use case | Large files, AI token streams | Progress, logs, real-time feedback |
Additionally, here are some key differences:
- Classic HTTP streaming: Uses simple chunked transfer encoding to send data in chunks
- MCP streaming: Uses a structured notification system with JSON-RPC protocol
- Classic HTTP: Plain text chunks with newlines
- MCP: Structured LoggingMessageNotification objects with metadata
- Classic HTTP: Simple client that processes streaming responses
- MCP: More sophisticated client with a message handler to process different types of messages
- Classic HTTP: The progress is part of the main response stream
- MCP: Progress is sent via separate notification messages while the main response comes at the end
There are some things we recommend when it comes to choosing between implementing classical streaming (as an endpoint we showed you above using /stream) versus choosing streaming via MCP.
Ok, so you've seen some recommendations and comparisons so far on the difference between classical streaming and streaming in MCP. Let's get into detail exactly how you can leverage streaming in MCP.
Understanding how streaming works within the MCP framework is essential for building responsive applications that provide real-time feedback to users during long-running operations.
In MCP, streaming is not about sending the main response in chunks, but about sending notifications to the client while a tool is processing a request. These notifications can include progress updates, logs, or other events.
The main result is still sent as a single response. However, notifications can be sent as separate messages during processing and thereby update the client in real time. The client must be able to handle and display these notifications.
We said "Notification", what does that mean in the context of MCP?
A notification is a message sent from the server to the client to inform about progress, status, or other events during a long-running operation. Notifications improve transparency and user experience.
For example, a client is supposed to send a notification once the initial handshake with the server has been made.
A notification looks like so as a JSON message:
{
jsonrpc: "2.0";
method: string;
params?: {
[key: string]: unknown;
};
}
Notifications belongs to a topic in MCP referred to as "Logging".
To get logging to work, the server needs to enable it as feature/capability like so:
{
"capabilities": {
"logging": {}
}
}
> [!NOTE]
> Depending on the SDK used, logging might be enabled by default, or you might need to explicitly enable it in your server configuration.
There different types of notifications:
| Level | Description | Example Use Case |
|-----------|-------------------------------|---------------------------------|
| debug | Detailed debugging information | Function entry/exit points |
| info | General informational messages | Operation progress updates |
| notice | Normal but significant events | Configuration changes |
| warning | Warning conditions | Deprecated feature usage |
| error | Error conditions | Operation failures |
| critical | Critical conditions | System component failures |
| alert | Action must be taken immediately | Data corruption detected |
| emergency | System is unusable | Complete system failure |
To implement notifications in MCP, you need to set up both the server and client sides to handle real-time updates. This allows your application to provide immediate feedback to users during long-running operations.
Let's start with the server side.
In MCP, you define tools that can send notifications while processing requests.
The server uses the context object (usually ctx) to send messages to the client.
@mcp.tool(description="A tool that sends progress notifications")
async def process_files(message: str, ctx: Context) -> TextContent:
await ctx.info("Processing file 1/3...")
await ctx.info("Processing file 2/3...")
await ctx.info("Processing file 3/3...")
return TextContent(type="text", text=f"Done: {message}")
In the preceding example, the process_files tool sends three notifications to the client as it processes each file.
The ctx.info() method is used to send informational messages.
Additionally, to enable notifications, ensure your server uses a streaming transport (like streamable-http) and your client implements a message handler to process notifications.
Here's how you can set up the server to use the streamable-http transport:
mcp.run(transport="streamable-http")
[Tool("A tool that sends progress notifications")]
public async Task<TextContent> ProcessFiles(string message, ToolContext ctx)
{
await ctx.Info("Processing file 1/3...");
await ctx.Info("Processing file 2/3...");
await ctx.Info("Processing file 3/3...");
return new TextContent
{
Type = "text",
Text = $"Done: {message}"
};
}
In this .NET example, the ProcessFiles tool is decorated with the Tool attribute and sends three notifications to the client as it processes each file.
The ctx.Info() method is used to send informational messages.
To enable notifications in your .NET MCP server, ensure you're using a streaming transport:
var builder = McpBuilder.Create();
await builder
.UseStreamableHttp() // Enable streamable HTTP transport
.Build()
.RunAsync();
The client must implement a message handler to process and display notifications as they arrive.
async def message_handler(message):
if isinstance(message, types.ServerNotification):
print("NOTIFICATION:", message)
else:
print("SERVER MESSAGE:", message)
async with ClientSession(
read_stream,
write_stream,
logging_callback=logging_collector,
message_handler=message_handler,
) as session:
In the preceding code, the message_handler function checks if the incoming message is a notification.
If it is, it prints the notification; otherwise, it processes it as a regular server message.
Also note how the ClientSession is initialized with the message_handler to handle incoming notifications.
// Define a message handler
void MessageHandler(IJsonRpcMessage message)
{
if (message is ServerNotification notification)
{
Console.WriteLine($"NOTIFICATION: {notification}");
}
else
{
Console.WriteLine($"SERVER MESSAGE: {message}");
}
}
// Create and use a client session with the message handler
var clientOptions = new ClientSessionOptions
{
MessageHandler = MessageHandler,
LoggingCallback = (level, message) => Console.WriteLine($"[{level}] {message}")
};
using var client = new ClientSession(readStream, writeStream, clientOptions);
await client.InitializeAsync();
// Now the client will process notifications through the MessageHandler
In this .NET example, the MessageHandler function checks if the incoming message is a notification.
If it is, it prints the notification; otherwise, it processes it as a regular server message.
The ClientSession is initialized with the message handler via the ClientSessionOptions.
To enable notifications, ensure your server uses a streaming transport (like streamable-http) and your client implements a message handler to process notifications.
This section explains the concept of progress notifications in MCP, why they matter, and how to implement them using Streamable HTTP. You'll also find a practical assignment to reinforce your understanding.
Progress notifications are real-time messages sent from the server to the client during long-running operations.
Instead of waiting for the entire process to finish, the server keeps the client updated about the current status.
This improves transparency, user experience, and makes debugging easier.
Example:
"Processing document 1/10"
"Processing document 2/10"
...
"Processing complete!"
Progress notifications are essential for several reasons:
Here's how you can implement progress notifications in MCP:
ctx.info() or ctx.log() to send notifications as each item is processed. This sends a message to the client before the main result is ready.Server Example:
@mcp.tool(description="A tool that sends progress notifications")
async def process_files(message: str, ctx: Context) -> TextContent:
for i in range(1, 11):
await ctx.info(f"Processing document {i}/10")
await ctx.info("Processing complete!")
return TextContent(type="text", text=f"Done: {message}")
Client Example:
async def message_handler(message):
if isinstance(message, types.ServerNotification):
print("NOTIFICATION:", message)
else:
print("SERVER MESSAGE:", message)
When implementing MCP servers with HTTP-based transports, security becomes a paramount concern that requires careful attention to multiple attack vectors and protection mechanisms.
Security is critical when exposing MCP servers over HTTP. Streamable HTTP introduces new attack surfaces and requires careful configuration.
Origin header to prevent DNS rebinding attacks.localhost to avoid exposing them to the public internet.For applications currently using Server-Sent Events (SSE), migrating to Streamable HTTP provides enhanced capabilities and better long-term sustainability for your MCP implementations.
There are two compelling reasons to upgrade from SSE to Streamable HTTP:
Here's how you can migrate from SSE to Streamable HTTP in your MCP applications:
transport="streamable-http" in mcp.run().streamablehttp_client instead of SSE client.It's recommended to maintain compatibility with existing SSE clients during the migration process. Here are some strategies:
Ensure you address the following challenges during migration:
Security should be a top priority when implementing any server, especially when using HTTP-based transports like Streamable HTTP in MCP.
When implementing MCP servers with HTTP-based transports, security becomes a paramount concern that requires careful attention to multiple attack vectors and protection mechanisms.
Security is critical when exposing MCP servers over HTTP. Streamable HTTP introduces new attack surfaces and requires careful configuration.
Here are some key security considerations:
Origin header to prevent DNS rebinding attacks.localhost to avoid exposing them to the public internet.Additionally, here are some best practices to follow when implementing security in your MCP streaming server:
You will face some challenges when implementing security in MCP streaming servers:
Scenario:
Build an MCP server and client where the server processes a list of items (e.g., files or documents) and sends a notification for each item processed. The client should display each notification as it arrives.
Steps:
1. Implement a server tool that processes a list and sends notifications for each item.
2. Implement a client with a message handler to display notifications in real time.
3. Test your implementation by running both server and client, and observe the notifications.
To continue your journey with MCP streaming and expand your knowledge, this section provides additional resources and suggested next steps for building more advanced applications.
이 장에서는 HTTPS를 사용하여 모델 컨텍스트 프로토콜(MCP)로 보안성, 확장성 및 실시간 스트리밍을 구현하는 포괄적인 가이드를 제공합니다. 스트리밍의 동기, 사용 가능한 전송 메커니즘, MCP에서 스트림 가능한 HTTP를 구현하는 방법, 보안 모범 사례, SSE에서의 마이그레이션, 그리고 스트리밍 MCP 애플리케이션을 구축하는 실용적인 지침을 다룹니다.
이 섹션에서는 MCP에서 사용할 수 있는 다양한 전송 메커니즘과 이들이 클라이언트와 서버 간 실시간 통신을 가능하게 하는 스트리밍 기능에서 어떤 역할을 하는지 살펴봅니다.
전송 메커니즘은 클라이언트와 서버 간 데이터가 교환되는 방식을 정의합니다. MCP는 다양한 환경과 요구 사항에 맞춰 여러 전송 유형을 지원합니다:
아래 비교 표를 보면 이러한 전송 메커니즘 간의 차이점을 이해할 수 있습니다:
| 전송 타입 | 실시간 업데이트 | 스트리밍 | 확장성 | 사용 사례 |
|-------------------|----------------|---------|------------|---------------------------|
| stdio | 아니오 | 아니오 | 낮음 | 로컬 CLI 도구 |
| SSE | 예 | 예 | 중간 | 웹, 실시간 업데이트 |
| Streamable HTTP | 예 | 예 | 높음 | 클라우드, 다중 클라이언트 |
> 팁: 적절한 전송 방식을 선택하는 것은 성능, 확장성, 사용자 경험에 중요한 영향을 미칩니다. Streamable HTTP는 최신의 확장 가능하고 클라우드 대응 애플리케이션에 권장됩니다.
이전 장에서 소개한 stdio 및 SSE 전송과 달리, 이 장에서는 Streamable HTTP 전송에 대해 다룹니다.
스트리밍의 기본 개념과 동기를 이해하는 것은 효과적인 실시간 통신 시스템을 구현하는 데 필수적입니다.
스트리밍은 네트워크 프로그래밍에서 전체 응답이 준비될 때까지 기다리지 않고 데이터가 작고 관리 가능한 청크로 또는 이벤트 시퀀스로 전송되고 수신될 수 있게 하는 기술입니다. 특히 다음과 같은 경우에 유용합니다:
스트리밍에 대해 알아야 할 주요 사항은 다음과 같습니다:
스트리밍 사용 이유는 다음과 같습니다:
다음은 스트리밍이 어떻게 구현될 수 있는지에 대한 간단한 예입니다:
서버 (Python, FastAPI 및 StreamingResponse 사용):
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import time
app = FastAPI()
async def event_stream():
for i in range(1, 6):
yield f"data: Message {i}\n\n"
time.sleep(1)
@app.get("/stream")
def stream():
return StreamingResponse(event_stream(), media_type="text/event-stream")
클라이언트 (Python, requests 사용):
import requests
with requests.get("http://localhost:8000/stream", stream=True) as r:
for line in r.iter_lines():
if line:
print(line.decode())
이 예는 모든 메시지가 준비될 때까지 기다리지 않고, 서버가 준비되는 대로 일련의 메시지를 클라이언트에 전송하는 방식을 보여줍니다.
작동 방식:
요구 사항:
StreamingResponse)을 사용해야 합니다.requests의 stream=True).text/event-stream 또는 application/octet-stream입니다.서버 (Java, Spring Boot 및 Server-Sent Events 사용):
@RestController
public class CalculatorController {
@GetMapping(value = "/calculate", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> calculate(@RequestParam double a,
@RequestParam double b,
@RequestParam String op) {
double result;
switch (op) {
case "add": result = a + b; break;
case "sub": result = a - b; break;
case "mul": result = a * b; break;
case "div": result = b != 0 ? a / b : Double.NaN; break;
default: result = Double.NaN;
}
return Flux.<ServerSentEvent<String>>just(
ServerSentEvent.<String>builder()
.event("info")
.data("Calculating: " + a + " " + op + " " + b)
.build(),
ServerSentEvent.<String>builder()
.event("result")
.data(String.valueOf(result))
.build()
)
.delayElements(Duration.ofSeconds(1));
}
}
클라이언트 (Java, Spring WebFlux WebClient 사용):
@SpringBootApplication
public class CalculatorClientApplication implements CommandLineRunner {
private final WebClient client = WebClient.builder()
.baseUrl("http://localhost:8080")
.build();
@Override
public void run(String... args) {
client.get()
.uri(uriBuilder -> uriBuilder
.path("/calculate")
.queryParam("a", 7)
.queryParam("b", 5)
.queryParam("op", "mul")
.build())
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class)
.doOnNext(System.out::println)
.blockLast();
}
}
Java 구현 주석:
Flux)을 활용하여 스트리밍ServerSentEvent 는 이벤트 타입을 갖는 구조화된 이벤트 스트리밍 제공WebClient의 bodyToFlux()를 통해 반응형 스트리밍 소비 가능delayElements()로 이벤트 간 처리 시간 시뮬레이션info 및 result 타입 부여하여 클라이언트 처리 개선클래식 방식의 스트리밍과 MCP 스트리밍이 어떻게 다른지 다음 표에서 확인할 수 있습니다:
| 특징 | 클래식 HTTP 스트리밍 | MCP 스트리밍 (알림) |
|------------------------|-------------------------------|-------------------------------------|
| 주요 응답 | 청크 단위로 전송 | 단일 응답, 끝에 전달 |
| 진행 상황 업데이트 | 데이터 청크로 전송 | 알림으로 전송 |
| 클라이언트 요구 사항 | 스트림 처리 필요 | 메시지 핸들러 구현 필요 |
| 사용 사례 | 대용량 파일, AI 토큰 스트림 | 진행 상황, 로그, 실시간 피드백 |
추가로 주요 차이점은 다음과 같습니다:
- 클래식 HTTP 스트리밍: 간단한 청크 전송 인코딩 사용
- MCP 스트리밍: JSON-RPC 프로토콜 기반 구조화된 알림 시스템 사용
- 클래식 HTTP: 줄바꿈 포함한 텍스트 청크
- MCP: 메타데이터가 포함된 구조화된 LoggingMessageNotification 객체
- 클래식 HTTP: 스트리밍 응답을 처리하는 단순 클라이언트
- MCP: 메시지 핸들러를 구현하여 다양한 메시지 유형 처리
- 클래식 HTTP: 진행 상황이 주요 응답 스트림의 일부
- MCP: 주요 응답은 끝에 전송되고 진행 상황은 별도 알림 메시지로 전송
클래식 스트리밍(/stream 엔드포인트) 구현과 MCP 스트리밍 구현 간 선택 시 다음 사항들을 권장합니다.
이제까지 클래식 스트리밍과 MCP 스트리밍 차이에 대해 권장 사항과 비교를 보셨습니다. 이제 MCP에서 스트리밍을 어떻게 활용할 수 있는지 자세히 살펴봅시다.
MCP 프레임워크 내 스트리밍이 어떻게 작동하는지 이해하는 것은 장시간 실행 작업 중 사용자에게 실시간 피드백을 제공하는 반응형 애플리케이션을 구축하는 데 중요합니다.
MCP에서 스트리밍은 주요 응답을 청크로 나누어 보내는 것이 아니라, 툴이 요청을 처리하는 동안 알림(notification) 을 클라이언트로 전송하는 것입니다. 이 알림에는 진행 상황 업데이트, 로그, 기타 이벤트가 포함될 수 있습니다.
주요 결과는 여전히 단일 응답으로 전송됩니다. 그러나 처리 중에 알림이 별도의 메시지로 전송되어 클라이언트에 실시간으로 업데이트할 수 있습니다. 클라이언트는 이 알림을 수신하고 표시할 수 있어야 합니다.
"알림"이란 무엇인가라고 했는데, MCP 맥락에서 이를 정의하면?
알림은 서버에서 클라이언트로 보내는 메시지로서, 장시간 실행 작업 중 진행 상황, 상태 또는 기타 이벤트를 알리기 위해 전송됩니다. 알림은 투명성과 사용자 경험을 개선합니다.
예를 들어, 클라이언트는 서버와의 초기 핸드셰이크가 완료되면 알림을 보내야 합니다.
알림은 JSON 메시지 형태로 다음과 같습니다:
{
jsonrpc: "2.0";
method: string;
params?: {
[key: string]: unknown;
};
}
알림은 MCP 내 "Logging" 토픽에 속합니다.
로깅 기능을 작동시키려면 서버는 다음과 같이 기능/역량으로 활성화해야 합니다:
{
"capabilities": {
"logging": {}
}
}
> [!NOTE]
> 사용 중인 SDK에 따라 로깅이 기본적으로 활성화되어 있거나 서버 설정에서 명시적으로 활성화해야 할 수 있습니다.
알림에는 여러 유형이 있습니다:
| 레벨 | 설명 | 예제 사용 사례 |
|-----------|-------------------------------|--------------------------------|
| debug | 상세 디버깅 정보 | 함수 진입/종료 지점 |
| info | 일반 정보 메시지 | 작업 진행 상황 업데이트 |
| notice | 일반적이지만 중요한 이벤트 | 설정 변경 |
| warning | 경고 상황 | 사용 중단된 기능 사용 |
| error | 오류 상황 | 작업 실패 |
| critical | 치명적인 상황 | 시스템 구성 요소 실패 |
| alert | 즉각적 조치 필요 | 데이터 손상 탐지 |
| emergency | 시스템 사용 불가 | 전체 시스템 장애 |
MCP에서 알림을 구현하려면 서버와 클라이언트 양쪽에서 실시간 업데이트를 처리하도록 설정해야 합니다. 이를 통해 장시간 실행 작업 중 즉각적인 피드백을 사용자에게 제공할 수 있습니다.
먼저 서버 측부터 시작합시다. MCP에서는 요청 처리 중 알림을 보낼 수 있는 툴을 정의합니다. 서버는 컨텍스트 객체(보통 ctx)를 사용하여 클라이언트에 메시지를 전송합니다.
@mcp.tool(description="A tool that sends progress notifications")
async def process_files(message: str, ctx: Context) -> TextContent:
await ctx.info("Processing file 1/3...")
await ctx.info("Processing file 2/3...")
await ctx.info("Processing file 3/3...")
return TextContent(type="text", text=f"Done: {message}")
위 예제에서 process_files 툴은 각 파일을 처리할 때 클라이언트에 세 번의 알림을 보냅니다. ctx.info() 메서드는 정보 메시지 전송에 사용됩니다.
또한 알림을 활성화하려면 서버가 스트리밍 전송(streamable-http 등)을 사용하고 클라이언트가 알림을 처리할 메시지 핸들러를 구현해야 합니다.
서버에서 streamable-http 전송을 사용하는 방법은 다음과 같습니다:
mcp.run(transport="streamable-http")
[Tool("A tool that sends progress notifications")]
public async Task<TextContent> ProcessFiles(string message, ToolContext ctx)
{
await ctx.Info("Processing file 1/3...");
await ctx.Info("Processing file 2/3...");
await ctx.Info("Processing file 3/3...");
return new TextContent
{
Type = "text",
Text = $"Done: {message}"
};
}
이 .NET 예제에서 ProcessFiles 툴은 Tool 어트리뷰트가 적용되어 있으며 각 파일 처리 시 클라이언트에 세 번 알림을 보냅니다. ctx.Info() 메서드는 정보 메시지 전송에 이용됩니다.
.NET MCP 서버에서 알림을 활성화하려면 스트리밍 전송이 사용되고 있는지 확인하세요:
var builder = McpBuilder.Create();
await builder
.UseStreamableHttp() // Enable streamable HTTP transport
.Build()
.RunAsync();
클라이언트는 메시지 핸들러를 구현해야 하며, 도착하는 알림을 처리하고 표시할 수 있어야 합니다.
async def message_handler(message):
if isinstance(message, types.ServerNotification):
print("NOTIFICATION:", message)
else:
print("SERVER MESSAGE:", message)
async with ClientSession(
read_stream,
write_stream,
logging_callback=logging_collector,
message_handler=message_handler,
) as session:
위 코드에서 message_handler 함수는 들어오는 메시지가 알림인지 확인합니다.
알림이면 출력하며, 아니면 일반 서버 메시지로 처리합니다.
또한 ClientSession은 도착하는 알림을 처리하기 위해 message_handler로 초기화됩니다.
// Define a message handler
void MessageHandler(IJsonRpcMessage message)
{
if (message is ServerNotification notification)
{
Console.WriteLine($"NOTIFICATION: {notification}");
}
else
{
Console.WriteLine($"SERVER MESSAGE: {message}");
}
}
// Create and use a client session with the message handler
var clientOptions = new ClientSessionOptions
{
MessageHandler = MessageHandler,
LoggingCallback = (level, message) => Console.WriteLine($"[{level}] {message}")
};
using var client = new ClientSession(readStream, writeStream, clientOptions);
await client.InitializeAsync();
// Now the client will process notifications through the MessageHandler
이 .NET 예제에서 MessageHandler 함수는 들어오는 메시지가 알림인지 체크합니다.
알림이면 출력하고, 아니면 일반 서버 메시지로 처리합니다. ClientSession은 ClientSessionOptions를 통해 메시지 핸들러로 초기화됩니다.
알림 활성화를 위해 서버가 스트리밍 전송(streamable-http 등)을 사용하고 클라이언트가 알림 메시지 핸들러를 구현하는지 확인하세요.
이 섹션에서는 MCP에서 진행 알림의 개념과 중요성, 그리고 Streamable HTTP를 통해 이를 구현하는 방법을 설명합니다. 또한 이해를 돕기 위한 실용적인 과제도 포함되어 있습니다.
진행 알림(progress notifications)은 장시간 작업 중 서버에서 클라이언트로 실시간으로 보내는 메시지입니다. 전체 프로세스가 끝날 때까지 기다리지 않고 서버가 현재 상태를 계속해서 알립니다. 이는 투명성, 사용자 경험을 높이고 디버깅도 용이하게 합니다.
예시:
"Processing document 1/10"
"Processing document 2/10"
...
"Processing complete!"
진행 알림이 중요한 이유는 다음과 같습니다:
MCP에서 진행 알림을 구현하는 방법은 다음과 같습니다:
ctx.info() 또는 ctx.log()를 사용해 각 아이템 처리 시 알림을 전송. 이는 주요 결과가 준비되기 전에 클라이언트에 메시지를 보냅니다.서버 예제:
@mcp.tool(description="A tool that sends progress notifications")
async def process_files(message: str, ctx: Context) -> TextContent:
for i in range(1, 11):
await ctx.info(f"Processing document {i}/10")
await ctx.info("Processing complete!")
return TextContent(type="text", text=f"Done: {message}")
클라이언트 예제:
async def message_handler(message):
if isinstance(message, types.ServerNotification):
print("NOTIFICATION:", message)
else:
print("SERVER MESSAGE:", message)
HTTP 기반 전송을 사용하는 MCP 서버를 구현할 때 보안은 다양한 공격 지점과 보호 메커니즘에 세심한 주의가 필요한 매우 중요한 문제입니다.
MCP 서버를 HTTP로 노출할 때 보안은 필수적입니다. Streamable HTTP는 새로운 공격 표면을 도입하여 신중한 구성이 요구됩니다.
Origin 헤더를 검증하세요.localhost에 바인딩하여 공개 인터넷 노출을 방지하세요.현재 Server-Sent Events(SSE)를 사용하는 애플리케이션의 경우, Streamable HTTP로 마이그레이션하면 향상된 기능과 MCP 구현의 장기적 유지 가능성을 제공합니다.
SSE에서 Streamable HTTP로 업그레이드해야 하는 두 가지 설득력 있는 이유가 있습니다:
MCP 애플리케이션에서 SSE에서 Streamable HTTP로 마이그레이션하는 방법은 다음과 같습니다:
mcp.run()에서 transport="streamable-http"로 업데이트합니다.streamablehttp_client를 사용하도록 업데이트합니다.마이그레이션 과정에서 기존 SSE 클라이언트와의 호환성을 유지하는 것이 권장됩니다. 다음과 같은 전략이 있습니다:
마이그레이션 시 다음 과제를 반드시 해결해야 합니다:
어떤 서버를 구현하든, 특히 MCP의 Streamable HTTP 같은 HTTP 기반 전송을 사용할 때 보안은 최우선 과제여야 합니다.
HTTP 기반 전송을 사용하는 MCP 서버를 구현할 때는 여러 공격 벡터와 보호 메커니즘에 세심한 주의를 기울여야 합니다.
MCP 서버를 HTTP를 통해 노출할 때 보안은 매우 중요합니다. Streamable HTTP는 새로운 공격 지점을 제공하며 신중한 구성과 관리가 필요합니다.
주요 보안 고려사항은 다음과 같습니다:
Origin 헤더를 검증합니다.localhost에 바인딩하여 공개 인터넷에 노출되지 않도록 합니다.MCP 스트리밍 서버에서 보안을 구현할 때 따를 모범 사례는 다음과 같습니다:
MCP 스트리밍 서버 보안 구현 중에는 다음과 같은 과제를 직면합니다:
시나리오:
서버가 아이템 목록(예: 파일 또는 문서)을 처리하고 처리된 각 아이템에 대해 알림을 전송하는 MCP 서버와 클라이언트를 만드세요. 클라이언트는 도착하는 각 알림을 실시간으로 표시해야 합니다.
단계:
1. 목록을 처리하고 각 항목에 대해 알림을 보내는 서버 도구를 구현하세요.
2. 알림을 실시간으로 표시할 메시지 핸들러가 있는 클라이언트를 구현하세요.
3. 서버와 클라이언트를 함께 실행하여 구현을 테스트하고 알림이 잘 표시되는지 확인하세요.
MCP 스트리밍을 계속 사용하고 지식을 확장하기 위해 이 섹션에서는 추가 리소스와 더 고급 애플리케이션을 만들기 위한 권장 다음 단계를 제공합니다.
---
면책 조항:
이 문서는 AI 번역 서비스 Co-op Translator를 사용하여 번역되었습니다.
정확성을 기하기 위해 노력하고 있으나, 자동 번역은 오류나 부정확한 부분이 있을 수 있음을 유의하시기 바랍니다.
원본 문서의 원어본이 권위 있는 자료로 간주되어야 합니다.
중요한 정보의 경우, 전문가의 인간 번역을 권장합니다.
이 번역 사용으로 인해 발생하는 오해나 잘못된 해석에 대해 당사는 책임을 지지 않습니다.