Scaled MCP Server

Scaled MCP Server

ScaledMCP 是一个可以水平扩展的 MCP 和 A2A 服务器。你知道的,为了 AI (人工智能)。

Traego

研究与数据
访问服务器

README

Scaled MCP 服务器

CI 状态 Go 参考 Go Report Card codecov License

一个水平可扩展的 MCP (消息上下文协议) 服务器实现,支持负载均衡部署。

概述

Scaled MCP Server 是一个 Go 库,它实现了 MCP 2025-03 规范,并支持水平扩展。 它被设计为嵌入到您的应用程序中,并提供灵活的配置选项。

特性

  • HTTP 传输: 灵活的 HTTP 传输,具有主 /mcp 端点、可选的 SSE 端点和功能协商
  • 会话管理: 分布式会话管理,支持 Redis 或内存选项
  • Actor 系统: 使用基于 Actor 的架构来处理会话和消息路由
  • 水平扩展: 支持跨多个节点的负载均衡部署

安装

go get github.com/traego/scaled-mcp@latest

注意: 此库需要 Go 1.24 或更高版本。

用法

带有静态工具的基本服务器

package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"os/signal"
	"syscall"

	"github.com/traego/scaled-mcp/pkg/config"
	"github.com/traego/scaled-mcp/pkg/resources"
	"github.com/traego/scaled-mcp/pkg/server"
)

func main() {
	// 配置日志
	logHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})
	slog.SetDefault(slog.New(logHandler))

	// 使用默认配置创建服务器
	cfg := config.DefaultConfig()
	
	// 为了简单起见,使用内存会话存储
	cfg.Session.UseInMemory = true
	
	// 创建一个静态工具注册表
	registry := resources.NewStaticToolRegistry()
	
	// 定义并注册一个简单的计算器工具
	calculatorTool := resources.NewTool("calculator").
		WithDescription("执行基本算术运算").
		WithInputs([]resources.ToolInput{
			{
				Name:        "operation",
				Type:        "string",
				Description: "要执行的操作 (add, subtract, multiply, divide)",
				Required:    true,
			},
			{
				Name:        "a",
				Type:        "number",
				Description: "第一个操作数",
				Required:    true,
			},
			{
				Name:        "b",
				Type:        "number",
				Description: "第二个操作数",
				Required:    true,
			},
		}).
		Build()
	
	// 将工具注册到注册表中
	registry.RegisterTool(calculatorTool)
	
	// 定义服务器的提示
	prompt := "你是一个有用的 AI 助手,可以使用计算器工具执行计算。"
	
	// 使用工具注册表和提示创建服务器
	srv, err := server.NewMcpServer(cfg,
		server.WithToolRegistry(registry),
		server.WithServerInfo("Example MCP Server", "1.0.0"),
		server.WithPrompt(prompt),
	)
	if err != nil {
		slog.Error("创建服务器失败", "error", err)
		os.Exit(1)
	}
	
	// 设置工具处理程序
	registry.SetToolHandler("calculator", func(ctx context.Context, params map[string]interface{}) (interface{}, error) {
		// 提取参数
		operation, ok := params["operation"].(string)
		if !ok {
			return nil, fmt.Errorf("%w: operation 必须是字符串", resources.ErrInvalidParams)
		}
		
		a, ok := params["a"].(float64)
		if !ok {
			return nil, fmt.Errorf("%w: a 必须是数字", resources.ErrInvalidParams)
		}
		
		b, ok := params["b"].(float64)
		if !ok {
			return nil, fmt.Errorf("%w: b 必须是数字", resources.ErrInvalidParams)
		}
		
		// 执行计算
		var result float64
		switch operation {
		case "add":
			result = a + b
		case "subtract":
			result = a - b
		case "multiply":
			result = a * b
		case "divide":
			if b == 0 {
				return nil, fmt.Errorf("%w: 除数为零", resources.ErrInvalidParams)
			}
			result = a / b
		default:
			return nil, fmt.Errorf("%w: 未知操作 %s", resources.ErrInvalidParams, operation)
		}
		
		return map[string]interface{}{
			"result": result,
		}, nil
	})
	
	// 在 goroutine 中启动服务器
	go func() {
		if err := srv.Start(context.Background()); err != nil {
			slog.Error("启动服务器失败", "error", err)
			os.Exit(1)
		}
	}()
	
	slog.Info("服务器已启动", "host", cfg.HTTP.Host, "port", cfg.HTTP.Port)
	
	// 等待终止信号
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	<-sig
	
	// 关闭服务器
	slog.Info("正在关闭服务器...")
	shutdownCtx, cancel := context.WithTimeout(context.Background(), cfg.HTTP.ShutdownTimeout)
	defer cancel()
	
	if err := srv.Stop(shutdownCtx); err != nil {
		slog.Error("停止服务器失败", "error", err)
	}
	
	slog.Info("服务器已停止")
}

