Starlette MCP SSE
Here's a working example of a Starlette server with Server-Sent Events (SSE) based MCP (Model Context Protocol) support. This example demonstrates a basic setup, including:

* **Starlette Application:** The core web application.
* **SSE Endpoint:** An endpoint that streams events to connected clients.
* **MCP-like Structure:** A simplified structure for sending messages with a type and data.
* **Basic Message Handling:** A simple example of how to handle different message types on the server.

```python
import asyncio
import json
from typing import AsyncGenerator

from starlette.applications import Starlette
from starlette.responses import StreamingResponse
from starlette.routing import Route


# Define MCP message structure (simplified)
class MCPMessage:
    def __init__(self, type: str, data: dict):
        self.type = type
        self.data = data

    def to_json(self) -> str:
        return json.dumps({"type": self.type, "data": self.data})


# Global queue for messages (in-memory, for demonstration)
message_queue: asyncio.Queue = asyncio.Queue()


async def event_stream(request):
    async def generate_events() -> AsyncGenerator[str, None]:
        try:
            while True:
                message: MCPMessage = await message_queue.get()  # Get message from queue
                message_json = message.to_json()
                yield f"data: {message_json}\n\n"
                await asyncio.sleep(0.1)  # Simulate some processing time
        except asyncio.CancelledError:
            print("Client disconnected, stopping event stream.")
        finally:
            print("Event stream generator finished.")

    return StreamingResponse(generate_events(), media_type="text/event-stream")


async def send_test_messages():
    """
    Simulates sending messages to the queue. In a real application,
    these messages would come from other parts of your system.
    """
    await asyncio.sleep(1)  # Wait a bit before sending messages
    for i in range(5):
        message = MCPMessage(type="test_event", data={"message": f"Test message {i}"})
        await message_queue.put(message)
        print(f"Sent message: {message.to_json()}")
        await asyncio.sleep(2)

    message = MCPMessage(type="status_update", data={"status": "Completed!"})
    await message_queue.put(message)
    print(f"Sent message: {message.to_json()}")


async def startup():
    """
    Startup function to launch background tasks when the application starts.
    """
    asyncio.create_task(send_test_messages())


routes = [
    Route("/events", endpoint=event_stream),
]

app = Starlette(debug=True, routes=routes, on_startup=[startup])

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
```

Key points and explanations:

* **`MCPMessage` Class:** Defines a simple class to represent MCP messages with `type` and `data` fields, making it easier to structure and serialize messages. The `to_json()` method converts a message to a JSON string for sending over SSE.
* **`message_queue`:** An `asyncio.Queue` holds the messages that need to be sent to SSE clients. This is crucial for decoupling message producers from the SSE endpoint: messages can be added to the queue from anywhere in your application (see the sketch after this list).
* **`event_stream` Function:** This is the SSE endpoint. It uses an async generator to continuously yield events to the client, retrieving each message from the `message_queue`.
* **Error Handling (Client Disconnect):** The `try...except asyncio.CancelledError` block in `generate_events` is *essential*. It catches the `asyncio.CancelledError` raised when the client disconnects; without it, the cancellation surfaces as unhandled errors whenever a client closes the connection. The `finally` block ensures cleanup.
* **`send_test_messages` Function:** Simulates sending messages to the queue. In a real application, these messages would come from other parts of your system (e.g., background tasks or API endpoints). It demonstrates how to put messages onto the queue and uses `asyncio.sleep` to simulate delays.
* **`startup` Function:** Registered with the Starlette application via `on_startup`, it starts background tasks when the application starts; here, it launches the `send_test_messages` task.
* **JSON Serialization:** `json.dumps()` serializes the message data to JSON before it is sent over SSE. This is the standard way to format data for SSE.
* **SSE Format:** The `yield f"data: {message_json}\n\n"` line is *critical*: each event must be prefixed with `data: ` and followed by two newline characters (`\n\n`).
* **Media Type:** The `StreamingResponse` is created with `media_type="text/event-stream"`, which tells the client that the server is sending SSE events.
* **Uvicorn:** The example uses Uvicorn as the ASGI server. Make sure you have it installed (`pip install uvicorn`).
* **Comments:** The code is commented to explain each part.
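To illustrate that decoupling, here is a hedged sketch of how another part of the application, for example an HTTP endpoint, might put messages onto the queue. It assumes the `MCPMessage` class and `message_queue` from the example above; the `/publish` route, the `publish` handler, and the payload shape are hypothetical, not part of the original example:

```python
# Sketch only: enqueue a message from a hypothetical POST /publish endpoint.
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route


async def publish(request: Request) -> JSONResponse:
    payload = await request.json()  # e.g. {"type": "custom_event", "data": {...}}
    message = MCPMessage(
        type=payload.get("type", "custom_event"),
        data=payload.get("data", {}),
    )
    await message_queue.put(message)  # the SSE generator will pick this up
    return JSONResponse({"queued": True})


# Include it in the routes list before the app is created, e.g.:
# routes = [
#     Route("/events", endpoint=event_stream),
#     Route("/publish", endpoint=publish, methods=["POST"]),
# ]
```

Note that a single shared `asyncio.Queue` hands each message to exactly one consumer, so with several clients connected to `/events` a given message would reach only one of them; broadcasting to all clients typically means keeping one queue per connection.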
**How to Run:**

1. **Save:** Save the code as a Python file (e.g., `sse_mcp_server.py`).
2. **Install Dependencies:**

   ```bash
   pip install starlette uvicorn
   ```

3. **Run:**

   ```bash
   python sse_mcp_server.py
   ```

4. **Test with a Client:** Use a browser or a tool like `curl` to connect to the SSE endpoint:

   ```bash
   curl -N http://localhost:8000/events
   ```

   The `-N` option tells `curl` not to buffer the output, so you'll see events as they arrive.

**Example Client (JavaScript/HTML):**

```html
<!DOCTYPE html>
<html>
<head>
  <title>SSE MCP Client</title>
</head>
<body>
  <h1>SSE MCP Client</h1>
  <div id="events"></div>
  <script>
    const eventSource = new EventSource('http://localhost:8000/events');

    eventSource.onmessage = (event) => {
      const eventsDiv = document.getElementById('events');
      const message = JSON.parse(event.data); // Parse the JSON payload
      eventsDiv.innerHTML += `<p>Type: ${message.type}, Data: ${JSON.stringify(message.data)}</p>`;
    };

    eventSource.onerror = (error) => {
      console.error("SSE error:", error);
      const eventsDiv = document.getElementById('events');
      eventsDiv.innerHTML += "<p>Error connecting to SSE server.</p>";
      eventSource.close(); // Close the connection on error
    };
  </script>
</body>
</html>
```

Save this as an HTML file (e.g., `sse_mcp_client.html`) and open it in your browser. Make sure the server is running.

**Important Considerations for Production:**

* **Error Handling:** Implement robust error handling on both the server and the client. Handle connection errors, message parsing errors, and other potential issues.
* **Scalability:** For production, consider a more scalable message queue (e.g., Redis, RabbitMQ) instead of the in-memory `asyncio.Queue`.
* **Authentication/Authorization:** Implement authentication and authorization to protect your SSE endpoint.
* **Connection Management:** Keep track of connected clients and handle disconnections gracefully.
* **Message Format:** Define a clear and consistent message format for your MCP messages, and consider using a schema validation library to ensure messages are valid (a validation sketch follows this list).
* **Heartbeats:** Implement heartbeats to detect dead connections. The server can periodically send a "ping" event, and the client can answer with a "pong" via a separate request; if no "pong" arrives within a certain time, the server can close the connection (a heartbeat sketch follows this list).
* **Reconnection:** The client should automatically attempt to reconnect if the connection is lost. The `EventSource` API has built-in reconnection logic, but you may need to customize it.
* **Buffering:** Be aware of potential buffering issues. The server, intermediate proxies, and the client may buffer messages, which can lead to delays; you may need to adjust buffer sizes to optimize performance.
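Two of the points above can be sketched concretely. First, a minimal heartbeat variant of the event generator, assuming the same `MCPMessage`/queue setup as the main example; the 15-second interval and the use of an SSE comment line (a line beginning with `:`) as the keep-alive are illustrative choices, not requirements:

```python
# Sketch only: heartbeat keep-alives in the SSE generator (interval is an assumption).
import asyncio
from typing import AsyncGenerator


async def generate_events_with_heartbeat(queue: asyncio.Queue) -> AsyncGenerator[str, None]:
    while True:
        try:
            # Wait up to 15 seconds for the next message from the queue.
            message = await asyncio.wait_for(queue.get(), timeout=15.0)
            yield f"data: {message.to_json()}\n\n"
        except asyncio.TimeoutError:
            # No recent message: emit an SSE comment (ignored by clients) so that
            # intermediaries see traffic and dead connections surface as write errors.
            yield ": keep-alive\n\n"
```

Second, one possible shape for schema validation of incoming message payloads, here using Pydantic as one option among several; the model name and helper function are hypothetical:

```python
# Sketch only: validating message payloads with Pydantic (one possible library choice).
from typing import Optional

from pydantic import BaseModel, ValidationError


class MCPMessageModel(BaseModel):
    type: str
    data: dict


def parse_message(payload: dict) -> Optional[MCPMessageModel]:
    """Return a validated message, or None if the payload is malformed."""
    try:
        return MCPMessageModel(**payload)
    except ValidationError as exc:
        print(f"Rejected malformed message: {exc}")
        return None
```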
**Chinese Translation of Key Concepts:**

* **Server-Sent Events (SSE):** 服务器发送事件 (Fúwùqì fāsòng shìjiàn)
* **Model Context Protocol (MCP):** 模型上下文协议 (Móxíng shàngxiàwén xiéyì)
* **Starlette:** No direct translation; usually referred to by its English name
* **Endpoint:** 端点 (Duāndiǎn)
* **Asynchronous:** 异步 (Yìbù)
* **Queue:** 队列 (Duìliè)
* **Message:** 消息 (Xiāoxī)
* **Client:** 客户端 (Kèhùduān)
* **Server:** 服务器 (Fúwùqì)
* **JSON:** Usually referred to by its English name, but can be translated as JavaScript 对象表示法 (JavaScript duìxiàng biǎoshì fǎ)
* **Streaming:** 流式传输 (Liúshì chuánshū)

This example provides a solid foundation for building a Starlette server with SSE-based MCP support. Remember to adapt it to your specific needs and to weigh the production considerations above.