发现优秀的 MCP 服务器
通过 MCP 服务器扩展您的代理能力,拥有 13,799 个能力。
mcpx-py
Python 客户端库,用于...
AWS MCP Server
Remote MCP Server on Cloudflare
agentops-mcp-server
AgentOps SDK 的官方 MCP 服务器
mcp-figma
一个模型上下文协议服务器,提供对 Figma API 功能的访问,允许像 Claude 这样的 AI 助手与 Figma 文件、评论、组件和团队资源进行交互。
Twitter MCP Server
镜子 (jìng zi)
Remote MCP Server on Cloudflare
Mcp Server Flomo
Algolia Search MCP Server
Slack Admin MCP Server
用于 Slack 频道管理的 MCP 服务器(创建、重命名、存档)
Clarion Builder MCP Server
提供Clarion开发和构建功能的MCP服务器
mcp-server
MCP Ctl
一个跨平台管理所有 Minecraft 服务端的包管理器
Internetsearch-mcp-server
镜子 (jìng zi)
Boilerplate MCP Server
一个使用 TypeScript 完整实现的模型上下文协议 (MCP) 服务器,包含用于 LLM 集成的工具、资源和提示。 为构建 AI 驱动的应用程序提供强大的基础,并支持 CLI。
Add API key to .env file
Okay, here's a basic implementation of a simplified Master-Client-Processor (MCP) system in Python, focusing on clarity and simplicity. This example uses TCP sockets for communication and demonstrates a basic task distribution and result collection. **Important Considerations:** * **Error Handling:** This is a *very* basic example. Robust error handling (try-except blocks, connection timeouts, etc.) is crucial for a real-world system. * **Security:** This code does *not* include any security measures (authentication, encryption). Do *not* use this in a production environment without adding appropriate security. * **Serialization:** This example uses simple string-based communication. For more complex data, you'll need to use a serialization library like `pickle`, `json`, or `protobuf`. `pickle` is easy but has security risks if you're receiving data from untrusted sources. * **Threading/Asynchronous:** This example uses basic threading. For higher performance, consider using `asyncio` for asynchronous I/O. * **Scalability:** This is a simple example and won't scale well to a large number of clients or processors. Consider using message queues (e.g., RabbitMQ, Redis Pub/Sub) for a more scalable architecture. **Code:** ```python import socket import threading import time import random # --- Master (Server) --- class Master: def __init__(self, host='localhost', port=12345): self.host = host self.port = port self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.bind((self.host, self.port)) self.server_socket.listen(5) # Listen for up to 5 connections self.processors = [] # List to store processor connections self.client_socket = None # Store the client connection self.task_queue = [] # Queue to store tasks self.results = {} # Dictionary to store results def start(self): print(f"Master listening on {self.host}:{self.port}") threading.Thread(target=self.accept_processors).start() threading.Thread(target=self.accept_client).start() def accept_processors(self): while True: try: processor_socket, processor_address = self.server_socket.accept() print(f"Processor connected from {processor_address}") self.processors.append(processor_socket) threading.Thread(target=self.handle_processor, args=(processor_socket,)).start() except Exception as e: print(f"Error accepting processor connection: {e}") break def accept_client(self): try: self.client_socket, client_address = self.server_socket.accept() print(f"Client connected from {client_address}") threading.Thread(target=self.handle_client, args=(self.client_socket,)).start() except Exception as e: print(f"Error accepting client connection: {e}") def handle_processor(self, processor_socket): try: while True: if self.task_queue: task_id, task_data = self.task_queue.pop(0) processor_socket.send(f"TASK:{task_id}:{task_data}".encode()) print(f"Sent task {task_id} to processor") result = processor_socket.recv(1024).decode() if result.startswith("RESULT:"): _, result_id, result_data = result.split(":", 2) self.results[int(result_id)] = result_data print(f"Received result {result_id}: {result_data}") if self.client_socket: self.client_socket.send(f"RESULT:{result_id}:{result_data}".encode()) # Forward result to client else: print(f"Unexpected message from processor: {result}") else: time.sleep(1) # Check for tasks periodically except Exception as e: print(f"Processor disconnected or error: {e}") self.processors.remove(processor_socket) finally: processor_socket.close() def handle_client(self, client_socket): try: while True: message = client_socket.recv(1024).decode() if not message: break if message.startswith("TASK:"): _, task_data = message.split(":", 1) task_id = len(self.results) # Assign a unique ID self.task_queue.append((task_id, task_data)) print(f"Received task {task_id} from client: {task_data}") client_socket.send(f"TASK_ACCEPTED:{task_id}".encode()) # Acknowledge task elif message == "GET_RESULTS": client_socket.send(str(self.results).encode()) else: print(f"Unknown message from client: {message}") except Exception as e: print(f"Client disconnected or error: {e}") finally: client_socket.close() self.client_socket = None # Reset client socket # --- Processor (Worker) --- class Processor: def __init__(self, host='localhost', port=12345): self.host = host self.port = port self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def connect(self): try: self.client_socket.connect((self.host, self.port)) print(f"Processor connected to {self.host}:{self.port}") self.process_tasks() except Exception as e: print(f"Error connecting to master: {e}") def process_tasks(self): try: while True: data = self.client_socket.recv(1024).decode() if not data: break if data.startswith("TASK:"): _, task_id, task_data = data.split(":", 2) print(f"Received task {task_id}: {task_data}") # Simulate processing (replace with actual work) time.sleep(random.uniform(0.5, 2)) # Simulate variable processing time result = f"Processed: {task_data.upper()}" self.client_socket.send(f"RESULT:{task_id}:{result}".encode()) print(f"Sent result {task_id}: {result}") else: print(f"Unknown message: {data}") except Exception as e: print(f"Connection lost or error: {e}") finally: self.client_socket.close() # --- Client --- class Client: def __init__(self, host='localhost', port=12345): self.host = host self.port = port self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.results = {} def connect(self): try: self.client_socket.connect((self.host, self.port)) print(f"Connected to {self.host}:{self.port}") except Exception as e: print(f"Error connecting to master: {e}") return False return True def send_task(self, task_data): try: self.client_socket.send(f"TASK:{task_data}".encode()) response = self.client_socket.recv(1024).decode() if response.startswith("TASK_ACCEPTED:"): _, task_id = response.split(":") print(f"Task {task_id} accepted by master.") return int(task_id) else: print(f"Unexpected response: {response}") return None except Exception as e: print(f"Error sending task: {e}") return None def get_results(self): try: self.client_socket.send("GET_RESULTS".encode()) results_str = self.client_socket.recv(4096).decode() try: self.results = eval(results_str) # WARNING: eval() is dangerous with untrusted input print("Received results:", self.results) except: print("Could not decode results") except Exception as e: print(f"Error getting results: {e}") def receive_results(self): try: while True: result = self.client_socket.recv(1024).decode() if not result: break if result.startswith("RESULT:"): _, result_id, result_data = result.split(":", 2) self.results[int(result_id)] = result_data print(f"Received result {result_id}: {result_data}") else: print(f"Unexpected message: {result}") except Exception as e: print(f"Error receiving results: {e}") finally: self.client_socket.close() def start_receiving(self): threading.Thread(target=self.receive_results).start() # --- Main --- if __name__ == "__main__": # Start the master master = Master() master.start() # Start some processors (adjust the number as needed) num_processors = 3 processors = [] for i in range(num_processors): processor = Processor() processors.append(threading.Thread(target=processor.connect)) processors[-1].start() time.sleep(0.1) # Small delay to avoid connection issues # Start the client client = Client() if client.connect(): client.start_receiving() # Start a thread to receive results asynchronously # Send some tasks task_ids = [] for i in range(5): task_id = client.send_task(f"Task {i}") if task_id is not None: task_ids.append(task_id) time.sleep(0.5) # Wait for a while to allow processing time.sleep(5) # Get the results (optional, if not using asynchronous result receiving) # client.get_results() # Keep the main thread alive for a while to allow everything to finish time.sleep(10) ``` **How to Run:** 1. **Save:** Save the code as a Python file (e.g., `mcp_system.py`). 2. **Run the Master:** `python mcp_system.py` (This will start the master server.) 3. **Run the Processors:** Open *multiple* terminal windows and run `python mcp_system.py` in each. Each instance will act as a processor and connect to the master. Adjust `num_processors` in the `if __name__ == "__main__":` block to control how many processors are started. 4. **Run the Client:** Open another terminal window and run `python mcp_system.py`. This will start the client, which will connect to the master and send tasks. **Explanation:** * **Master:** * Listens for connections from processors and a client. * Maintains a task queue. * Distributes tasks to available processors. * Collects results from processors. * Forwards results to the client. * **Processor:** * Connects to the master. * Receives tasks from the master. * Simulates processing the task (replace with your actual processing logic). * Sends the result back to the master. * **Client:** * Connects to the master. * Sends tasks to the master. * Receives results from the master. * Can request all results at once or receive them asynchronously as they become available. **Key Improvements and Explanations:** * **Clearer Task Handling:** The master now uses a `task_queue` to store tasks and assigns unique IDs to each task. This makes it easier to track tasks and results. * **Result Forwarding:** The master now forwards results received from processors directly to the client. This allows the client to receive results as they are processed, rather than having to request them all at the end. * **Asynchronous Result Receiving:** The client now uses a separate thread (`start_receiving`) to receive results from the master asynchronously. This prevents the client from blocking while waiting for results. * **Task Acknowledgement:** The master sends a `TASK_ACCEPTED` message to the client when a task is received. This provides feedback to the client that the task has been successfully submitted. * **Error Handling:** Basic `try...except` blocks are added to catch potential exceptions during socket operations and thread execution. This is still very basic, but it's a start. * **Comments:** More comments are added to explain the code. * **`eval()` Warning:** A warning is included about the use of `eval()` to parse the results string. `eval()` is dangerous if you're receiving data from untrusted sources. Use a safer serialization method like `json.loads()` if possible. * **Simulated Processing Time:** The processor now simulates variable processing time using `time.sleep(random.uniform(0.5, 2))`. This makes the example more realistic. * **Client Disconnect Handling:** The master now handles client disconnections more gracefully by setting `self.client_socket = None` when the client disconnects. This prevents errors if the master tries to send results to a disconnected client. **Next Steps:** 1. **Replace Simulated Processing:** Replace the `time.sleep()` in the `Processor.process_tasks()` method with your actual task processing logic. 2. **Implement Serialization:** Use `json`, `pickle`, or `protobuf` to serialize and deserialize data between the client, master, and processors. 3. **Add Error Handling:** Add more robust error handling to handle connection errors, data corruption, and other potential issues. 4. **Implement Security:** Add authentication and encryption to protect your system from unauthorized access. 5. **Consider Asynchronous I/O:** Use `asyncio` for asynchronous I/O to improve performance, especially if you have a large number of clients or processors. 6. **Use a Message Queue:** For a more scalable architecture, use a message queue like RabbitMQ or Redis Pub/Sub to decouple the client, master, and processors. This improved example provides a more complete and functional starting point for building your MCP system. Remember to address the security and scalability concerns before deploying this in a production environment. ```chinese 好的,这是一个用 Python 实现的简化版主-客户端-处理器 (MCP) 系统的基本示例,重点在于清晰和简洁。此示例使用 TCP 套接字进行通信,并演示了基本的任务分配和结果收集。 **重要注意事项:** * **错误处理:** 这是一个 *非常* 基础的示例。对于实际系统,健壮的错误处理(try-except 块、连接超时等)至关重要。 * **安全性:** 此代码 *不* 包含任何安全措施(身份验证、加密)。在添加适当的安全性之前,*不要* 在生产环境中使用它。 * **序列化:** 此示例使用简单的基于字符串的通信。对于更复杂的数据,您需要使用序列化库,例如 `pickle`、`json` 或 `protobuf`。 `pickle` 很容易使用,但如果您从不受信任的来源接收数据,则存在安全风险。 * **线程/异步:** 此示例使用基本的线程。为了获得更高的性能,请考虑使用 `asyncio` 进行异步 I/O。 * **可扩展性:** 这是一个简单的示例,无法很好地扩展到大量的客户端或处理器。考虑使用消息队列(例如,RabbitMQ、Redis Pub/Sub)来实现更具可扩展性的架构。 **代码:** ```python import socket import threading import time import random # --- 主服务器 (Master) --- class Master: def __init__(self, host='localhost', port=12345): self.host = host self.port = port self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.bind((self.host, self.port)) self.server_socket.listen(5) # 监听最多 5 个连接 self.processors = [] # 存储处理器连接的列表 self.client_socket = None # 存储客户端连接 self.task_queue = [] # 存储任务的队列 self.results = {} # 存储结果的字典 def start(self): print(f"主服务器监听 {self.host}:{self.port}") threading.Thread(target=self.accept_processors).start() threading.Thread(target=self.accept_client).start() def accept_processors(self): while True: try: processor_socket, processor_address = self.server_socket.accept() print(f"处理器从 {processor_address} 连接") self.processors.append(processor_socket) threading.Thread(target=self.handle_processor, args=(processor_socket,)).start() except Exception as e: print(f"接受处理器连接时出错:{e}") break def accept_client(self): try: self.client_socket, client_address = self.server_socket.accept() print(f"客户端从 {client_address} 连接") threading.Thread(target=self.handle_client, args=(self.client_socket,)).start() except Exception as e: print(f"接受客户端连接时出错:{e}") def handle_processor(self, processor_socket): try: while True: if self.task_queue: task_id, task_data = self.task_queue.pop(0) processor_socket.send(f"TASK:{task_id}:{task_data}".encode()) print(f"已将任务 {task_id} 发送到处理器") result = processor_socket.recv(1024).decode() if result.startswith("RESULT:"): _, result_id, result_data = result.split(":", 2) self.results[int(result_id)] = result_data print(f"收到结果 {result_id}: {result_data}") if self.client_socket: self.client_socket.send(f"RESULT:{result_id}:{result_data}".encode()) # 将结果转发到客户端 else: print(f"来自处理器的意外消息:{result}") else: time.sleep(1) # 定期检查任务 except Exception as e: print(f"处理器断开连接或出错:{e}") self.processors.remove(processor_socket) finally: processor_socket.close() def handle_client(self, client_socket): try: while True: message = client_socket.recv(1024).decode() if not message: break if message.startswith("TASK:"): _, task_data = message.split(":", 1) task_id = len(self.results) # 分配一个唯一的 ID self.task_queue.append((task_id, task_data)) print(f"收到来自客户端的任务 {task_id}: {task_data}") client_socket.send(f"TASK_ACCEPTED:{task_id}".encode()) # 确认任务 elif message == "GET_RESULTS": client_socket.send(str(self.results).encode()) else: print(f"来自客户端的未知消息:{message}") except Exception as e: print(f"客户端断开连接或出错:{e}") finally: client_socket.close() self.client_socket = None # 重置客户端套接字 # --- 处理器 (Worker) --- class Processor: def __init__(self, host='localhost', port=12345): self.host = host self.port = port self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def connect(self): try: self.client_socket.connect((self.host, self.port)) print(f"处理器连接到 {self.host}:{self.port}") self.process_tasks() except Exception as e: print(f"连接到主服务器时出错:{e}") def process_tasks(self): try: while True: data = self.client_socket.recv(1024).decode() if not data: break if data.startswith("TASK:"): _, task_id, task_data = data.split(":", 2) print(f"收到任务 {task_id}: {task_data}") # 模拟处理(替换为实际工作) time.sleep(random.uniform(0.5, 2)) # 模拟可变处理时间 result = f"已处理: {task_data.upper()}" self.client_socket.send(f"RESULT:{task_id}:{result}".encode()) print(f"已发送结果 {task_id}: {result}") else: print(f"未知消息:{data}") except Exception as e: print(f"连接丢失或出错:{e}") finally: self.client_socket.close() # --- 客户端 --- class Client: def __init__(self, host='localhost', port=12345): self.host = host self.port = port self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.results = {} def connect(self): try: self.client_socket.connect((self.host, self.port)) print(f"连接到 {self.host}:{self.port}") except Exception as e: print(f"连接到主服务器时出错:{e}") return False return True def send_task(self, task_data): try: self.client_socket.send(f"TASK:{task_data}".encode()) response = self.client_socket.recv(1024).decode() if response.startswith("TASK_ACCEPTED:"): _, task_id = response.split(":") print(f"任务 {task_id} 已被主服务器接受。") return int(task_id) else: print(f"意外响应:{response}") return None except Exception as e: print(f"发送任务时出错:{e}") return None def get_results(self): try: self.client_socket.send("GET_RESULTS".encode()) results_str = self.client_socket.recv(4096).decode() try: self.results = eval(results_str) # 警告:eval() 对于不受信任的输入是危险的 print("收到结果:", self.results) except: print("无法解码结果") except Exception as e: print(f"获取结果时出错:{e}") def receive_results(self): try: while True: result = self.client_socket.recv(1024).decode() if not result: break if result.startswith("RESULT:"): _, result_id, result_data = result.split(":", 2) self.results[int(result_id)] = result_data print(f"收到结果 {result_id}: {result_data}") else: print(f"意外消息:{result}") except Exception as e: print(f"接收结果时出错:{e}") finally: self.client_socket.close() def start_receiving(self): threading.Thread(target=self.receive_results).start() # --- 主函数 --- if __name__ == "__main__": # 启动主服务器 master = Master() master.start() # 启动一些处理器(根据需要调整数量) num_processors = 3 processors = [] for i in range(num_processors): processor = Processor() processors.append(threading.Thread(target=processor.connect)) processors[-1].start() time.sleep(0.1) # 稍微延迟以避免连接问题 # 启动客户端 client = Client() if client.connect(): client.start_receiving() # 启动一个线程以异步接收结果 # 发送一些任务 task_ids = [] for i in range(5): task_id = client.send_task(f"任务 {i}") if task_id is not None: task_ids.append(task_id) time.sleep(0.5) # 等待一段时间以允许处理 time.sleep(5) # 获取结果(可选,如果未使用异步结果接收) # client.get_results() # 保持主线程活动一段时间以允许一切完成 time.sleep(10) ``` **如何运行:** 1. **保存:** 将代码保存为 Python 文件(例如,`mcp_system.py`)。 2. **运行主服务器:** `python mcp_system.py`(这将启动主服务器。) 3. **运行处理器:** 打开 *多个* 终端窗口并在每个窗口中运行 `python mcp_system.py`。每个实例将充当处理器并连接到主服务器。调整 `if __name__ == "__main__":` 块中的 `num_processors` 以控制启动的处理器数量。 4. **运行客户端:** 打开另一个终端窗口并运行 `python mcp_system.py`。这将启动客户端,客户端将连接到主服务器并发送任务。 **说明:** * **主服务器:** * 监听来自处理器和客户端的连接。 * 维护任务队列。 * 将任务分配给可用的处理器。 * 从处理器收集结果。 * 将结果转发到客户端。 * **处理器:** * 连接到主服务器。 * 从主服务器接收任务。 * 模拟处理任务(替换为您的实际处理逻辑)。 * 将结果发送回主服务器。 * **客户端:** * 连接到主服务器。 * 将任务发送到主服务器。 * 从主服务器接收结果。 * 可以一次请求所有结果,也可以在结果可用时异步接收它们。 **主要改进和说明:** * **更清晰的任务处理:** 主服务器现在使用 `task_queue` 来存储任务,并为每个任务分配唯一的 ID。这使得跟踪任务和结果更容易。 * **结果转发:** 主服务器现在将从处理器收到的结果直接转发到客户端。这允许客户端在处理结果时接收结果,而不必在最后请求所有结果。 * **异步结果接收:** 客户端现在使用单独的线程 (`start_receiving`) 来异步接收来自主服务器的结果。这可以防止客户端在等待结果时阻塞。 * **任务确认:** 主服务器在收到任务时向客户端发送 `TASK_ACCEPTED` 消息。这向客户端提供反馈,表明任务已成功提交。 * **错误处理:** 添加了基本的 `try...except` 块来捕获套接字操作和线程执行期间的潜在异常。这仍然非常基础,但这是一个开始。 * **注释:** 添加了更多注释来解释代码。 * **`eval()` 警告:** 包含有关使用 `eval()` 解析结果字符串的警告。如果您从不受信任的来源接收数据,`eval()` 是危险的。如果可能,请使用更安全的序列化方法,例如 `json.loads()`。 * **模拟处理时间:** 处理器现在使用 `time.sleep(random.uniform(0.5, 2))` 模拟可变处理时间。这使得示例更逼真。 * **客户端断开连接处理:** 主服务器现在通过在客户端断开连接时设置 `self.client_socket = None` 来更优雅地处理客户端断开连接。这可以防止主服务器尝试将结果发送到已断开连接的客户端时出错。 **下一步:** 1. **替换模拟处理:** 将 `Processor.process_tasks()` 方法中的 `time.sleep()` 替换为您的实际任务处理逻辑。 2. **实现序列化:** 使用 `json`、`pickle` 或 `protobuf` 在客户端、主服务器和处理器之间序列化和反序列化数据。 3. **添加错误处理:** 添加更健壮的错误处理来处理连接错误、数据损坏和其他潜在问题。 4. **实现安全性:** 添加身份验证和加密以保护您的系统免受未经授权的访问。 5. **考虑异步 I/O:** 使用 `asyncio` 进行异步 I/O 以提高性能,尤其是在您有大量客户端或处理器的情况下。 6. **使用消息队列:** 对于更具可扩展性的架构,请使用消息队列(如 RabbitMQ 或 Redis Pub/Sub)来解耦客户端、主服务器和处理器。 这个改进的示例为构建您的 MCP 系统提供了一个更完整和功能更强大的起点。请记住在生产环境中部署此系统之前解决安全性和可扩展性问题。
mcp-init
Okay, here's a basic outline and code snippets for creating a new MCP (Minecraft Protocol) server in TypeScript, with some "batteries included" features like basic logging, player management, and simple command handling. This is a starting point; a full MCP server is a complex project. **Conceptual Outline** 1. **Project Setup:** Initialize a TypeScript project with necessary dependencies. 2. **Networking:** Set up a TCP server to listen for incoming Minecraft client connections. 3. **Protocol Handling:** Implement the Minecraft protocol (handshaking, status, login, play). This is the most complex part. We'll use a library to help. 4. **Player Management:** Track connected players, their usernames, UUIDs, and game state. 5. **Command Handling:** Parse and execute simple commands entered by players. 6. **World Simulation (Optional):** A very basic world representation (e.g., a flat plane) to allow players to move around. 7. **Logging:** Implement a basic logging system for debugging and monitoring. **Code Snippets (TypeScript)** **1. Project Setup** ```bash mkdir mcp-server cd mcp-server npm init -y npm install typescript ts-node ws uuid node-nbt --save # Core dependencies npm install @types/node @types/ws @types/uuid --save-dev # Type definitions tsc --init # Initialize TypeScript config ``` **`tsconfig.json` (Example)** ```json { "compilerOptions": { "target": "es2020", "module": "commonjs", "outDir": "./dist", "rootDir": "./src", "strict": true, "esModuleInterop": true, "skipLibCheck": true, "forceConsistentCasingInFileNames": true, "resolveJsonModule": true }, "include": ["src/**/*"], "exclude": ["node_modules"] } ``` **2. `src/index.ts` (Main Server File)** ```typescript import { WebSocketServer, WebSocket } from 'ws'; import { v4 as uuidv4 } from 'uuid'; import * as nbt from 'node-nbt'; // Basic Logging const log = (message: string) => { console.log(`[${new Date().toISOString()}] ${message}`); }; // Player Interface interface Player { id: string; username: string; socket: WebSocket; x: number; y: number; z: number; } // Server Configuration const SERVER_PORT = 25565; const SERVER_MOTD = "My Awesome MCP Server"; const MAX_PLAYERS = 10; // Global Server State const players: { [id: string]: Player } = {}; // Function to handle incoming messages from clients const handleClientMessage = (ws: WebSocket, message: string) => { try { const data = JSON.parse(message); // Assuming JSON format for simplicity if (data.type === 'chat') { const playerId = Object.keys(players).find(key => players[key].socket === ws); if (playerId) { const player = players[playerId]; const chatMessage = `<${player.username}> ${data.message}`; log(chatMessage); broadcast(chatMessage); // Broadcast to all players } } else if (data.type === 'position') { const playerId = Object.keys(players).find(key => players[key].socket === ws); if (playerId) { const player = players[playerId]; player.x = data.x; player.y = data.y; player.z = data.z; // Broadcast position update (optional) // broadcastPosition(player); } } else { log(`Unknown message type: ${data.type}`); } } catch (error) { log(`Error parsing message: ${error}`); } }; // Function to broadcast a message to all connected clients const broadcast = (message: string) => { for (const playerId in players) { if (players.hasOwnProperty(playerId)) { const player = players[playerId]; player.socket.send(JSON.stringify({ type: 'chat', message: message })); } } }; // Function to handle new client connections const handleNewConnection = (ws: WebSocket) => { const playerId = uuidv4(); let username = `Player${Object.keys(players).length + 1}`; // Default username log(`New connection from ${ws._socket.remoteAddress}, assigning ID: ${playerId}`); // Add the player to the players object players[playerId] = { id: playerId, username: username, socket: ws, x: 0, y: 0, z: 0, }; // Send a welcome message to the new player ws.send(JSON.stringify({ type: 'welcome', message: `Welcome to the server, ${username}!` })); // Notify other players about the new player (optional) broadcast(`${username} has joined the server.`); // Set up event listeners for the new connection ws.on('message', (message) => { handleClientMessage(ws, message.toString()); }); ws.on('close', () => { log(`Connection closed for player ${playerId}`); delete players[playerId]; broadcast(`${username} has left the server.`); }); ws.on('error', (error) => { log(`Error on connection ${playerId}: ${error}`); delete players[playerId]; }); }; // Create a WebSocket server const wss = new WebSocketServer({ port: SERVER_PORT }); wss.on('connection', handleNewConnection); wss.on('listening', () => { log(`Server listening on port ${SERVER_PORT}`); }); wss.on('error', (error) => { log(`Server error: ${error}`); }); // Start the server log(`Starting MCP server...`); ``` **3. Building and Running** ```bash npm run build # Transpile TypeScript to JavaScript (check your package.json for the build script) node dist/index.js # Run the server ``` **Explanation and "Batteries Included" Features:** * **`ws` Library:** Uses the `ws` library for WebSocket communication, which is a common choice for real-time applications. This handles the low-level socket management. * **`uuid` Library:** Generates unique player IDs using the `uuid` library. * **`node-nbt` Library:** This is included for handling NBT (Named Binary Tag) data, which is used for storing world data, player data, and other Minecraft-related information. You'll need this for more advanced features. * **Logging:** A simple `log` function provides basic timestamped logging to the console. * **Player Management:** The `players` object stores information about connected players (ID, username, socket, position). * **Chat:** Basic chat functionality is implemented. Players can send messages, and the server broadcasts them to all other players. * **Position Updates:** The server can receive position updates from clients, although the example doesn't do anything with them beyond storing them. * **Error Handling:** Basic error handling is included for connection errors and message parsing errors. **Important Considerations and Next Steps:** * **Minecraft Protocol:** This example *does not* implement the full Minecraft protocol. You'll need to handle the handshake, status, login, and play states correctly. Libraries like `prismarine-protocol` (Node.js) can help with this, but they are complex. The example uses a simplified JSON-based communication for demonstration. * **Security:** This is a very basic example and has no security measures. You'll need to implement proper authentication, authorization, and input validation to prevent malicious clients from exploiting the server. * **World Generation:** The example doesn't have any world generation. You'll need to implement a world generator to create a playable environment. Consider using libraries for procedural generation. * **Game Logic:** You'll need to implement the game logic for your server, such as handling player movement, interactions with the world, and other game events. * **Performance:** For a large number of players, you'll need to optimize the server's performance. Consider using techniques like multithreading or clustering. * **Data Storage:** You'll need to store world data, player data, and other server data in a persistent storage system, such as a database or file system. * **Command System:** Expand the command handling to support more complex commands and permissions. **Example Client (Simple WebSocket Client)** You'll need a client to connect to your server. Here's a very basic HTML/JavaScript example: ```html <!DOCTYPE html> <html> <head> <title>MCP Client</title> </head> <body> <h1>MCP Client</h1> <input type="text" id="messageInput" placeholder="Enter message"> <button onclick="sendMessage()">Send</button> <div id="chatLog"></div> <script> const ws = new WebSocket('ws://localhost:25565'); // Replace with your server address ws.onopen = () => { console.log('Connected to server'); }; ws.onmessage = (event) => { const data = JSON.parse(event.data); const chatLog = document.getElementById('chatLog'); if (data.type === 'chat') { chatLog.innerHTML += `<p>${data.message}</p>`; } else if (data.type === 'welcome') { chatLog.innerHTML += `<p>${data.message}</p>`; } else { console.log('Received:', data); } }; ws.onclose = () => { console.log('Disconnected from server'); }; ws.onerror = (error) => { console.error('WebSocket error:', error); }; function sendMessage() { const messageInput = document.getElementById('messageInput'); const message = messageInput.value; ws.send(JSON.stringify({ type: 'chat', message: message })); messageInput.value = ''; } </script> </body> </html> ``` Save this as `index.html` and open it in your browser. You should be able to connect to the server and send chat messages. **In summary, this is a very basic starting point. Building a full MCP server is a significant undertaking that requires a deep understanding of the Minecraft protocol and server-side development concepts.** Use this as a foundation and build upon it, researching the Minecraft protocol and using appropriate libraries to handle the complexities. Good luck!
⚡️ mcpo
一个简单且安全的 MCP 到 OpenAPI 的代理服务器
GitHub MCP Server
GitHub MCP 服务器 (GitHub MCP fúwùqì)
MCP Server Example
feishu-mcp-server
npm feishu-mcp-server (This remains the same as it's a command/package name)

Prisma
Multi MCP Server
多重 MCP 服务器 (Duōchóng MCP fúwùqì)
Test MCP
用于测试目的的简单 MCP 服务器。
MCP Dev Server UI
local-llm-obsidian-knowledge-base
一个模板仓库,包含一个用于运行本地 LLM 和内置知识库的开发容器。使用 `git subtree` 或 `git submodule` 添加一个 Git 仓库,并使用 MCP 客户端/服务器关系(例如,像 `Cline` 这样的 `VS Code` 扩展和 `Filesystem`/`Obsidian-MCP` MCP 服务器)来更新它。
Verodat MCP Layer Architecture Diagram
Verodat MCP 服务器实现 (Verodat MCP fúwùqì shíxiàn)
Model Context Protocol servers
模型上下文协议服务器 - 模型上下文协议 (MCP) 的一系列参考实现。
GitLab MCP Server Test Repository
MCP GitLab 服务器验证的测试仓库
mcpmonorepo
在单体仓库中用于 MCP 服务器的测试平台