### 使用外部 HTTP 服务器

您可以将自己的 HTTP 服务器与 MCP 传输一起使用:

```go
package main

import (
	"context"
	"log"
	"net/http"

	"github.com/go-chi/chi/v5"
	"github.com/go-chi/chi/v5/middleware"
	"github.com/go-chi/cors"
	"github.com/traego/scaled-mcp/pkg/config"
	"github.com/traego/scaled-mcp/pkg/server"
	"github.com/traego/scaled-mcp/pkg/transport"
)

func main() {
	// 使用默认配置创建服务器
	cfg := config.DefaultConfig()
	cfg.Session.UseInMemory = true
	
	// 创建 MCP 服务器,但不启动 HTTP 服务器
	srv, err := server.NewServer(cfg)
	if err != nil {
		log.Fatalf("创建服务器失败: %v", err)
	}
	
	// 创建自定义路由器
	r := chi.NewRouter()
	
	// 添加中间件
	r.Use(middleware.RequestID)
	r.Use(middleware.RealIP)
	r.Use(middleware.Logger)
	r.Use(middleware.Recoverer)
	
	// 添加 CORS 中间件 - 在使用外部服务器时很重要
	r.Use(cors.Handler(cors.Options{
		AllowedOrigins:   []string{"*"},
		AllowedMethods:   []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
		AllowedHeaders:   []string{"Accept", "Authorization", "Content-Type", "X-CSRF-Token"},
		ExposedHeaders:   []string{"Link"},
		AllowCredentials: true,
		MaxAge:           300,
	}))
	
	// 添加您的自定义路由
	r.Get("/", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("欢迎来到 MCP 服务器!"))
	})
	
	// 使用自定义路由器创建 HTTP 传输
	httpTransport := transport.NewHTTPTransport(
		cfg,
		srv.GetActorSystem(),
		srv.GetSessionManager(),
		transport.WithExternalRouter(r),
	)
	
	// 启动 MCP 服务器,但不启动 HTTP
	if err := srv.Start(context.Background()); err != nil {
		log.Fatalf("启动服务器失败: %v", err)
	}
	
	// 启动 HTTP 传输
	if err := httpTransport.Start(); err != nil {
		log.Fatalf("启动 HTTP 传输失败: %v", err)
	}
	
	// 启动您的 HTTP 服务器
	log.Printf("在 %s:%d 上启动 HTTP 服务器", cfg.HTTP.Host, cfg.HTTP.Port)
	if err := http.ListenAndServe(":8080", r); err != nil {
		log.Fatalf("HTTP 服务器错误: %v", err)
	}
}

## 动态工具注册表示例

此库支持静态和动态工具注册表。 这是一个使用动态工具注册表的 client_example:

```go
// 创建一个自定义工具提供程序
toolProvider := NewExampleToolProvider()

// 使用提供程序创建一个动态工具注册表
registry := resources.NewDynamicToolRegistry(toolProvider)

// 使用动态工具注册表创建服务器
cfg := config.DefaultConfig()
mcpServer, err := server.NewMcpServer(cfg,
    server.WithToolRegistry(registry),
)

工具定义

该库提供了两种定义工具输入的方法:

1. 使用 WithInputs (推荐)

weatherTool := resources.NewTool("weather").
    WithDescription("获取某个位置的天气信息").
    WithInputs([]resources.ToolInput{
        {
            Name:        "location",
            Type:        "string",
            Description: "要获取天气的地点",
            Required:    true,
        },
        {
            Name:        "units",
            Type:        "string",
            Description: "温度单位 (摄氏度或华氏度)",
            Default:     "celsius",
        },
    }).
    Build()

2. 使用单独的参数方法

calculatorTool := resources.NewTool("calculator").
    WithDescription("执行基本算术运算").
    WithString("operation").
    Required().
    Description("要执行的操作 (add, subtract, multiply, divide)").
    Add().
    WithNumber("a").
    Required().
    Description("第一个操作数").
    Add().
    WithNumber("b").
    Required().
    Description("第二个操作数").
    Add().
    Build()

重要提示

CORS 配置

当将外部 HTTP 服务器与 MCP 传输一起使用时,您需要在路由器上配置 CORS 设置。 如上面的示例所示,当使用外部路由器时,MCP 传输将不应用 CORS 设置。

会话管理

对于生产部署,建议使用 Redis 进行会话管理,以支持水平扩展。 内存会话存储应仅用于开发或测试。

