Spark MCP (Model Context Protocol) Optimizer

Spark MCP (Model Context Protocol) Optimizer

vgiri2015

研究与数据
访问服务器

README

Spark MCP (模型上下文协议) 优化器

本项目实现了一个模型上下文协议 (MCP) 服务器和客户端,用于优化 Apache Spark 代码。该系统通过客户端-服务器架构提供智能的代码优化建议和性能分析。

工作原理

代码优化工作流程

graph TB
    subgraph Input
        A[输入 PySpark 代码] --> |spark_code_input.py| B[run_client.py]
    end

    subgraph MCP 客户端
        B --> |异步 HTTP| C[SparkMCPClient]
        C --> |协议处理器| D[工具接口]
    end

    subgraph MCP 服务器
        E[run_server.py] --> F[SparkMCPServer]
        F --> |工具注册表| G[optimize_spark_code]
        F --> |工具注册表| H[analyze_performance]
        F --> |协议处理器| I[Claude AI 集成]
    end

    subgraph 资源
        I --> |代码分析| J[Claude AI 模型]
        J --> |优化| K[优化代码生成]
        K --> |验证| L[PySpark 运行时]
    end

    subgraph 输出
        M[optimized_spark_code.py]
        N[performance_analysis.md]
    end

    D --> |MCP 请求| F
    G --> |生成| M
    H --> |生成| N

    classDef client fill:#e1f5fe,stroke:#01579b
    classDef server fill:#f3e5f5,stroke:#4a148c
    classDef resource fill:#e8f5e9,stroke:#1b5e20
    classDef output fill:#fff3e0,stroke:#e65100

    class A,B,C,D client
    class E,F,G,H,I server
    class J,K,L resource
    class M,N,O output

组件详情

  1. 输入层

    • spark_code_input.py: 用于优化的源 PySpark 代码
    • run_client.py: 客户端启动和配置
  2. MCP 客户端层

    • 工具接口:符合协议的工具调用
  3. MCP 服务器层

    • run_server.py: 服务器初始化
    • 工具注册表:优化和分析工具
    • 协议处理器:MCP 请求/响应管理
  4. 资源层

    • Claude AI:代码分析和优化
    • PySpark 运行时:代码执行和验证
  5. 输出层

    • optimized_spark_code.py: 优化后的代码
    • performance_analysis.md: 详细分析

此工作流程说明:

  1. 输入 PySpark 代码提交
  2. MCP 协议处理和路由
  3. Claude AI 分析和优化
  4. 代码转换和验证
  5. 性能分析和报告

架构

本项目遵循模型上下文协议架构,用于标准化 AI 模型交互:

┌──────────────────┐      ┌──────────────────┐      ┌──────────────────┐
│                  │      │   MCP 服务器     │      │    资源     │
│   MCP 客户端     │      │  (SparkMCPServer)│      │                  │
│ (SparkMCPClient) │      │                  │      │ ┌──────────────┐ │
│                  │      │    ┌─────────┐   │      │ │  Claude AI   │ │
│   ┌─────────┐    │      │    │ 工具   │   │ <──> │ │   模型      │ │
│   │ 工具   │    │      │    │注册表 │   │      │ └──────────────┘ │
│   │接口│    │      │    └─────────┘   │      │                  │
│   └─────────┘    │ <──> │    ┌─────────┐   │      │ ┌──────────────┐ │
│                  │      │    │协议 │   │      │ │  PySpark     │ │
│                  │      │    │处理器  │   │      │ │  运行时     │ │
│                  │      │    └─────────┘   │      │ └──────────────┘ │
└──────────────────┘      └──────────────────┘      └──────────────────┘

        │                         │                          │
        │                         │                          │
        v                         v                          v
┌──────────────┐          ┌──────────────┐           ┌──────────────┐
│  可用   │          │  已注册  │           │   外部   │
│    工具     │          │    工具     │           │  资源   │
├──────────────┤          ├──────────────┤           ├──────────────┤
│optimize_code │          │optimize_code │           │ Claude API   │
│analyze_perf  │          │analyze_perf  │           │ Spark 引擎 │
└──────────────┘          └──────────────┘           └──────────────┘

