Spark MCP Optimizer
Provides tools to optimize Apache Spark code, including automatic optimization of PySpark code and performance analysis with execution metrics.
Spark MCP (Model Context Protocol) Optimizer
This project implements a Model Context Protocol (MCP) server and client for optimizing Apache Spark code. The system provides intelligent code optimization suggestions and performance analysis through a client-server architecture.
How It Works
Code Optimization Workflow
graph TB
subgraph Input
A[Input PySpark Code] --> |spark_code_input.py| B[run_client.py]
end
subgraph MCP Client
B --> |Async HTTP| C[SparkMCPClient]
C --> |Protocol Handler| D[Tools Interface]
end
subgraph MCP Server
E[run_server.py] --> F[SparkMCPServer]
F --> |Tool Registry| G[optimize_spark_code]
F --> |Tool Registry| H[analyze_performance]
F --> |Protocol Handler| I[Claude AI Integration]
end
subgraph Resources
I --> |Code Analysis| J[Claude AI Model]
J --> |Optimization| K[Optimized Code Generation]
K --> |Validation| L[PySpark Runtime]
end
subgraph Output
M[optimized_spark_code.py]
N[performance_analysis.md]
end
D --> |MCP Request| F
G --> |Generate| M
H --> |Generate| N
classDef client fill:#e1f5fe,stroke:#01579b
classDef server fill:#f3e5f5,stroke:#4a148c
classDef resource fill:#e8f5e9,stroke:#1b5e20
classDef output fill:#fff3e0,stroke:#e65100
class A,B,C,D client
class E,F,G,H,I server
class J,K,L resource
class M,N output
Component Details
Input Layer
spark_code_input.py: Source PySpark code for optimization
run_client.py: Client startup and configuration
MCP Client Layer
Tools Interface: Protocol-compliant tool invocation
MCP Server Layer
run_server.py: Server initialization
Tool Registry: Optimization and analysis tools
Protocol Handler: MCP request/response management
Resource Layer
Claude AI: Code analysis and optimization
PySpark Runtime: Code execution and validation
Output Layer
optimized_spark_code.py: Optimized code
performance_analysis.md: Detailed analysis
This workflow illustrates:
Input PySpark code submission
MCP protocol handling and routing
Claude AI analysis and optimization
Code transformation and validation
Performance analysis and reporting
Architecture
This project follows the Model Context Protocol architecture for standardized AI model interactions:
┌──────────────────┐      ┌──────────────────┐      ┌──────────────────┐
│                  │      │    MCP Server    │      │    Resources     │
│    MCP Client    │      │ (SparkMCPServer) │      │                  │
│ (SparkMCPClient) │      │                  │      │ ┌──────────────┐ │
│                  │      │   ┌─────────┐    │      │ │  Claude AI   │ │
│   ┌─────────┐    │      │   │ Tools   │    │ <──> │ │    Model     │ │
│   │ Tools   │    │      │   │Registry │    │      │ └──────────────┘ │
│   │Interface│    │ <──> │   └─────────┘    │      │                  │
│   └─────────┘    │      │   ┌─────────┐    │      │ ┌──────────────┐ │
│                  │      │   │Protocol │    │      │ │   PySpark    │ │
│                  │      │   │Handler  │    │      │ │   Runtime    │ │
│                  │      │   └─────────┘    │      │ └──────────────┘ │
└──────────────────┘      └──────────────────┘      └──────────────────┘
         │                         │                         │
         │                         │                         │
         v                         v                         v
  ┌──────────────┐          ┌──────────────┐          ┌──────────────┐
  │  Available   │          │  Registered  │          │   External   │
  │    Tools     │          │    Tools     │          │  Resources   │
  ├──────────────┤          ├──────────────┤          ├──────────────┤
  │optimize_code │          │optimize_code │          │ Claude API   │
  │analyze_perf  │          │analyze_perf  │          │ Spark Engine │
  └──────────────┘          └──────────────┘          └──────────────┘
Components
MCP Client
Provides tool interface for code optimization
Handles async communication with server
Manages file I/O for code generation
MCP Server
Implements MCP protocol handlers
Manages tool registry and execution
Coordinates between client and resources
Resources
Claude AI: Provides code optimization intelligence
PySpark Runtime: Executes and validates optimizations
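The tool registry described above can be pictured as a mapping from tool names to async handlers. The following is a minimal sketch under stated assumptions, not the project's actual implementation: the `ToolRegistry` class, its `register` decorator, and the placeholder handler bodies are hypothetical. Only the tool names `optimize_spark_code` and `analyze_performance` come from this README.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

ToolHandler = Callable[..., Awaitable[Any]]


class ToolRegistry:
    """Hypothetical registry mapping tool names to async handlers."""

    def __init__(self) -> None:
        self._tools: Dict[str, ToolHandler] = {}

    def register(self, name: str) -> Callable[[ToolHandler], ToolHandler]:
        """Decorator that stores a handler under the given tool name."""
        def decorator(func: ToolHandler) -> ToolHandler:
            self._tools[name] = func
            return func
        return decorator

    def list_tools(self) -> List[str]:
        """Return the registered tool names in sorted order."""
        return sorted(self._tools)

    async def call(self, name: str, **arguments: Any) -> Any:
        """Look up a tool by name and run it, rejecting unknown names."""
        if name not in self._tools:
            raise KeyError(f"Unknown tool: {name}")
        return await self._tools[name](**arguments)


registry = ToolRegistry()


@registry.register("optimize_spark_code")
async def optimize_spark_code(code: str, optimization_level: str = "basic") -> str:
    # Placeholder body: a real handler would send the code to the AI model.
    return f"# optimization_level={optimization_level}\n{code}"


@registry.register("analyze_performance")
async def analyze_performance(original_code: str, optimized_code: str) -> dict:
    # Placeholder body: a real handler would compare execution metrics.
    return {
        "original_lines": len(original_code.splitlines()),
        "optimized_lines": len(optimized_code.splitlines()),
    }
```

Keeping dispatch in one `call` method means unknown tool names fail in a single, predictable place, which is the kind of validation the server is described as performing before a tool runs.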
Protocol Flow
Client sends optimization request via MCP
Server validates request and invokes appropriate tool
Tool utilizes Claude AI for optimization
Optimized code is returned via MCP response
Client saves and validates the optimized code
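To make the request and response steps above concrete, here is a rough sketch of the message shapes involved. MCP is built on JSON-RPC 2.0, and tools are invoked with the `tools/call` method. Whether this project's client emits exactly these messages is an assumption, and the helper names `build_tool_call` and `extract_text` are hypothetical. The argument names `code` and `optimization_level` are taken from the Advanced Usage example.

```python
import json
from itertools import count

_request_ids = count(1)


def build_tool_call(tool_name: str, arguments: dict) -> str:
    """Serialize a JSON-RPC 2.0 request that invokes an MCP tool."""
    request = {
        "jsonrpc": "2.0",
        "id": next(_request_ids),
        "method": "tools/call",
        "params": {"name": tool_name, "arguments": arguments},
    }
    return json.dumps(request)


def extract_text(response_json: str) -> str:
    """Return the concatenated text content of a tool result, or raise on error."""
    response = json.loads(response_json)
    if "error" in response:
        # Protocol-level failure (for example, an unknown method).
        raise RuntimeError(response["error"].get("message", "unknown error"))
    result = response["result"]
    if result.get("isError"):
        # The tool itself ran but reported a failure.
        raise RuntimeError("tool reported an error")
    return "".join(
        item["text"]
        for item in result.get("content", [])
        if item.get("type") == "text"
    )


request = build_tool_call(
    "optimize_spark_code",
    {"code": "df.count()", "optimization_level": "advanced"},
)
```

The two error paths are kept separate because a protocol-level error and a tool-level error usually call for different handling on the client side.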
End-to-End Functionality
sequenceDiagram
participant U as User
participant C as MCP Client
participant S as MCP Server
participant AI as Claude AI
participant P as PySpark Runtime
U->>C: Submit Spark Code
C->>S: Send Optimization Request
S->>AI: Analyze Code
AI-->>S: Optimization Suggestions
S->>C: Return Optimized Code
C->>P: Run Original Code
C->>P: Run Optimized Code
P-->>C: Execution Results
C->>C: Generate Analysis
C-->>U: Final Report
Code Submission
User places PySpark code in v1/input/spark_code_input.py
Code is read by the MCP client
Optimization Process
MCP client connects to server via standardized protocol
Server forwards code to Claude AI for analysis
AI suggests optimizations based on best practices
Server validates and processes suggestions
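The optimization steps above can be sketched as a small pipeline: build a prompt, obtain a reply from the model, and reject replies that are not valid Python. This is an illustrative sketch only. The prompt wording, the function names, and the injected `complete` callable (standing in for the real Claude API call) are all assumptions, and the project's actual validation may differ.

```python
import ast
import re
from typing import Callable

# The fence marker is built at runtime so this snippet nests cleanly in any document.
FENCE = "`" * 3
CODE_BLOCK = re.compile(FENCE + r"(?:python)?\s*\n(.*?)" + FENCE, re.DOTALL)


def build_prompt(code: str, optimization_level: str = "basic") -> str:
    """Assemble the instruction sent to the model (wording is illustrative)."""
    return (
        f"Optimize the following PySpark code ({optimization_level} level). "
        "Preserve its behavior and return only the revised code.\n\n" + code
    )


def extract_code(reply: str) -> str:
    """Take the first fenced block from the reply, or the whole reply if none."""
    match = CODE_BLOCK.search(reply)
    return (match.group(1) if match else reply).strip()


def optimize(
    code: str,
    complete: Callable[[str], str],
    optimization_level: str = "basic",
) -> str:
    """Request an optimization and reject replies that do not parse as Python."""
    candidate = extract_code(complete(build_prompt(code, optimization_level)))
    try:
        ast.parse(candidate)
    except SyntaxError as exc:
        raise ValueError(f"Model returned invalid Python: {exc}") from exc
    return candidate
```

Parsing with `ast` only confirms that the suggestion is syntactically valid. Checking that it still produces the same results requires actually running both versions, which is what the performance analysis step is for.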
Code Generation
Optimized code saved to v1/output/optimized_spark_code.py
Includes detailed comments explaining optimizations
Maintains original code structure while improving performance
Performance Analysis
Both versions executed in PySpark runtime
Execution times compared
Results validated for correctness
Metrics collected and analyzed
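The comparison described above comes down to timing each version, checking that the results agree, and expressing the difference as a percentage. The sketch below shows one way to do that in plain Python. The function names are hypothetical and this is not the logic of `run_optimized.py`, whose internals are not shown in this README.

```python
import time
from typing import Any, Callable, Tuple


def timed(func: Callable[[], Any]) -> Tuple[Any, float]:
    """Run func once and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = func()
    return result, time.perf_counter() - start


def improvement_pct(original_s: float, optimized_s: float) -> float:
    """Percentage reduction in runtime relative to the original."""
    if original_s <= 0:
        raise ValueError("original_s must be positive")
    return (original_s - optimized_s) / original_s * 100.0


def compare(original: Callable[[], Any], optimized: Callable[[], Any]) -> dict:
    """Time both versions and check that they return equal results."""
    original_result, original_s = timed(original)
    optimized_result, optimized_s = timed(optimized)
    return {
        "original_s": original_s,
        "optimized_s": optimized_s,
        "improvement_pct": improvement_pct(original_s, optimized_s),
        "results_match": original_result == optimized_result,
    }
```

With Spark, each callable would need to trigger an action such as `collect()`, because transformations are lazy and would otherwise return almost immediately. A single run is also noisy, so repeating each measurement and comparing medians gives a more trustworthy figure.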
Results Generation
Comprehensive analysis in v1/output/performance_analysis.md
Side-by-side execution comparison
Performance improvement statistics
Optimization explanations and rationale
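As a rough illustration of how such a report could be assembled, the sketch below renders timing results into the Markdown layout shown in the example analysis later in this README. The `render_report` function is hypothetical, and the numbers in the usage lines are made-up inputs rather than measured results.

```python
from typing import List


def render_report(original_s: float, optimized_s: float, optimizations: List[str]) -> str:
    """Format timing results and optimization notes as a Markdown report."""
    improvement = (original_s - optimized_s) / original_s * 100.0
    lines = [
        "## Execution Results Comparison",
        "### Timing Comparison",
        f"- Original Code: {original_s:.2f} seconds",
        f"- Optimized Code: {optimized_s:.2f} seconds",
        f"- Performance Improvement: {improvement:.1f}%",
        "### Optimization Details",
    ]
    lines.extend(f"- {item}" for item in optimizations)
    return "\n".join(lines) + "\n"


# Illustrative inputs only; real values would come from the timed runs.
report = render_report(4.0, 1.0, ["Caching frequently used DataFrames"])
```

The resulting string can then be written to the analysis file, for example with `pathlib.Path(...).write_text(report)`.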
Usage
Requirements
Python 3.8+
PySpark 3.2.0+
Anthropic API Key (for Claude AI)
Installation
pip install -r requirements.txt
Quick Start
Add your Spark code to optimize in input/spark_code_input.py
Start the MCP server:
python v1/run_server.py
Run the client to optimize your code:
python v1/run_client.py
This will generate two files:
output/optimized_spark_example.py: The optimized Spark code with detailed optimization comments
output/performance_analysis.md: Comprehensive performance analysis
Run and compare code versions:
python v1/run_optimized.py
This will:
Execute both original and optimized code
Compare execution times and results
Update the performance analysis with execution metrics
Show detailed performance improvement statistics
Project Structure
ai-mcp/
├── input/
│   └── spark_code_input.py          # Original Spark code to optimize
├── output/
│   ├── optimized_spark_example.py   # Generated optimized code
│   └── performance_analysis.md      # Detailed performance comparison
├── spark_mcp/
│   ├── client.py                    # MCP client implementation
│   └── server.py                    # MCP server implementation
├── run_client.py                    # Client script to optimize code
├── run_server.py                    # Server startup script
└── run_optimized.py                 # Script to run and compare code versions
Why MCP?
The Model Context Protocol (MCP) provides several key advantages for Spark code optimization:
Direct Claude AI Call vs MCP Server
Aspect | Direct Claude AI Call | MCP Server |
Integration | • Custom integration per team • Manual response handling • Duplicate implementations | • Pre-built client libraries • Automated workflows • Unified interfaces |
Infrastructure | • No built-in validation • No result persistence • Manual tracking | • Automatic validation • Result persistence • Version control |
Context | • Basic code suggestions • No execution context • Limited optimization scope | • Context-aware optimization • Full execution history • Comprehensive improvements |
Validation | • Manual testing required • No performance metrics • Uncertain outcomes | • Automated testing • Performance metrics • Validated results |
Workflow | • Ad-hoc process • No standardization • Manual intervention needed | • Structured process • Standard protocols • Automated pipeline |
Key Differences:
1. AI Integration
Approach | Code Example | Characteristics |
Traditional | | • Complex setup • Custom error handling • Tight coupling |
MCP | | • Simple interface • Built-in validation • Loose coupling |
2. Tool Management
Approach | Code Example | Characteristics |
Traditional | | • Manual registration • No validation • Complex maintenance |
MCP | | • Auto-discovery • Type checking • Easy extension |
3. Resource Management
Approach | Code Example | Characteristics |
Traditional | | • Manual orchestration • Manual cleanup • Error-prone |
MCP | | • Auto-coordination • Lifecycle management • Error handling |
4. Communication Protocol
Approach | Code Example | Characteristics |
Traditional | | • Custom format • Manual validation • Custom debugging |
MCP | | • Standard format • Auto-validation • Easy debugging |
Features
Intelligent Code Optimization: Leverages Claude AI to analyze and optimize PySpark code
Performance Analysis: Provides detailed analysis of performance differences between original and optimized code
MCP Architecture: Implements the Model Context Protocol for standardized AI model interactions
Easy Integration: Simple client interface for code optimization requests
Code Generation: Automatically saves optimized code to separate files
Advanced Usage
You can also use the client programmatically:
import asyncio

from spark_mcp.client import SparkMCPClient


async def main():
    # Connect to the MCP server
    client = SparkMCPClient()
    await client.connect()
    try:
        # Your Spark code to optimize
        spark_code = '''
        # Your PySpark code here
        '''
        # Get optimized code
        optimized_code = await client.optimize_spark_code(
            code=spark_code,
            optimization_level="advanced",
            save_to_file=True,  # Save to output/optimized_spark_example.py
        )
        # Analyze performance differences
        analysis = await client.analyze_performance(
            original_code=spark_code,
            optimized_code=optimized_code,
            save_to_file=True,  # Save to output/performance_analysis.md
        )
        # Run both versions and compare:
        # use the run_optimized.py script or implement your own comparison
    finally:
        # Close the connection even if a request fails
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())
Example Input and Output
The repository includes an example workflow:
Input Code (input/spark_code_input.py):
# Create DataFrames and join
emp_df = spark.createDataFrame(employees, ["id", "name", "age", "dept", "salary"])
dept_df = spark.createDataFrame(departments, ["dept", "location", "budget"])
# Join and analyze
result = emp_df.join(dept_df, "dept") \
    .groupBy("dept", "location") \
    .agg({"salary": "avg", "age": "avg", "id": "count"}) \
    .orderBy("dept")
Optimized Code (output/optimized_spark_example.py):
# Performance-optimized version with caching and improved configurations
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, count

spark = SparkSession.builder \
    .appName("EmployeeAnalysis") \
    .config("spark.sql.shuffle.partitions", 200) \
    .getOrCreate()
# Create and cache DataFrames
emp_df = spark.createDataFrame(employees, ["id", "name", "age", "dept", "salary"]).cache()
dept_df = spark.createDataFrame(departments, ["dept", "location", "budget"]).cache()
# Optimized join and analysis
result = emp_df.join(dept_df, "dept") \
    .groupBy("dept", "location") \
    .agg(
        avg("salary").alias("avg_salary"),
        avg("age").alias("avg_age"),
        count("id").alias("employee_count")
    ) \
    .orderBy("dept")
Performance Analysis (output/performance_analysis.md):
## Execution Results Comparison
### Timing Comparison
- Original Code: 5.18 seconds
- Optimized Code: 0.65 seconds
- Performance Improvement: 87.4%
### Optimization Details
- Caching frequently used DataFrames
- Optimized shuffle partitions
- Improved column expressions
- Better memory management
Project Structure
ai-mcp/
├── spark_mcp/
│   ├── __init__.py
│   ├── client.py                    # MCP client implementation
│   └── server.py                    # MCP server implementation
├── examples/
│   ├── optimize_code.py             # Example usage
│   └── optimized_spark_example.py   # Generated optimized code
├── requirements.txt
└── run_server.py                    # Server startup script
Available Tools
optimize_spark_code
Optimizes PySpark code for better performance
Supports basic and advanced optimization levels
Automatically saves optimized code to examples/optimized_spark_example.py
analyze_performance
Analyzes performance differences between original and optimized code
Provides insights on:
Performance improvements
Resource utilization
Scalability considerations
Potential trade-offs
Environment Variables
ANTHROPIC_API_KEY: Your Anthropic API key for Claude AI
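A server that depends on this variable benefits from failing early with a clear message when it is missing. The helper below is a minimal sketch of that check. The function name `load_api_key` is hypothetical, and how the project itself reads the key is not shown in this README.

```python
import os
from typing import Mapping


def load_api_key(env: Mapping[str, str] = os.environ) -> str:
    """Return the Anthropic API key or fail with an actionable message."""
    key = env.get("ANTHROPIC_API_KEY", "").strip()
    if not key:
        raise RuntimeError(
            "ANTHROPIC_API_KEY is not set; export it before starting the server."
        )
    return key
```

Accepting the environment as a parameter keeps the check easy to exercise in tests without touching the real process environment.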
Example Optimizations
The system implements various PySpark optimizations including:
Broadcast joins for small-large table joins
Efficient window function usage
Strategic data caching
Query plan optimizations
Performance-oriented operation ordering
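As an illustration of the first item in the list, the sketch below hints a broadcast join when the smaller table is known to fit under a size threshold. It uses PySpark's `broadcast` function and Spark's documented 10 MB default for `spark.sql.autoBroadcastJoinThreshold`. The helper names and the caller-supplied size estimate are assumptions, and this is not necessarily how the optimizer applies the technique.

```python
# Spark's documented default for spark.sql.autoBroadcastJoinThreshold (10 MB).
DEFAULT_BROADCAST_THRESHOLD = 10 * 1024 * 1024


def should_broadcast(
    small_side_bytes: int,
    threshold: int = DEFAULT_BROADCAST_THRESHOLD,
) -> bool:
    """Decide whether the smaller table is small enough to ship to every executor."""
    return 0 <= small_side_bytes <= threshold


def join_small_large(large_df, small_df, key: str, small_side_bytes: int):
    """Join two DataFrames, hinting a broadcast join when the small side fits."""
    # Imported lazily so the size check above is usable without PySpark installed.
    from pyspark.sql.functions import broadcast

    if should_broadcast(small_side_bytes):
        # Broadcasting avoids shuffling the large table across the cluster.
        return large_df.join(broadcast(small_df), key)
    return large_df.join(small_df, key)
```

In the employee example from this README, the call might look like `join_small_large(emp_df, dept_df, "dept", small_side_bytes=estimated_size)`, where `estimated_size` is a value the caller has to supply.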
Contributing
Feel free to submit issues and enhancement requests!
License
MIT License