待办事项

  • [ ] 授权示例 + 授权上下文流程
  • [ ] 指标端点 (prometheus),涵盖 actor 启动/停止、平均会话时长等
  • [ ] 会话 Actor 钩子
  • [ ] MCP 规范
    • [ ] 列表更改通知
    • [ ] 采样
    • [ ] 根
    • [ ] 完成
    • [ ] 日志记录
  • [ ] A2A 规范
    • [ ] 在此处添加了详细信息 https://github.com/Traego/scaled-mcp/wiki/A2A-Support
  • [ ] 管道化以支持插件
  • [ ] K8S 集群已连接 + 已测试
  • [ ] 搜索支持 (https://github.com/modelcontextprotocol/modelcontextprotocol/pull/322)
  • [ ] 所有功能的完整测试(资源、提示等)
  • [ ] 测试覆盖率达到 80%
  • [ ] 清理新的服务器 API,使其更容易启动
  • [ ] 更好的默认值
  • [ ] 获取 goakt 的 vNext 并替换自定义消息传递计划(这是一个错误解决方法)

贡献

欢迎贡献! 请随时提交 Pull Request。

  1. Fork 存储库
  2. 创建您的功能分支 (git checkout -b feature/amazing-feature)
  3. 提交您的更改 (git commit -m 'Add some amazing feature')
  4. 推送到分支 (git push origin feature/amazing-feature)
  5. 打开一个 Pull Request

开发

测试

运行测试:

go test ./...

运行带有覆盖率的测试:

go test -race -coverprofile=coverage.txt -covermode=atomic ./...

在浏览器中查看覆盖率报告:

go tool cover -html=coverage.txt

代码覆盖率

本项目使用 Codecov 进行代码覆盖率报告。 覆盖率报告在 CI 运行期间自动生成和上传。

要查看覆盖率仪表板,请访问 codecov.io/gh/traego/scaled-mcp

许可证

本项目根据 MIT 许可证 获得许可。

文档

有关详细的 API 文档,请参阅 GoDoc

推荐服务器

Crypto Price & Market Analysis MCP Server

Crypto Price & Market Analysis MCP Server

一个模型上下文协议 (MCP) 服务器,它使用 CoinCap API 提供全面的加密货币分析。该服务器通过一个易于使用的界面提供实时价格数据、市场分析和历史趋势。 (Alternative, slightly more formal and technical translation): 一个模型上下文协议 (MCP) 服务器,利用 CoinCap API 提供全面的加密货币分析服务。该服务器通过用户友好的界面,提供实时价格数据、市场分析以及历史趋势数据。

精选
TypeScript
MCP PubMed Search

MCP PubMed Search

用于搜索 PubMed 的服务器(PubMed 是一个免费的在线数据库,用户可以在其中搜索生物医学和生命科学文献)。 我是在 MCP 发布当天创建的,但当时正在度假。 我看到有人在您的数据库中发布了类似的服务器,但还是决定发布我的服务器。

精选
Python
mixpanel

mixpanel

连接到您的 Mixpanel 数据。 从 Mixpanel 分析查询事件、留存和漏斗数据。

精选
TypeScript
Sequential Thinking MCP Server

Sequential Thinking MCP Server

这个服务器通过将复杂问题分解为顺序步骤来促进结构化的问题解决,支持修订,并通过完整的 MCP 集成来实现多条解决方案路径。

精选
Python
Nefino MCP Server

Nefino MCP Server

为大型语言模型提供访问德国可再生能源项目新闻和信息的能力,允许按地点、主题(太阳能、风能、氢能)和日期范围进行筛选。

官方
Python
Vectorize

Vectorize

将 MCP 服务器向量化以实现高级检索、私有深度研究、Anything-to-Markdown 文件提取和文本分块。

官方
JavaScript
Mathematica Documentation MCP server

Mathematica Documentation MCP server

一个服务器,通过 FastMCP 提供对 Mathematica 文档的访问,使用户能够从 Wolfram Mathematica 检索函数文档和列出软件包符号。

本地
Python
kb-mcp-server

kb-mcp-server

一个 MCP 服务器,旨在实现便携性、本地化、简易性和便利性,以支持对 txtai “all in one” 嵌入数据库进行基于语义/图的检索。任何 tar.gz 格式的 txtai 嵌入数据库都可以被加载。

本地
Python
Research MCP Server

Research MCP Server

这个服务器用作 MCP 服务器,与 Notion 交互以检索和创建调查数据,并与 Claude Desktop Client 集成以进行和审查调查。

本地
Python
Cryo MCP Server

Cryo MCP Server

一个API服务器,实现了模型补全协议(MCP),用于Cryo区块链数据提取,允许用户通过任何兼容MCP的客户端查询以太坊区块链数据。

本地
Python