组件

  1. MCP 客户端

    • 提供用于代码优化的工具接口
    • 处理与服务器的异步通信
    • 管理代码生成的文件 I/O
  2. MCP 服务器

    • 实现 MCP 协议处理器
    • 管理工具注册表和执行
    • 协调客户端和资源
  3. 资源

    • Claude AI:提供代码优化智能
    • PySpark 运行时:执行和验证优化

协议流程

  1. 客户端通过 MCP 协议发送优化请求
  2. 服务器验证请求并调用适当的工具
  3. 工具利用 Claude AI 进行优化
  4. 优化后的代码通过 MCP 响应返回
  5. 客户端保存并验证优化后的代码

端到端功能

sequenceDiagram
    participant U as 用户
    participant C as MCP 客户端
    participant S as MCP 服务器
    participant AI as Claude AI
    participant P as PySpark 运行时

    U->>C: 提交 Spark 代码
    C->>S: 发送优化请求
    S->>AI: 分析代码
    AI-->>S: 优化建议
    S->>C: 返回优化后的代码
    C->>P: 运行原始代码
    C->>P: 运行优化后的代码
    P-->>C: 执行结果
    C->>C: 生成分析
    C-->>U: 最终报告
  1. 代码提交

    • 用户将 PySpark 代码放置在 v1/input/spark_code_input.py
    • 代码由 MCP 客户端读取
  2. 优化过程

    • MCP 客户端通过标准化协议连接到服务器
    • 服务器将代码转发到 Claude AI 进行分析
    • AI 根据最佳实践提出优化建议
    • 服务器验证和处理建议
  3. 代码生成

    • 优化后的代码保存到 v1/output/optimized_spark_code.py
    • 包括详细的注释,解释优化
    • 在提高性能的同时保持原始代码结构
  4. 性能分析

    • 两个版本都在 PySpark 运行时中执行
    • 比较执行时间
    • 验证结果的正确性
    • 收集和分析指标
  5. 结果生成

    • v1/output/performance_analysis.md 中进行全面分析
    • 并排执行比较
    • 性能改进统计
    • 优化解释和原理

用法

要求

  • Python 3.8+
  • PySpark 3.2.0+
  • Anthropic API 密钥(用于 Claude AI)

安装

pip install -r requirements.txt

快速开始

  1. 将要优化的 Spark 代码添加到 input/spark_code_input.py

  2. 启动 MCP 服务器:

python v1/run_server.py
  1. 运行客户端以优化您的代码:
python v1/run_client.py

这将生成两个文件:

  • output/optimized_spark_example.py: 带有详细优化注释的优化后的 Spark 代码
  • output/performance_analysis.md: 全面的性能分析
  1. 运行并比较代码版本:
python v1/run_optimized.py

这将:

  • 执行原始代码和优化后的代码
  • 比较执行时间和结果
  • 使用执行指标更新性能分析
  • 显示详细的性能改进统计信息

项目结构

ai-mcp/
├── input/
│   └── spark_code_input.py     # 要优化的原始 Spark 代码
├── output/
│   ├── optimized_spark_example.py  # 生成的优化代码
│   └── performance_analysis.md     # 详细的性能比较
├── spark_mcp/
│   ├── client.py               # MCP 客户端实现
│   └── server.py               # MCP 服务器实现
├── run_client.py              # 用于优化代码的客户端脚本
├── run_server.py              # 服务器启动脚本
└── run_optimized.py           # 用于运行和比较代码版本的脚本

为什么选择 MCP?

模型上下文协议 (MCP) 为 Spark 代码优化提供了几个关键优势:

直接调用 Claude AI 与 MCP 服务器

方面 直接调用 Claude AI MCP 服务器
集成 • 每个团队的自定义集成<br>• 手动响应处理<br>• 重复实现 • 预构建的客户端库<br>• 自动化工作流程<br>• 统一接口
基础设施 • 没有内置验证<br>• 没有结果持久化<br>• 手动跟踪 • 自动验证<br>• 结果持久化<br>• 版本控制
上下文 • 基本代码建议<br>• 没有执行上下文<br>• 有限的优化范围 • 上下文感知的优化<br>• 完整的执行历史记录<br>• 全面的改进
验证 • 需要手动测试<br>• 没有性能指标<br>• 不确定的结果 • 自动化测试<br>• 性能指标<br>• 验证的结果
工作流程 • 特设流程<br>• 没有标准化<br>• 需要手动干预 • 结构化流程<br>• 标准协议<br>• 自动化管道

