发现优秀的 MCP 服务器

通过 MCP 服务器扩展您的代理能力,拥有 27,779 个能力。

全部27,779
MCP Dev Server UI

MCP Dev Server UI

Add API key to .env file

Add API key to .env file

Okay, here's a basic implementation of a simplified Master-Client-Processor (MCP) system in Python, focusing on clarity and simplicity. This example uses TCP sockets for communication and demonstrates a basic task distribution and result collection. **Important Considerations:** * **Error Handling:** This is a *very* basic example. Robust error handling (try-except blocks, connection timeouts, etc.) is crucial for a real-world system. * **Security:** This code does *not* include any security measures (authentication, encryption). Do *not* use this in a production environment without adding appropriate security. * **Serialization:** This example uses simple string-based communication. For more complex data, you'll need to use a serialization library like `pickle`, `json`, or `protobuf`. `pickle` is easy but has security risks if you're receiving data from untrusted sources. * **Threading/Asynchronous:** This example uses basic threading. For higher performance, consider using `asyncio` for asynchronous I/O. * **Scalability:** This is a simple example and won't scale well to a large number of clients or processors. Consider using message queues (e.g., RabbitMQ, Redis Pub/Sub) for a more scalable architecture. **Code:** ```python import socket import threading import time import random # --- Master (Server) --- class Master: def __init__(self, host='localhost', port=12345): self.host = host self.port = port self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.bind((self.host, self.port)) self.server_socket.listen(5) # Listen for up to 5 connections self.processors = [] # List to store processor connections self.client_socket = None # Store the client connection self.task_queue = [] # Queue to store tasks self.results = {} # Dictionary to store results def start(self): print(f"Master listening on {self.host}:{self.port}") threading.Thread(target=self.accept_processors).start() threading.Thread(target=self.accept_client).start() def accept_processors(self): while True: try: processor_socket, processor_address = self.server_socket.accept() print(f"Processor connected from {processor_address}") self.processors.append(processor_socket) threading.Thread(target=self.handle_processor, args=(processor_socket,)).start() except Exception as e: print(f"Error accepting processor connection: {e}") break def accept_client(self): try: self.client_socket, client_address = self.server_socket.accept() print(f"Client connected from {client_address}") threading.Thread(target=self.handle_client, args=(self.client_socket,)).start() except Exception as e: print(f"Error accepting client connection: {e}") def handle_processor(self, processor_socket): try: while True: if self.task_queue: task_id, task_data = self.task_queue.pop(0) processor_socket.send(f"TASK:{task_id}:{task_data}".encode()) print(f"Sent task {task_id} to processor") result = processor_socket.recv(1024).decode() if result.startswith("RESULT:"): _, result_id, result_data = result.split(":", 2) self.results[int(result_id)] = result_data print(f"Received result {result_id}: {result_data}") if self.client_socket: self.client_socket.send(f"RESULT:{result_id}:{result_data}".encode()) # Forward result to client else: print(f"Unexpected message from processor: {result}") else: time.sleep(1) # Check for tasks periodically except Exception as e: print(f"Processor disconnected or error: {e}") self.processors.remove(processor_socket) finally: processor_socket.close() def handle_client(self, client_socket): try: while True: message = client_socket.recv(1024).decode() if not message: break if message.startswith("TASK:"): _, task_data = message.split(":", 1) task_id = len(self.results) # Assign a unique ID self.task_queue.append((task_id, task_data)) print(f"Received task {task_id} from client: {task_data}") client_socket.send(f"TASK_ACCEPTED:{task_id}".encode()) # Acknowledge task elif message == "GET_RESULTS": client_socket.send(str(self.results).encode()) else: print(f"Unknown message from client: {message}") except Exception as e: print(f"Client disconnected or error: {e}") finally: client_socket.close() self.client_socket = None # Reset client socket # --- Processor (Worker) --- class Processor: def __init__(self, host='localhost', port=12345): self.host = host self.port = port self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def connect(self): try: self.client_socket.connect((self.host, self.port)) print(f"Processor connected to {self.host}:{self.port}") self.process_tasks() except Exception as e: print(f"Error connecting to master: {e}") def process_tasks(self): try: while True: data = self.client_socket.recv(1024).decode() if not data: break if data.startswith("TASK:"): _, task_id, task_data = data.split(":", 2) print(f"Received task {task_id}: {task_data}") # Simulate processing (replace with actual work) time.sleep(random.uniform(0.5, 2)) # Simulate variable processing time result = f"Processed: {task_data.upper()}" self.client_socket.send(f"RESULT:{task_id}:{result}".encode()) print(f"Sent result {task_id}: {result}") else: print(f"Unknown message: {data}") except Exception as e: print(f"Connection lost or error: {e}") finally: self.client_socket.close() # --- Client --- class Client: def __init__(self, host='localhost', port=12345): self.host = host self.port = port self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.results = {} def connect(self): try: self.client_socket.connect((self.host, self.port)) print(f"Connected to {self.host}:{self.port}") except Exception as e: print(f"Error connecting to master: {e}") return False return True def send_task(self, task_data): try: self.client_socket.send(f"TASK:{task_data}".encode()) response = self.client_socket.recv(1024).decode() if response.startswith("TASK_ACCEPTED:"): _, task_id = response.split(":") print(f"Task {task_id} accepted by master.") return int(task_id) else: print(f"Unexpected response: {response}") return None except Exception as e: print(f"Error sending task: {e}") return None def get_results(self): try: self.client_socket.send("GET_RESULTS".encode()) results_str = self.client_socket.recv(4096).decode() try: self.results = eval(results_str) # WARNING: eval() is dangerous with untrusted input print("Received results:", self.results) except: print("Could not decode results") except Exception as e: print(f"Error getting results: {e}") def receive_results(self): try: while True: result = self.client_socket.recv(1024).decode() if not result: break if result.startswith("RESULT:"): _, result_id, result_data = result.split(":", 2) self.results[int(result_id)] = result_data print(f"Received result {result_id}: {result_data}") else: print(f"Unexpected message: {result}") except Exception as e: print(f"Error receiving results: {e}") finally: self.client_socket.close() def start_receiving(self): threading.Thread(target=self.receive_results).start() # --- Main --- if __name__ == "__main__": # Start the master master = Master() master.start() # Start some processors (adjust the number as needed) num_processors = 3 processors = [] for i in range(num_processors): processor = Processor() processors.append(threading.Thread(target=processor.connect)) processors[-1].start() time.sleep(0.1) # Small delay to avoid connection issues # Start the client client = Client() if client.connect(): client.start_receiving() # Start a thread to receive results asynchronously # Send some tasks task_ids = [] for i in range(5): task_id = client.send_task(f"Task {i}") if task_id is not None: task_ids.append(task_id) time.sleep(0.5) # Wait for a while to allow processing time.sleep(5) # Get the results (optional, if not using asynchronous result receiving) # client.get_results() # Keep the main thread alive for a while to allow everything to finish time.sleep(10) ``` **How to Run:** 1. **Save:** Save the code as a Python file (e.g., `mcp_system.py`). 2. **Run the Master:** `python mcp_system.py` (This will start the master server.) 3. **Run the Processors:** Open *multiple* terminal windows and run `python mcp_system.py` in each. Each instance will act as a processor and connect to the master. Adjust `num_processors` in the `if __name__ == "__main__":` block to control how many processors are started. 4. **Run the Client:** Open another terminal window and run `python mcp_system.py`. This will start the client, which will connect to the master and send tasks. **Explanation:** * **Master:** * Listens for connections from processors and a client. * Maintains a task queue. * Distributes tasks to available processors. * Collects results from processors. * Forwards results to the client. * **Processor:** * Connects to the master. * Receives tasks from the master. * Simulates processing the task (replace with your actual processing logic). * Sends the result back to the master. * **Client:** * Connects to the master. * Sends tasks to the master. * Receives results from the master. * Can request all results at once or receive them asynchronously as they become available. **Key Improvements and Explanations:** * **Clearer Task Handling:** The master now uses a `task_queue` to store tasks and assigns unique IDs to each task. This makes it easier to track tasks and results. * **Result Forwarding:** The master now forwards results received from processors directly to the client. This allows the client to receive results as they are processed, rather than having to request them all at the end. * **Asynchronous Result Receiving:** The client now uses a separate thread (`start_receiving`) to receive results from the master asynchronously. This prevents the client from blocking while waiting for results. * **Task Acknowledgement:** The master sends a `TASK_ACCEPTED` message to the client when a task is received. This provides feedback to the client that the task has been successfully submitted. * **Error Handling:** Basic `try...except` blocks are added to catch potential exceptions during socket operations and thread execution. This is still very basic, but it's a start. * **Comments:** More comments are added to explain the code. * **`eval()` Warning:** A warning is included about the use of `eval()` to parse the results string. `eval()` is dangerous if you're receiving data from untrusted sources. Use a safer serialization method like `json.loads()` if possible. * **Simulated Processing Time:** The processor now simulates variable processing time using `time.sleep(random.uniform(0.5, 2))`. This makes the example more realistic. * **Client Disconnect Handling:** The master now handles client disconnections more gracefully by setting `self.client_socket = None` when the client disconnects. This prevents errors if the master tries to send results to a disconnected client. **Next Steps:** 1. **Replace Simulated Processing:** Replace the `time.sleep()` in the `Processor.process_tasks()` method with your actual task processing logic. 2. **Implement Serialization:** Use `json`, `pickle`, or `protobuf` to serialize and deserialize data between the client, master, and processors. 3. **Add Error Handling:** Add more robust error handling to handle connection errors, data corruption, and other potential issues. 4. **Implement Security:** Add authentication and encryption to protect your system from unauthorized access. 5. **Consider Asynchronous I/O:** Use `asyncio` for asynchronous I/O to improve performance, especially if you have a large number of clients or processors. 6. **Use a Message Queue:** For a more scalable architecture, use a message queue like RabbitMQ or Redis Pub/Sub to decouple the client, master, and processors. This improved example provides a more complete and functional starting point for building your MCP system. Remember to address the security and scalability concerns before deploying this in a production environment. ```chinese 好的,这是一个用 Python 实现的简化版主-客户端-处理器 (MCP) 系统的基本示例,重点在于清晰和简洁。此示例使用 TCP 套接字进行通信,并演示了基本的任务分配和结果收集。 **重要注意事项:** * **错误处理:** 这是一个 *非常* 基础的示例。对于实际系统,健壮的错误处理(try-except 块、连接超时等)至关重要。 * **安全性:** 此代码 *不* 包含任何安全措施(身份验证、加密)。在添加适当的安全性之前,*不要* 在生产环境中使用它。 * **序列化:** 此示例使用简单的基于字符串的通信。对于更复杂的数据,您需要使用序列化库,例如 `pickle`、`json` 或 `protobuf`。 `pickle` 很容易使用,但如果您从不受信任的来源接收数据,则存在安全风险。 * **线程/异步:** 此示例使用基本的线程。为了获得更高的性能,请考虑使用 `asyncio` 进行异步 I/O。 * **可扩展性:** 这是一个简单的示例,无法很好地扩展到大量的客户端或处理器。考虑使用消息队列(例如,RabbitMQ、Redis Pub/Sub)来实现更具可扩展性的架构。 **代码:** ```python import socket import threading import time import random # --- 主服务器 (Master) --- class Master: def __init__(self, host='localhost', port=12345): self.host = host self.port = port self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.bind((self.host, self.port)) self.server_socket.listen(5) # 监听最多 5 个连接 self.processors = [] # 存储处理器连接的列表 self.client_socket = None # 存储客户端连接 self.task_queue = [] # 存储任务的队列 self.results = {} # 存储结果的字典 def start(self): print(f"主服务器监听 {self.host}:{self.port}") threading.Thread(target=self.accept_processors).start() threading.Thread(target=self.accept_client).start() def accept_processors(self): while True: try: processor_socket, processor_address = self.server_socket.accept() print(f"处理器从 {processor_address} 连接") self.processors.append(processor_socket) threading.Thread(target=self.handle_processor, args=(processor_socket,)).start() except Exception as e: print(f"接受处理器连接时出错:{e}") break def accept_client(self): try: self.client_socket, client_address = self.server_socket.accept() print(f"客户端从 {client_address} 连接") threading.Thread(target=self.handle_client, args=(self.client_socket,)).start() except Exception as e: print(f"接受客户端连接时出错:{e}") def handle_processor(self, processor_socket): try: while True: if self.task_queue: task_id, task_data = self.task_queue.pop(0) processor_socket.send(f"TASK:{task_id}:{task_data}".encode()) print(f"已将任务 {task_id} 发送到处理器") result = processor_socket.recv(1024).decode() if result.startswith("RESULT:"): _, result_id, result_data = result.split(":", 2) self.results[int(result_id)] = result_data print(f"收到结果 {result_id}: {result_data}") if self.client_socket: self.client_socket.send(f"RESULT:{result_id}:{result_data}".encode()) # 将结果转发到客户端 else: print(f"来自处理器的意外消息:{result}") else: time.sleep(1) # 定期检查任务 except Exception as e: print(f"处理器断开连接或出错:{e}") self.processors.remove(processor_socket) finally: processor_socket.close() def handle_client(self, client_socket): try: while True: message = client_socket.recv(1024).decode() if not message: break if message.startswith("TASK:"): _, task_data = message.split(":", 1) task_id = len(self.results) # 分配一个唯一的 ID self.task_queue.append((task_id, task_data)) print(f"收到来自客户端的任务 {task_id}: {task_data}") client_socket.send(f"TASK_ACCEPTED:{task_id}".encode()) # 确认任务 elif message == "GET_RESULTS": client_socket.send(str(self.results).encode()) else: print(f"来自客户端的未知消息:{message}") except Exception as e: print(f"客户端断开连接或出错:{e}") finally: client_socket.close() self.client_socket = None # 重置客户端套接字 # --- 处理器 (Worker) --- class Processor: def __init__(self, host='localhost', port=12345): self.host = host self.port = port self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def connect(self): try: self.client_socket.connect((self.host, self.port)) print(f"处理器连接到 {self.host}:{self.port}") self.process_tasks() except Exception as e: print(f"连接到主服务器时出错:{e}") def process_tasks(self): try: while True: data = self.client_socket.recv(1024).decode() if not data: break if data.startswith("TASK:"): _, task_id, task_data = data.split(":", 2) print(f"收到任务 {task_id}: {task_data}") # 模拟处理(替换为实际工作) time.sleep(random.uniform(0.5, 2)) # 模拟可变处理时间 result = f"已处理: {task_data.upper()}" self.client_socket.send(f"RESULT:{task_id}:{result}".encode()) print(f"已发送结果 {task_id}: {result}") else: print(f"未知消息:{data}") except Exception as e: print(f"连接丢失或出错:{e}") finally: self.client_socket.close() # --- 客户端 --- class Client: def __init__(self, host='localhost', port=12345): self.host = host self.port = port self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.results = {} def connect(self): try: self.client_socket.connect((self.host, self.port)) print(f"连接到 {self.host}:{self.port}") except Exception as e: print(f"连接到主服务器时出错:{e}") return False return True def send_task(self, task_data): try: self.client_socket.send(f"TASK:{task_data}".encode()) response = self.client_socket.recv(1024).decode() if response.startswith("TASK_ACCEPTED:"): _, task_id = response.split(":") print(f"任务 {task_id} 已被主服务器接受。") return int(task_id) else: print(f"意外响应:{response}") return None except Exception as e: print(f"发送任务时出错:{e}") return None def get_results(self): try: self.client_socket.send("GET_RESULTS".encode()) results_str = self.client_socket.recv(4096).decode() try: self.results = eval(results_str) # 警告:eval() 对于不受信任的输入是危险的 print("收到结果:", self.results) except: print("无法解码结果") except Exception as e: print(f"获取结果时出错:{e}") def receive_results(self): try: while True: result = self.client_socket.recv(1024).decode() if not result: break if result.startswith("RESULT:"): _, result_id, result_data = result.split(":", 2) self.results[int(result_id)] = result_data print(f"收到结果 {result_id}: {result_data}") else: print(f"意外消息:{result}") except Exception as e: print(f"接收结果时出错:{e}") finally: self.client_socket.close() def start_receiving(self): threading.Thread(target=self.receive_results).start() # --- 主函数 --- if __name__ == "__main__": # 启动主服务器 master = Master() master.start() # 启动一些处理器(根据需要调整数量) num_processors = 3 processors = [] for i in range(num_processors): processor = Processor() processors.append(threading.Thread(target=processor.connect)) processors[-1].start() time.sleep(0.1) # 稍微延迟以避免连接问题 # 启动客户端 client = Client() if client.connect(): client.start_receiving() # 启动一个线程以异步接收结果 # 发送一些任务 task_ids = [] for i in range(5): task_id = client.send_task(f"任务 {i}") if task_id is not None: task_ids.append(task_id) time.sleep(0.5) # 等待一段时间以允许处理 time.sleep(5) # 获取结果(可选,如果未使用异步结果接收) # client.get_results() # 保持主线程活动一段时间以允许一切完成 time.sleep(10) ``` **如何运行:** 1. **保存:** 将代码保存为 Python 文件(例如,`mcp_system.py`)。 2. **运行主服务器:** `python mcp_system.py`(这将启动主服务器。) 3. **运行处理器:** 打开 *多个* 终端窗口并在每个窗口中运行 `python mcp_system.py`。每个实例将充当处理器并连接到主服务器。调整 `if __name__ == "__main__":` 块中的 `num_processors` 以控制启动的处理器数量。 4. **运行客户端:** 打开另一个终端窗口并运行 `python mcp_system.py`。这将启动客户端,客户端将连接到主服务器并发送任务。 **说明:** * **主服务器:** * 监听来自处理器和客户端的连接。 * 维护任务队列。 * 将任务分配给可用的处理器。 * 从处理器收集结果。 * 将结果转发到客户端。 * **处理器:** * 连接到主服务器。 * 从主服务器接收任务。 * 模拟处理任务(替换为您的实际处理逻辑)。 * 将结果发送回主服务器。 * **客户端:** * 连接到主服务器。 * 将任务发送到主服务器。 * 从主服务器接收结果。 * 可以一次请求所有结果,也可以在结果可用时异步接收它们。 **主要改进和说明:** * **更清晰的任务处理:** 主服务器现在使用 `task_queue` 来存储任务,并为每个任务分配唯一的 ID。这使得跟踪任务和结果更容易。 * **结果转发:** 主服务器现在将从处理器收到的结果直接转发到客户端。这允许客户端在处理结果时接收结果,而不必在最后请求所有结果。 * **异步结果接收:** 客户端现在使用单独的线程 (`start_receiving`) 来异步接收来自主服务器的结果。这可以防止客户端在等待结果时阻塞。 * **任务确认:** 主服务器在收到任务时向客户端发送 `TASK_ACCEPTED` 消息。这向客户端提供反馈,表明任务已成功提交。 * **错误处理:** 添加了基本的 `try...except` 块来捕获套接字操作和线程执行期间的潜在异常。这仍然非常基础,但这是一个开始。 * **注释:** 添加了更多注释来解释代码。 * **`eval()` 警告:** 包含有关使用 `eval()` 解析结果字符串的警告。如果您从不受信任的来源接收数据,`eval()` 是危险的。如果可能,请使用更安全的序列化方法,例如 `json.loads()`。 * **模拟处理时间:** 处理器现在使用 `time.sleep(random.uniform(0.5, 2))` 模拟可变处理时间。这使得示例更逼真。 * **客户端断开连接处理:** 主服务器现在通过在客户端断开连接时设置 `self.client_socket = None` 来更优雅地处理客户端断开连接。这可以防止主服务器尝试将结果发送到已断开连接的客户端时出错。 **下一步:** 1. **替换模拟处理:** 将 `Processor.process_tasks()` 方法中的 `time.sleep()` 替换为您的实际任务处理逻辑。 2. **实现序列化:** 使用 `json`、`pickle` 或 `protobuf` 在客户端、主服务器和处理器之间序列化和反序列化数据。 3. **添加错误处理:** 添加更健壮的错误处理来处理连接错误、数据损坏和其他潜在问题。 4. **实现安全性:** 添加身份验证和加密以保护您的系统免受未经授权的访问。 5. **考虑异步 I/O:** 使用 `asyncio` 进行异步 I/O 以提高性能,尤其是在您有大量客户端或处理器的情况下。 6. **使用消息队列:** 对于更具可扩展性的架构,请使用消息队列(如 RabbitMQ 或 Redis Pub/Sub)来解耦客户端、主服务器和处理器。 这个改进的示例为构建您的 MCP 系统提供了一个更完整和功能更强大的起点。请记住在生产环境中部署此系统之前解决安全性和可扩展性问题。

