Spark MCP (Model Context Protocol) Optimizer
vgiri2015
README
Spark MCP (模型上下文协议) 优化器
本项目实现了一个模型上下文协议 (MCP) 服务器和客户端,用于优化 Apache Spark 代码。该系统通过客户端-服务器架构提供智能的代码优化建议和性能分析。
工作原理
代码优化工作流程
graph TB
subgraph Input
A[输入 PySpark 代码] --> |spark_code_input.py| B[run_client.py]
end
subgraph MCP 客户端
B --> |异步 HTTP| C[SparkMCPClient]
C --> |协议处理器| D[工具接口]
end
subgraph MCP 服务器
E[run_server.py] --> F[SparkMCPServer]
F --> |工具注册表| G[optimize_spark_code]
F --> |工具注册表| H[analyze_performance]
F --> |协议处理器| I[Claude AI 集成]
end
subgraph 资源
I --> |代码分析| J[Claude AI 模型]
J --> |优化| K[优化代码生成]
K --> |验证| L[PySpark 运行时]
end
subgraph 输出
M[optimized_spark_code.py]
N[performance_analysis.md]
end
D --> |MCP 请求| F
G --> |生成| M
H --> |生成| N
classDef client fill:#e1f5fe,stroke:#01579b
classDef server fill:#f3e5f5,stroke:#4a148c
classDef resource fill:#e8f5e9,stroke:#1b5e20
classDef output fill:#fff3e0,stroke:#e65100
class A,B,C,D client
class E,F,G,H,I server
class J,K,L resource
class M,N,O output
组件详情
-
输入层
spark_code_input.py
: 用于优化的源 PySpark 代码run_client.py
: 客户端启动和配置
-
MCP 客户端层
- 工具接口:符合协议的工具调用
-
MCP 服务器层
run_server.py
: 服务器初始化- 工具注册表:优化和分析工具
- 协议处理器:MCP 请求/响应管理
-
资源层
- Claude AI:代码分析和优化
- PySpark 运行时:代码执行和验证
-
输出层
optimized_spark_code.py
: 优化后的代码performance_analysis.md
: 详细分析
此工作流程说明:
- 输入 PySpark 代码提交
- MCP 协议处理和路由
- Claude AI 分析和优化
- 代码转换和验证
- 性能分析和报告
架构
本项目遵循模型上下文协议架构,用于标准化 AI 模型交互:
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ │ │ MCP 服务器 │ │ 资源 │
│ MCP 客户端 │ │ (SparkMCPServer)│ │ │
│ (SparkMCPClient) │ │ │ │ ┌──────────────┐ │
│ │ │ ┌─────────┐ │ │ │ Claude AI │ │
│ ┌─────────┐ │ │ │ 工具 │ │ <──> │ │ 模型 │ │
│ │ 工具 │ │ │ │注册表 │ │ │ └──────────────┘ │
│ │接口│ │ │ └─────────┘ │ │ │
│ └─────────┘ │ <──> │ ┌─────────┐ │ │ ┌──────────────┐ │
│ │ │ │协议 │ │ │ │ PySpark │ │
│ │ │ │处理器 │ │ │ │ 运行时 │ │
│ │ │ └─────────┘ │ │ └──────────────┘ │
└──────────────────┘ └──────────────────┘ └──────────────────┘
│ │ │
│ │ │
v v v
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ 可用 │ │ 已注册 │ │ 外部 │
│ 工具 │ │ 工具 │ │ 资源 │
├──────────────┤ ├──────────────┤ ├──────────────┤
│optimize_code │ │optimize_code │ │ Claude API │
│analyze_perf │ │analyze_perf │ │ Spark 引擎 │
└──────────────┘ └──────────────┘ └──────────────┘
组件
-
MCP 客户端
- 提供用于代码优化的工具接口
- 处理与服务器的异步通信
- 管理代码生成的文件 I/O
-
MCP 服务器
- 实现 MCP 协议处理器
- 管理工具注册表和执行
- 协调客户端和资源
-
资源
- Claude AI:提供代码优化智能
- PySpark 运行时:执行和验证优化
协议流程
- 客户端通过 MCP 协议发送优化请求
- 服务器验证请求并调用适当的工具
- 工具利用 Claude AI 进行优化
- 优化后的代码通过 MCP 响应返回
- 客户端保存并验证优化后的代码
端到端功能
sequenceDiagram
participant U as 用户
participant C as MCP 客户端
participant S as MCP 服务器
participant AI as Claude AI
participant P as PySpark 运行时
U->>C: 提交 Spark 代码
C->>S: 发送优化请求
S->>AI: 分析代码
AI-->>S: 优化建议
S->>C: 返回优化后的代码
C->>P: 运行原始代码
C->>P: 运行优化后的代码
P-->>C: 执行结果
C->>C: 生成分析
C-->>U: 最终报告
-
代码提交
- 用户将 PySpark 代码放置在
v1/input/spark_code_input.py
中 - 代码由 MCP 客户端读取
- 用户将 PySpark 代码放置在
-
优化过程
- MCP 客户端通过标准化协议连接到服务器
- 服务器将代码转发到 Claude AI 进行分析
- AI 根据最佳实践提出优化建议
- 服务器验证和处理建议
-
代码生成
- 优化后的代码保存到
v1/output/optimized_spark_code.py
- 包括详细的注释,解释优化
- 在提高性能的同时保持原始代码结构
- 优化后的代码保存到
-
性能分析
- 两个版本都在 PySpark 运行时中执行
- 比较执行时间
- 验证结果的正确性
- 收集和分析指标
-
结果生成
- 在
v1/output/performance_analysis.md
中进行全面分析 - 并排执行比较
- 性能改进统计
- 优化解释和原理
- 在
用法
要求
- Python 3.8+
- PySpark 3.2.0+
- Anthropic API 密钥(用于 Claude AI)
安装
pip install -r requirements.txt
快速开始
-
将要优化的 Spark 代码添加到
input/spark_code_input.py
-
启动 MCP 服务器:
python v1/run_server.py
- 运行客户端以优化您的代码:
python v1/run_client.py
这将生成两个文件:
output/optimized_spark_example.py
: 带有详细优化注释的优化后的 Spark 代码output/performance_analysis.md
: 全面的性能分析
- 运行并比较代码版本:
python v1/run_optimized.py
这将:
- 执行原始代码和优化后的代码
- 比较执行时间和结果
- 使用执行指标更新性能分析
- 显示详细的性能改进统计信息
项目结构
ai-mcp/
├── input/
│ └── spark_code_input.py # 要优化的原始 Spark 代码
├── output/
│ ├── optimized_spark_example.py # 生成的优化代码
│ └── performance_analysis.md # 详细的性能比较
├── spark_mcp/
│ ├── client.py # MCP 客户端实现
│ └── server.py # MCP 服务器实现
├── run_client.py # 用于优化代码的客户端脚本
├── run_server.py # 服务器启动脚本
└── run_optimized.py # 用于运行和比较代码版本的脚本
为什么选择 MCP?
模型上下文协议 (MCP) 为 Spark 代码优化提供了几个关键优势:
直接调用 Claude AI 与 MCP 服务器
方面 | 直接调用 Claude AI | MCP 服务器 |
---|---|---|
集成 | • 每个团队的自定义集成<br>• 手动响应处理<br>• 重复实现 | • 预构建的客户端库<br>• 自动化工作流程<br>• 统一接口 |
基础设施 | • 没有内置验证<br>• 没有结果持久化<br>• 手动跟踪 | • 自动验证<br>• 结果持久化<br>• 版本控制 |
上下文 | • 基本代码建议<br>• 没有执行上下文<br>• 有限的优化范围 | • 上下文感知的优化<br>• 完整的执行历史记录<br>• 全面的改进 |
验证 | • 需要手动测试<br>• 没有性能指标<br>• 不确定的结果 | • 自动化测试<br>• 性能指标<br>• 验证的结果 |
工作流程 | • 特设流程<br>• 没有标准化<br>• 需要手动干预 | • 结构化流程<br>• 标准协议<br>• 自动化管道 |
主要区别:
1. AI 集成
方法 | 代码示例 | 优点 |
---|---|---|
传统 | client = anthropic.Client(api_key) <br>response = client.messages.create(...) |
• 复杂的设置<br>• 自定义错误处理<br>• 紧耦合 |
MCP | client = SparkMCPClient() <br>result = await client.optimize_spark_code(code) |
• 简单的接口<br>• 内置验证<br>• 松耦合 |
2. 工具管理
方法 | 代码示例 | 优点 |
---|---|---|
传统 | class SparkOptimizer: <br> def register_tool(self, name, func): <br> self.tools[name] = func |
• 手动注册<br>• 没有验证<br>• 复杂的维护 |
MCP | @register_tool("optimize_spark_code") <br>async def optimize_spark_code(code: str): |
• 自动发现<br>• 类型检查<br>• 易于扩展 |
3. 资源管理
方法 | 代码示例 | 优点 |
---|---|---|
传统 | def __init__(self): <br> self.claude = init_claude() <br> self.spark = init_spark() |
• 手动编排<br>• 手动清理<br>• 容易出错 |
MCP | @requires_resources(["claude_ai", "spark"]) <br>async def optimize_spark_code(code: str): |
• 自动协调<br>• 生命周期管理<br>• 错误处理 |
4. 通信协议
方法 | 代码示例 | 优点 |
---|---|---|
传统 | {"type": "request", <br> "payload": {"code": code}} |
• 自定义格式<br>• 手动验证<br>• 自定义调试 |
MCP | {"method": "tools/call", <br> "params": {"name": "optimize_code"}} |
• 标准格式<br>• 自动验证<br>• 易于调试 |
功能
- 智能代码优化:利用 Claude AI 分析和优化 PySpark 代码
- 性能分析:提供原始代码和优化代码之间性能差异的详细分析
- MCP 架构:实现模型上下文协议,用于标准化 AI 模型交互
- 易于集成:用于代码优化请求的简单客户端接口
- 代码生成:自动将优化后的代码保存到单独的文件中
高级用法
您还可以以编程方式使用客户端:
from spark_mcp.client import SparkMCPClient
async def main():
# 连接到 MCP 服务器
client = SparkMCPClient()
await client.connect()
# 要优化的 Spark 代码
spark_code = '''
# 您的 PySpark 代码
'''
# 获取优化后的代码和性能分析
optimized_code = await client.optimize_spark_code(
code=spark_code,
optimization_level="advanced",
save_to_file=True # 保存到 output/optimized_spark_example.py
)
# 分析性能差异
analysis = await client.analyze_performance(
original_code=spark_code,
optimized_code=optimized_code,
save_to_file=True # 保存到 output/performance_analysis.md
)
# 运行两个版本并进行比较
# 您可以使用 run_optimized.py 脚本或实现您自己的比较
await client.close()
# 分析性能
performance = await client.analyze_performance(spark_code, optimized_code)
await client.close()
输入和输出示例
该存储库包含一个示例工作流程:
- 输入代码 (
input/spark_code_input.py
):
# 创建 DataFrames 并连接
emp_df = spark.createDataFrame(employees, ["id", "name", "age", "dept", "salary"])
dept_df = spark.createDataFrame(departments, ["dept", "location", "budget"])
# 连接和分析
result = emp_df.join(dept_df, "dept") \
.groupBy("dept", "location") \
.agg({"salary": "avg", "age": "avg", "id": "count"}) \
.orderBy("dept")
- 优化后的代码 (
output/optimized_spark_example.py
):
# 性能优化版本,具有缓存和改进的配置
spark = SparkSession.builder \
.appName("EmployeeAnalysis") \
.config("spark.sql.shuffle.partitions", 200) \
.getOrCreate()
# 创建和缓存 DataFrames
emp_df = spark.createDataFrame(employees, ["id", "name", "age", "dept", "salary"]).cache()
dept_df = spark.createDataFrame(departments, ["dept", "location", "budget"]).cache()
# 优化后的连接和分析
result = emp_df.join(dept_df, "dept") \
.groupBy("dept", "location") \
.agg(
avg("salary").alias("avg_salary"),
avg("age").alias("avg_age"),
count("id").alias("employee_count")
) \
.orderBy("dept")
- 性能分析 (
output/performance_analysis.md
):
## 执行结果比较
### 时间比较
- 原始代码:5.18 秒
- 优化后的代码:0.65 秒
- 性能提升:87.4%
### 优化细节
- 缓存常用 DataFrames
- 优化 shuffle 分区
- 改进的列表达式
- 更好的内存管理
项目结构
ai-mcp/
├── spark_mcp/
│ ├── __init__.py
│ ├── client.py # MCP 客户端实现
│ └── server.py # MCP 服务器实现
├── examples/
│ ├── optimize_code.py # 用法示例
│ └── optimized_spark_example.py # 生成的优化代码
├── requirements.txt
└── run_server.py # 服务器启动脚本
可用工具
-
optimize_spark_code
- 优化 PySpark 代码以获得更好的性能
- 支持基本和高级优化级别
- 自动将优化后的代码保存到 examples/optimized_spark_example.py
-
analyze_performance
- 分析原始代码和优化代码之间的性能差异
- 提供以下方面的见解:
- 性能改进
- 资源利用率
- 可扩展性考虑因素
- 潜在的权衡
环境变量
ANTHROPIC_API_KEY
: 您的 Claude AI 的 Anthropic API 密钥
优化示例
该系统实现了各种 PySpark 优化,包括:
- 用于小表连接大表的广播连接
- 有效的窗口函数使用
- 战略性数据缓存
- 查询计划优化
- 面向性能的操作排序
贡献
欢迎提交问题和增强请求!
许可证
MIT 许可证
推荐服务器
Crypto Price & Market Analysis MCP Server
一个模型上下文协议 (MCP) 服务器,它使用 CoinCap API 提供全面的加密货币分析。该服务器通过一个易于使用的界面提供实时价格数据、市场分析和历史趋势。 (Alternative, slightly more formal and technical translation): 一个模型上下文协议 (MCP) 服务器,利用 CoinCap API 提供全面的加密货币分析服务。该服务器通过用户友好的界面,提供实时价格数据、市场分析以及历史趋势数据。
MCP PubMed Search
用于搜索 PubMed 的服务器(PubMed 是一个免费的在线数据库,用户可以在其中搜索生物医学和生命科学文献)。 我是在 MCP 发布当天创建的,但当时正在度假。 我看到有人在您的数据库中发布了类似的服务器,但还是决定发布我的服务器。
mixpanel
连接到您的 Mixpanel 数据。 从 Mixpanel 分析查询事件、留存和漏斗数据。