主要区别:

1. AI 集成

方法 代码示例 优点
传统 client = anthropic.Client(api_key)<br>response = client.messages.create(...) • 复杂的设置<br>• 自定义错误处理<br>• 紧耦合
MCP client = SparkMCPClient()<br>result = await client.optimize_spark_code(code) • 简单的接口<br>• 内置验证<br>• 松耦合

2. 工具管理

方法 代码示例 优点
传统 class SparkOptimizer:<br>  def register_tool(self, name, func):<br>    self.tools[name] = func • 手动注册<br>• 没有验证<br>• 复杂的维护
MCP @register_tool("optimize_spark_code")<br>async def optimize_spark_code(code: str): • 自动发现<br>• 类型检查<br>• 易于扩展

3. 资源管理

方法 代码示例 优点
传统 def __init__(self):<br>  self.claude = init_claude()<br>  self.spark = init_spark() • 手动编排<br>• 手动清理<br>• 容易出错
MCP @requires_resources(["claude_ai", "spark"])<br>async def optimize_spark_code(code: str): • 自动协调<br>• 生命周期管理<br>• 错误处理

4. 通信协议

方法 代码示例 优点
传统 {"type": "request",<br> "payload": {"code": code}} • 自定义格式<br>• 手动验证<br>• 自定义调试
MCP {"method": "tools/call",<br> "params": {"name": "optimize_code"}} • 标准格式<br>• 自动验证<br>• 易于调试

功能

  • 智能代码优化:利用 Claude AI 分析和优化 PySpark 代码
  • 性能分析:提供原始代码和优化代码之间性能差异的详细分析
  • MCP 架构:实现模型上下文协议,用于标准化 AI 模型交互
  • 易于集成:用于代码优化请求的简单客户端接口
  • 代码生成:自动将优化后的代码保存到单独的文件中

高级用法

您还可以以编程方式使用客户端:

from spark_mcp.client import SparkMCPClient

async def main():
    # 连接到 MCP 服务器
    client = SparkMCPClient()
    await client.connect()

    # 要优化的 Spark 代码
    spark_code = '''
    # 您的 PySpark 代码
    '''

    # 获取优化后的代码和性能分析
    optimized_code = await client.optimize_spark_code(
        code=spark_code,
        optimization_level="advanced",
        save_to_file=True  # 保存到 output/optimized_spark_example.py
    )
    
    # 分析性能差异
    analysis = await client.analyze_performance(
        original_code=spark_code,
        optimized_code=optimized_code,
        save_to_file=True  # 保存到 output/performance_analysis.md
    )
    
    # 运行两个版本并进行比较
    # 您可以使用 run_optimized.py 脚本或实现您自己的比较
    
    await client.close()
    
    # 分析性能
    performance = await client.analyze_performance(spark_code, optimized_code)

    await client.close()

输入和输出示例

该存储库包含一个示例工作流程:

  1. 输入代码 (input/spark_code_input.py):
# 创建 DataFrames 并连接
emp_df = spark.createDataFrame(employees, ["id", "name", "age", "dept", "salary"])
dept_df = spark.createDataFrame(departments, ["dept", "location", "budget"])

# 连接和分析
result = emp_df.join(dept_df, "dept") \
    .groupBy("dept", "location") \
    .agg({"salary": "avg", "age": "avg", "id": "count"}) \
    .orderBy("dept")
  1. 优化后的代码 (output/optimized_spark_example.py):
# 性能优化版本,具有缓存和改进的配置
spark = SparkSession.builder \
    .appName("EmployeeAnalysis") \
    .config("spark.sql.shuffle.partitions", 200) \
    .getOrCreate()

# 创建和缓存 DataFrames
emp_df = spark.createDataFrame(employees, ["id", "name", "age", "dept", "salary"]).cache()
dept_df = spark.createDataFrame(departments, ["dept", "location", "budget"]).cache()

# 优化后的连接和分析
result = emp_df.join(dept_df, "dept") \
    .groupBy("dept", "location") \
    .agg(
        avg("salary").alias("avg_salary"),
        avg("age").alias("avg_age"),
        count("id").alias("employee_count")
    ) \
    .orderBy("dept")
  1. 性能分析 (output/performance_analysis.md):
