# Spark MCP Optimizer Implementation Plan
## Goal
Build an MCP server that acts as an intelligent Spark tuning assistant. It will ingest Spark History Server data and user code to produce actionable configuration and code-level recommendations.
## User Review Required
> [!IMPORTANT]
> **Scala Parsing**: Detailed AST analysis of Scala code from Python is complex. I will implement a robust regex/keyword-based analyzer for Scala to identify transformations (joins, UDFs, etc.) and a full `ast` parser for PySpark.
> **Spark History Server**: I will default to connecting to `http://localhost:18080` (the server we verified earlier) but allow configuration.
## Proposed Architecture
### 1. Project Structure
Location: `/Users/user/Documents/spark_mcp_optimizer/`
```
├── src/
│   ├── main.py          # MCP Server Entrypoint
│   ├── client.py        # Spark History Server API Client
│   ├── analyzer.py      # Core Analysis Logic (Skew, Spill, Resources)
│   ├── parser.py        # Code Parser (PySpark AST + Scala Pattern)
│   ├── recommender.py   # Recommendation Engine (Config + Code)
│   └── models.py        # Pydantic models for data structures
├── requirements.txt
└── README.md
```
### 2. MCP Server (Tools)
We will use the Python MCP SDK to expose the following tools:
#### Data Fetching
- `get_apps()`: List available Spark applications.
- `get_job_details(app_id)`: Fetch full event/metric data for a job.
- `fetch_sql_plan(app_id, execution_id)`: Get the physical plan.
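A minimal sketch of the fetching layer, assuming the History Server's v1 REST endpoints (`/api/v1/applications`, `/applications/{app-id}/stages`, `/applications/{app-id}/sql/{execution-id}`) and using only the standard library:

```python
import json
import urllib.request


class SparkHistoryClient:
    """Thin client for the Spark History Server REST API (v1 endpoints assumed)."""

    def __init__(self, base_url: str = "http://localhost:18080"):
        self.base = base_url.rstrip("/") + "/api/v1"

    def _get(self, path: str):
        # All SHS v1 endpoints return JSON.
        with urllib.request.urlopen(self.base + path, timeout=10) as resp:
            return json.load(resp)

    def get_apps(self):
        """List applications known to the History Server."""
        return self._get("/applications")

    def get_job_details(self, app_id: str):
        """Per-stage metrics: durations, shuffle sizes, spill bytes."""
        return self._get(f"/applications/{app_id}/stages")

    def fetch_sql_plan(self, app_id: str, execution_id: int):
        """Details for one SQL execution, including the physical plan nodes."""
        return self._get(f"/applications/{app_id}/sql/{execution_id}")
```

The base URL defaults to the local server noted above but stays configurable per the review note.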
#### Analysis Tools
- `analyze_job(app_id, code_content=None)`: Main entry point. Runs all checks.
1. Fetches metrics (SHS API).
2. Parses code (if provided).
3. Runs heuristics (Skew, Spill, GC).
4. Generates suggestions.
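The four steps above could be orchestrated as follows; the `client` interface and stage field names (`taskDurations`, `diskBytesSpilled`) are placeholders, not the final schema:

```python
import statistics


def analyze_job(client, app_id, code_content=None, skew_threshold=2.0):
    """Run the checks above. `client` is any object exposing
    get_job_details(app_id); field names here are illustrative."""
    findings = []
    for stage in client.get_job_details(app_id):      # 1. fetch metrics
        durations = stage.get("taskDurations", [])
        if len(durations) >= 2:                       # 3. skew heuristic
            ratio = max(durations) / max(statistics.median(durations), 1)
            if ratio > skew_threshold:
                findings.append({"stage": stage["stageId"], "issue": "skew",
                                 "ratio": round(ratio, 2)})
        if stage.get("diskBytesSpilled", 0) > 0:      # 3. spill heuristic
            findings.append({"stage": stage["stageId"], "issue": "spill"})
    # Steps 2 (code parsing) and 4 (suggestions) would consume
    # `code_content` and `findings` respectively; elided here.
    return findings
```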
### 3. LLM Integration Strategy
Shift from purely heuristic local analysis to LLM-driven analysis. The MCP server will provide a comprehensive "Context Tool" that limits token usage while maximizing information density.
### New Tool: `get_full_job_context`
- **Inputs**: `app_id`, `source_code_path` (optional)
- **Outputs**: Markdown-formatted text containing:
- **App Summary**: Duration, Status, Config.
- **Metric Highlights**: Top 5 stages by duration, Skewed stages, Spilled stages.
- **Source Code**: Read from the provided path when available, or only the relevant snippets when they can be mapped to specific stages.
- **Pre-computed Signals**: Output of `Analyzer` (skew ratios, GC/CPU ratios) to aid the LLM.
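A sketch of what `context.py` might produce; the summary and signal field names (`duration_s`, `status`, `stage`, `issue`) are illustrative, not the final schema:

```python
def build_job_context(summary, signals, source_code=None):
    """Aggregate metrics + code into one prompt-friendly Markdown string."""
    lines = [
        "## App Summary",
        f"- Duration: {summary['duration_s']}s",
        f"- Status: {summary['status']}",
        "## Metric Highlights",
    ]
    for s in signals:
        lines.append(f"- Stage {s['stage']}: {s['issue']} (ratio={s.get('ratio', 'n/a')})")
    if source_code:
        # Indent the code so it renders as a block without nesting fences.
        indented = "\n".join("    " + ln for ln in source_code.splitlines())
        lines += ["## Source Code", indented]
    return "\n".join(lines)
```

Keeping the output as plain Markdown lets the LLM consume it directly without a separate schema, which is the point of the "Context Tool" approach.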
### Deprecation
- `CodeParser` (internal AST/Regex) will be deprecated or used only as a fallback hint.
- `Recommender` (hardcoded strings) will be replaced by the LLM's natural generation capabilities, fed by the `Context`.
## Proposed Changes
### [src]
#### [MODIFY] [main.py](file:///Users/user/Documents/spark_mcp_optimizer/src/main.py)
- Add `get_full_job_context` tool.
- Retain data fetchers (`get_apps`, `get_stages`).
#### [NEW] [context.py](file:///Users/user/Documents/spark_mcp_optimizer/src/context.py)
- Logic to aggregate and format metrics + code into a prompt-friendly string.
#### [MODIFY] [analyzer.py](file:///Users/user/Documents/spark_mcp_optimizer/src/analyzer.py)
- Ensure it produces clean "Signals" (e.g., `gc_overhead_ratio`) rather than just boolean alerts.
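For instance, a GC signal expressed as a ratio rather than a boolean, assuming the `totalGCTime`/`totalDuration` fields from the History Server's executor summary:

```python
def gc_overhead_ratio(executor: dict) -> float:
    """Fraction of executor task time spent in GC; the LLM decides
    whether the value is alarming, so no threshold is baked in."""
    total = executor.get("totalDuration", 0)
    return executor.get("totalGCTime", 0) / total if total else 0.0
```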
### 4. Analysis Logic
#### **Skew Detection**
- **Logic**: Compare `max(taskDuration)` vs `median(taskDuration)` in a stage.
- **Trigger**: If ratio > 2.0 (configurable).
- **Recommendation**: Salt keys, increase partitions, or use AQE.
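The detection logic is a one-liner over the stage's task durations; a minimal sketch:

```python
import statistics


def skew_ratio(task_durations):
    """max/median task duration for a stage; a value above the
    configurable threshold (default 2.0) flags the stage as skewed."""
    med = statistics.median(task_durations)
    return max(task_durations) / med if med else float("inf")
```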
#### **Spill Analysis**
- **Logic**: Check for `diskBytesSpilled > 0` in stage metrics.
- **Trigger**: Any spill.
- **Recommendation**: Increase `spark.executor.memory`, encourage partial (map-side) aggregation, or check for underlying skew.
#### **Code Correlation**
- **PySpark**: traverse AST for `join`, `groupBy`, `udf`, `repartition`.
- **Scala**: Regex match `\.join\(`, `\.groupByKey\b`, `\budf\(`, `\.repartition\(`.
- **Logic**: If "Skew Detected" in Stage X, and Stage X corresponds to Line Y with a `join`, suggest `BroadcastHashJoin` or salting.
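Both sides of the correlation can be sketched with the standard library: an `ast` walk for PySpark and line-level regex matching for Scala (no real Scala parsing, per the review note above):

```python
import ast
import re

# Keyword patterns for the Scala fallback; extend as needed.
SCALA_PATTERNS = {
    "join": re.compile(r"\.join\("),
    "groupByKey": re.compile(r"\.groupByKey\b"),
    "udf": re.compile(r"\budf\("),
    "repartition": re.compile(r"\.repartition\("),
}


def find_join_lines(pyspark_source: str):
    """Line numbers of .join(...) calls in PySpark code, via the ast module."""
    lines = []
    for node in ast.walk(ast.parse(pyspark_source)):
        if (isinstance(node, ast.Call)
                and isinstance(node.func, ast.Attribute)
                and node.func.attr == "join"):
            lines.append(node.lineno)
    return sorted(lines)


def scan_scala(source: str):
    """Line numbers per pattern, keyword-matched line by line."""
    return {name: [i for i, line in enumerate(source.splitlines(), 1)
                   if pat.search(line)]
            for name, pat in SCALA_PATTERNS.items()}
```

These line numbers are what gets joined against the per-stage findings to say "the skew in Stage X likely comes from the `join` on line Y."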
### 5. Recommendation Engine
- **Config**: Map detected issues to `spark.sql.*` or `spark.executor.*` keys.
- **Code**: Map detected anti-patterns (e.g., `groupByKey`) to best practices (`reduceByKey`).
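Even with the shift to LLM-generated text, a small issue-to-suggestion table is useful as a fallback hint; the entries below are illustrative (the config keys themselves are standard Spark properties):

```python
# Hypothetical fallback mapping from detected issues to suggestions.
RECOMMENDATIONS = {
    "skew": [
        "Enable AQE skew handling: spark.sql.adaptive.skewJoin.enabled=true",
        "Salt the join key, or broadcast the smaller side if it fits in memory",
    ],
    "spill": [
        "Increase spark.executor.memory or spark.memory.fraction",
        "Raise spark.sql.shuffle.partitions so each task holds less data",
    ],
    "groupByKey": [
        "Prefer reduceByKey/aggregateByKey to pre-aggregate on the map side",
    ],
}


def recommend(issues):
    """Flatten the suggestions for every detected issue, in order."""
    return [tip for issue in issues for tip in RECOMMENDATIONS.get(issue, [])]
```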
## Verification Plan
### Automated Tests
- Unit tests for the `analyzer` and `parser` modules using mock data.
### Manual Verification
1. **Start Local Cluster**: Ensure the Spark 3.5.3 cluster is running.
2. **Run Bad Job**: Submit a deliberately inefficient Spark job (skewed join or heavy spill) to the local cluster.
3. **Run MCP Analysis**: Point the MCP server at the local History Server and the bad job's ID.
4. **Verify Output**: Check if it correctly identifies the skew/spill and suggests the specific fix.
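One hypothetical way to fabricate the input for step 2: generate a key column where a single hot key dominates, then join it against a small dimension table so one task receives most of the shuffle data.

```python
import random


def skewed_keys(n, hot_key="k0", hot_fraction=0.9, n_cold_keys=100, seed=42):
    """Key list where ~90% of rows share one key; joining a DataFrame
    built from this against another table reproduces the skew the
    analyzer should flag."""
    rng = random.Random(seed)
    return [hot_key if rng.random() < hot_fraction
            else f"k{rng.randrange(1, n_cold_keys)}"
            for _ in range(n)]
```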