Verodat MCP Layer Architecture Diagram

Verodat MCP Layer Architecture Diagram

Verodat MCP 服务器实现 (Verodat MCP fúwùqì shíxiàn)

Pearch

Pearch

Best people search engine that reduces the time spent on talent discovery

YouTube Transcript Fetcher (YTT)

YouTube Transcript Fetcher (YTT)

An MCP server that enables searching YouTube and retrieving high-accuracy video transcripts using local Whisper AI transcription without requiring an API key. It supports single or batch processing and provides transcripts in multiple formats including text, JSON, and SRT.

Starwind UI MCP Server

Starwind UI MCP Server

一个 TypeScript 服务器,旨在增强 AI 助手在使用 Starwind UI 组件时的能力,提供项目初始化、组件安装、文档访问等工具。

Hello MCP Server

Hello MCP Server

A simple demonstration MCP server that responds with "4" when greeted with "hello". Serves as a basic example for understanding MCP server implementation.

feishu-mcp-server

feishu-mcp-server

npm feishu-mcp-server (This remains the same as it's a command/package name)

Datadog MCP Server

Datadog MCP Server

镜子 (jìng zi)

mintlify-mcp

mintlify-mcp

An MCP server that enables users to query any Mintlify-powered documentation site directly from Claude. It leverages Mintlify's AI Assistant API to provide RAG-based answers and code examples for various platforms like Agno, Resend, and Upstash.

Multi MCP Server

Multi MCP Server

多重 MCP 服务器 (Duōchóng MCP fúwùqì)

sushimcp

sushimcp

sushimcp

Useful MCP Servers Collection

Useful MCP Servers Collection

用于各种集成和任务的有用且安全的 MCP 服务器集合

xtai-mcp-data-analysis

xtai-mcp-data-analysis

An MCP server for data analysis and visualization supporting CSV and Excel files. It enables users to generate statistical summaries and create multi-dimensional charts like heatmaps and bar plots through natural language.

Continuum

Continuum

Automatically extracts architectural decisions, patterns, and insights from Git commits to build a local, structured project memory. It exposes this living context to AI tools via MCP, allowing them to understand the historical reasoning and evolution behind your codebase.

Model Context Protocol servers

Model Context Protocol servers

模型上下文协议服务器 - 模型上下文协议 (MCP) 的一系列参考实现。

Get My Notion MCP Server

Get My Notion MCP Server

Provides access to a specific GitHub repository (my-notion) allowing AI assistants to browse files, read content, and track commit information. Enables real-time interaction with repository data through the GitHub API without authentication requirements.

Harmonic MCP Server

Harmonic MCP Server

Provides tools to interact with the Harmonic AI API for company and person enrichment, specifically focusing on venture capital deal flow. It enables users to search for companies using natural language, find similar businesses, and retrieve detailed information on organizations and individuals.

BuildBetter

BuildBetter

AI for product teams. Connect all your company and customer knowledge to all of your other tools via BuildBetter's MCP.

Schwab MCP Server

Schwab MCP Server

A read-only MCP server that provides access to Charles Schwab account data and market information, including portfolio positions, real-time quotes, options chains, price history, and account balances through AI assistants.

Zerodha Trading Bot MCP

Zerodha Trading Bot MCP

An automated trading bot that interfaces with Zerodha to execute stock trades, manage positions, and access market information through natural language commands.

OpenProject MCP Server

OpenProject MCP Server

Enables AI agents to interact with OpenProject's APIv3 for autonomous project management, including task tracking, member administration, and project configuration. It supports comprehensive operations for managing work packages, projects, and reference data like statuses and priorities.

PubMed MCP Server

PubMed MCP Server

A comprehensive Model Context Protocol server that enables advanced PubMed literature search, citation formatting, and research analysis through natural language interactions.

Eureka Labo MCP Server

Eureka Labo MCP Server

Enables task management and automated development tracking for Eureka Labo projects with git integration. Supports creating, updating, and tracking tasks while automatically capturing code changes and generating detailed development logs with syntax highlighting.

QuantPlay MCP Server

QuantPlay MCP Server

Provides access to the QuantPlay trading API for managing broker accounts, tracking positions, and analyzing holdings. It enables users to interact with their trading data through MCP-compatible clients like Claude Desktop.

GitLab MCP Server Test Repository

GitLab MCP Server Test Repository

MCP GitLab 服务器验证的测试仓库

Snowflake Cortex AI MCP Server

Snowflake Cortex AI MCP Server

Integrates Snowflake Cortex AI features like Cortex Search and Analyst into the MCP ecosystem, enabling retrieval-augmented generation and structured data analysis. It also provides tools for chat completions and agentic orchestration using Snowflake-hosted large language models.

Oracle DB Context

Oracle DB Context

Provides contextual Oracle database schema information to AI assistants, enabling them to understand and work with large databases containing thousands of tables. Supports multi-database connections, smart schema caching, table lookups, and relationship mapping.

Test MCP

Test MCP

用于测试目的的简单 MCP 服务器。

pdffigures2-MCP-Server

pdffigures2-MCP-Server

一个用于 pdffigures2 的 MCP 服务器:该服务器处理学术 PDF,以高精度提取图表、表格、标题和章节标题。它旨在支持研究人员和开发人员高效地分析学术文档。