## 执行结果比较

### 时间比较
- 原始代码:5.18 秒
- 优化后的代码:0.65 秒
- 性能提升:87.4%

### 优化细节
- 缓存常用 DataFrames
- 优化 shuffle 分区
- 改进的列表达式
- 更好的内存管理

项目结构

ai-mcp/
├── spark_mcp/
│   ├── __init__.py
│   ├── client.py      # MCP 客户端实现
│   └── server.py      # MCP 服务器实现
├── examples/
│   ├── optimize_code.py           # 用法示例
│   └── optimized_spark_example.py # 生成的优化代码
├── requirements.txt
└── run_server.py      # 服务器启动脚本

可用工具

  1. optimize_spark_code

    • 优化 PySpark 代码以获得更好的性能
    • 支持基本和高级优化级别
    • 自动将优化后的代码保存到 examples/optimized_spark_example.py
  2. analyze_performance

    • 分析原始代码和优化代码之间的性能差异
    • 提供以下方面的见解:
      • 性能改进
      • 资源利用率
      • 可扩展性考虑因素
      • 潜在的权衡

环境变量

  • ANTHROPIC_API_KEY: 您的 Claude AI 的 Anthropic API 密钥

优化示例

该系统实现了各种 PySpark 优化,包括:

  • 用于小表连接大表的广播连接
  • 有效的窗口函数使用
  • 战略性数据缓存
  • 查询计划优化
  • 面向性能的操作排序

贡献

欢迎提交问题和增强请求!

许可证

MIT 许可证

推荐服务器

Crypto Price & Market Analysis MCP Server

Crypto Price & Market Analysis MCP Server

一个模型上下文协议 (MCP) 服务器,它使用 CoinCap API 提供全面的加密货币分析。该服务器通过一个易于使用的界面提供实时价格数据、市场分析和历史趋势。 (Alternative, slightly more formal and technical translation): 一个模型上下文协议 (MCP) 服务器,利用 CoinCap API 提供全面的加密货币分析服务。该服务器通过用户友好的界面,提供实时价格数据、市场分析以及历史趋势数据。

精选
TypeScript
MCP PubMed Search

MCP PubMed Search

用于搜索 PubMed 的服务器(PubMed 是一个免费的在线数据库,用户可以在其中搜索生物医学和生命科学文献)。 我是在 MCP 发布当天创建的,但当时正在度假。 我看到有人在您的数据库中发布了类似的服务器,但还是决定发布我的服务器。

精选
Python
mixpanel

mixpanel

连接到您的 Mixpanel 数据。 从 Mixpanel 分析查询事件、留存和漏斗数据。

精选
TypeScript
Sequential Thinking MCP Server

Sequential Thinking MCP Server

这个服务器通过将复杂问题分解为顺序步骤来促进结构化的问题解决,支持修订,并通过完整的 MCP 集成来实现多条解决方案路径。

精选
Python
Nefino MCP Server

Nefino MCP Server

为大型语言模型提供访问德国可再生能源项目新闻和信息的能力,允许按地点、主题(太阳能、风能、氢能)和日期范围进行筛选。

官方
Python
Vectorize

Vectorize

将 MCP 服务器向量化以实现高级检索、私有深度研究、Anything-to-Markdown 文件提取和文本分块。

官方
JavaScript
Mathematica Documentation MCP server

Mathematica Documentation MCP server

一个服务器,通过 FastMCP 提供对 Mathematica 文档的访问,使用户能够从 Wolfram Mathematica 检索函数文档和列出软件包符号。

本地
Python
kb-mcp-server

kb-mcp-server

一个 MCP 服务器,旨在实现便携性、本地化、简易性和便利性,以支持对 txtai “all in one” 嵌入数据库进行基于语义/图的检索。任何 tar.gz 格式的 txtai 嵌入数据库都可以被加载。

本地
Python
Research MCP Server

Research MCP Server

这个服务器用作 MCP 服务器,与 Notion 交互以检索和创建调查数据,并与 Claude Desktop Client 集成以进行和审查调查。

本地
Python
Cryo MCP Server

Cryo MCP Server

一个API服务器,实现了模型补全协议(MCP),用于Cryo区块链数据提取,允许用户通过任何兼容MCP的客户端查询以太坊区块链数据。

本地
Python