Sequential Thinking MCP Server
这个服务器通过将复杂问题分解为顺序步骤来促进结构化的问题解决,支持修订,并通过完整的 MCP 集成来实现多条解决方案路径。

Nefino MCP Server
为大型语言模型提供访问德国可再生能源项目新闻和信息的能力,允许按地点、主题(太阳能、风能、氢能)和日期范围进行筛选。
Vectorize
将 MCP 服务器向量化以实现高级检索、私有深度研究、Anything-to-Markdown 文件提取和文本分块。
Mathematica Documentation MCP server
一个服务器,通过 FastMCP 提供对 Mathematica 文档的访问,使用户能够从 Wolfram Mathematica 检索函数文档和列出软件包符号。
kb-mcp-server
一个 MCP 服务器,旨在实现便携性、本地化、简易性和便利性,以支持对 txtai “all in one” 嵌入数据库进行基于语义/图的检索。任何 tar.gz 格式的 txtai 嵌入数据库都可以被加载。
Research MCP Server
这个服务器用作 MCP 服务器,与 Notion 交互以检索和创建调查数据,并与 Claude Desktop Client 集成以进行和审查调查。

Cryo MCP Server
一个API服务器,实现了模型补全协议(MCP),用于Cryo区块链数据提取,允许用户通过任何兼容MCP的客户端查询以太坊区块链数据。