# Agentic Spark Optimization System - Complete Flow Explanation
## System Architecture Overview
```
┌─────────────────────────────────────────────────────────────────────┐
│ USER INTERACTION │
├─────────────────────────────────────────────────────────────────────┤
│ CLI: spark_optimize.py --appId app_123 --jobCode job.py │
│ OR │
│ MCP Server: python -m src.server (exposes tools) │
└────────────────────┬────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ OPTIMIZATION ENGINE │
│ (src/optimizer/engine.py) │
├─────────────────────────────────────────────────────────────────────┤
│ 1. Gather Context from Spark History Server │
│ 2. Orchestrate Specialized Agents │
│ 3. Build Optimization Report │
└────────────────────┬────────────────────────────────────────────────┘
│
┌────────────┴────────────┐
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────────┐
│ Spark History │ │ LLM Client │
│ Client │ │ (Gemini API) │
│ (src/client.py) │ │ (src/llm_client.py) │
└────────┬─────────┘ └──────────┬───────────┘
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────────┐
│ Spark History │ │ 6 Specialized │
│ Server REST API │ │ Agents │
│ (localhost:18080)│ │ (src/optimizer/ │
└──────────────────┘ │ agents.py) │
└──────────────────────┘
```
## Detailed Flow Breakdown
### Phase 1: User Initiates Analysis
**Entry Points:**
1. **CLI Mode**:
```bash
python3 spark_optimize.py \
--appId application_1234567890_0001 \
--historyUrl http://localhost:18080 \
--jobCode path/to/job.py \
--output reports/analysis.json
```
2. **MCP Server Mode**:
```bash
python3 -m src.server
# Exposes tools like get_application_summary, get_stages, etc.
```
**What Happens:**
- `src/main.py` (CLI) or `src/server.py` (MCP) receives the request
- Creates instances of:
- `SparkHistoryClient(history_url)`
- `LLMClient(api_key)`
- `OptimizationEngine(spark_client, llm_client)`
---
### Phase 2: Context Gathering
**File**: `src/optimizer/engine.py` → `_gather_context()`
```python
def _gather_context(self, app_id: str, job_code: str = None) -> Dict[str, Any]:
# 1. Fetch all stages
stages = self.spark_client.get_stages(app_id)
# 2. For top 5 longest stages, get detailed metrics
sorted_stages = sorted(stages, key=lambda s: s.executorRunTime, reverse=True)[:5]
task_distributions = {}
for stage in sorted_stages:
# Fetch with quantiles for skew detection
details = self.spark_client.get_stage_details(app_id, stage.stageId)
task_distributions[stage.stageId] = details['taskMetricsDistributions']
# 3. Gather all context
return {
"app_info": self.spark_client.get_applications(),
"jobs": self.spark_client.get_jobs(app_id),
"stages": stages,
"executors": self.spark_client.get_executors(app_id),
"sql": self.spark_client.get_sql_metrics(app_id),
"rdd": self.spark_client.get_rdd_storage(app_id),
"environment": self.spark_client.get_environment(app_id),
"task_distribution": task_distributions,
"code": job_code # If provided
}
```
**Spark History Client Flow**:
```
SparkHistoryClient._get(endpoint)
↓
HTTP GET: http://localhost:18080/api/v1/{endpoint}
↓
Parse JSON response
↓
Map to dataclass models (StageMetric, ExecutorMetric, etc.)
↓
Return to engine
```
**Key API Calls**:
- `/applications` - App metadata
- `/applications/{id}/jobs` - Job list
- `/applications/{id}/stages` - Stage metrics
- `/applications/{id}/stages/{stageId}?quantiles=0.05,0.25,0.5,0.75,0.95` - Detailed stage metrics with task distributions
- `/applications/{id}/executors` - Executor info
- `/applications/{id}/sql` - SQL execution metrics
- `/applications/{id}/storage/rdd` - Cached RDD info
- `/applications/{id}/environment` - Spark configuration
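The URL construction behind these calls can be sketched as follows. This is a minimal illustration, not the actual `src/client.py` implementation; `build_url` and its parameters are assumed names:

```python
# Minimal sketch of how SparkHistoryClient might assemble its REST URLs.
# The helper name and signature are illustrative assumptions.
from urllib.parse import urlencode

API_ROOT = "/api/v1"

def build_url(history_url: str, endpoint: str, **params: str) -> str:
    """Join the History Server base URL, the v1 API root, and an endpoint,
    appending any query parameters (e.g. quantiles for skew detection)."""
    url = f"{history_url.rstrip('/')}{API_ROOT}/{endpoint.lstrip('/')}"
    if params:
        # safe="," keeps the quantile list readable, matching the docs above
        url += "?" + urlencode(params, safe=",")
    return url

# Stage details with the quantiles used for task-distribution analysis:
stage_url = build_url(
    "http://localhost:18080",
    "applications/app_123/stages/0",
    quantiles="0.05,0.25,0.5,0.75,0.95",
)
```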
---
### Phase 3: Agent Orchestration
**File**: `src/optimizer/engine.py` → `analyze_application()`
```python
def analyze_application(self, app_id: str, code_path: str = None) -> OptimizationReport:
# 1. Gather context (Phase 2)
context = self._gather_context(app_id)
# 2. Run agents in sequence
exec_result = self.execution_agent.analyze(context)
shuffle_result = self.shuffle_agent.analyze(context)
skew_result = self.skew_agent.analyze(context)
sql_result = self.sql_agent.analyze(context)
config_result = self.config_agent.analyze(context)
    code_result = self.code_agent.analyze(context) if code_path else {}
# 3. Build report
return self._build_report(app_id, context, exec_result, shuffle_result, ...)
```
**Agent Analysis Flow** (for each agent):
```
BaseAgent.analyze(context)
↓
1. Build prompt with context
↓
2. Call LLM via llm_client.generate_recommendation(prompt)
↓
3. Parse JSON response
↓
4. Return structured data
```
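The four steps above can be sketched as a small class. The names mirror the description rather than the exact `src/optimizer/agents.py` code, and the fake client below stands in for the real Gemini-backed `LLMClient`:

```python
# Sketch of the BaseAgent flow: build prompt -> call LLM -> parse JSON.
# Class layout and prompt template are illustrative assumptions.
import json
from typing import Any, Dict

class BaseAgent:
    prompt_template = "Analyze this context:\n{context}\nRespond with JSON."

    def __init__(self, llm_client):
        self.llm_client = llm_client

    def analyze(self, context: Dict[str, Any]) -> Dict[str, Any]:
        # 1. Build prompt with context
        prompt = self.prompt_template.format(context=json.dumps(context, default=str))
        # 2. Call LLM
        raw = self.llm_client.generate_recommendation(prompt)
        # 3/4. Parse JSON response into structured data
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            return {}  # graceful degradation on malformed LLM output

class FakeLLM:
    """Stand-in for the Gemini-backed client, for demonstration only."""
    def generate_recommendation(self, prompt: str) -> str:
        return '{"bottlenecks": []}'

result = BaseAgent(FakeLLM()).analyze({"stages": []})
```

Returning `{}` on a parse failure is what lets the engine continue with partial results when one agent's response is malformed.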
---
### Phase 4: Individual Agent Analysis
#### Agent 1: ExecutionAnalysisAgent
**Purpose**: Detect long-running stages, executor imbalance
**Prompt**:
```
You are an Expert Execution Analyzer for Spark.
Context:
- Jobs: {jobs}
- Stages: {stages}
- Executors: {executors}
Identify:
1. Bottleneck stages (long duration)
2. Executor imbalance
3. GC overhead
Output JSON:
{
"bottlenecks": [{"stageId": 0, "issue": "...", "severity": "high"}],
"imbalance_detected": true/false,
"overhead_analysis": "..."
}
```
**LLM Response** → Parsed to dict
---
#### Agent 2: ShuffleSpillAgent
**Purpose**: Detect shuffle inefficiencies, memory spill
**Prompt**:
```
You are an Expert Shuffle & Spill Analyzer.
Context:
- Stages: {stages with shuffle metrics}
Detect:
1. Excessive shuffle
2. Disk/memory spill
3. Partitioning issues
Output JSON:
{
"spill_issues": [{"stageId": 0, "memorySpill": "50MB", ...}],
"partitioning_issues": "..."
}
```
---
#### Agent 3: SkewDetectionAgent
**Purpose**: Detect data skew from task distributions
**Prompt**:
```
You are an Expert Skew Detector.
Context:
- Stages: {stages}
- Task Distribution (quantiles): {task_distribution}
Calculate skew_ratio = (95th percentile) / (median)
If skew_ratio > 2.0, mark as skewed.
Output JSON:
{
"skewed_stages": [{
"stageId": 0,
"skewType": "duration|size",
"skew_ratio": 10.0,
"max_duration": 5000.0,
"median_duration": 500.0
}],
"suggested_mitigations": ["Enable AQE", ...]
}
```
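The skew rule in this prompt is simple enough to check deterministically as well, using the quantiles already fetched in Phase 2. A minimal sketch, with illustrative function names:

```python
# Skew check from task-duration quantiles, mirroring the prompt's rule:
# skew_ratio = p95 / median, flagged when the ratio exceeds 2.0.
def skew_ratio(durations_by_quantile: list[float]) -> float:
    """durations_by_quantile holds task durations at the History Server's
    requested quantiles [p05, p25, p50, p75, p95]."""
    p50, p95 = durations_by_quantile[2], durations_by_quantile[4]
    return p95 / p50 if p50 else float("inf")

def is_skewed(durations_by_quantile: list[float], threshold: float = 2.0) -> bool:
    return skew_ratio(durations_by_quantile) > threshold
```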
---
#### Agent 4: SQLPlanAgent
**Purpose**: Analyze SQL execution plans
**Prompt**:
```
You are an Expert SQL Plan Analyzer.
Context:
- SQL Executions: {sql}
Identify:
1. Inefficient joins (Cartesian, broadcast misuse)
2. Missing predicates
3. AQE opportunities
Output JSON:
{
"inefficient_joins": [...],
"missing_predicates": [...],
"aqe_opportunities": "..."
}
```
---
#### Agent 5: ConfigRecommendationAgent
**Purpose**: Recommend Spark configuration changes
**Prompt**:
```
You are an Expert Spark Configuration Tuner.
Context:
- Environment: {environment config}
- Bottlenecks: {from execution agent}
- Spill: {from shuffle agent}
Recommend config changes for:
- Memory settings
- Parallelism
- Serialization
- GC tuning
Output JSON:
{
"recommendations": [{
"config": "spark.executor.memory",
"current": "1g",
"suggested": "4g",
"reason": "Reduce GC pressure"
}]
}
```
---
#### Agent 6: CodeRecommendationAgent
**Purpose**: Analyze code for anti-patterns
**Prompt**:
```
You are an Expert Spark Code Reviewer.
Context:
- Code: {job_code}
- Metrics: {stages, jobs}
Identify:
1. Actions in loops
2. Unnecessary collect()
3. Inefficient transformations
4. Caching issues
Output JSON:
{
"code_issues": [{
"line": "23",
"issue": "collect() on large data",
"suggestion": "Use show() or write to storage"
}]
}
```
---
### Phase 5: LLM Interaction
**File**: `src/llm_client.py`
```python
def generate_recommendation(self, context_str: str) -> str:
    full_prompt = f"{self.system_prompt}\n\n{context_str}"
    # Retry loop for rate limits (time is imported at module level)
    for attempt in range(self.max_retries):
        try:
            response = self.model.generate_content(full_prompt)
            return response.text
        except Exception as e:
            if "429" in str(e):  # Rate limited
                wait_time = self.retry_delay * (2 ** attempt)
                time.sleep(wait_time)
            else:
                raise
    return "Failed after retries"
```
**Flow**:
```
Agent builds prompt
↓
LLMClient.generate_recommendation(prompt)
↓
Call Gemini API (gemini-2.0-flash-exp)
↓
Retry with exponential backoff if rate limited
↓
Return text response
↓
Agent parses JSON from response
```
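The final step, "Agent parses JSON from response", often has to tolerate Markdown fences and surrounding prose around the JSON the model returns. A minimal sketch under that assumption (`extract_json` is an illustrative name, not the actual helper):

```python
# Pull the first JSON object out of an LLM reply, tolerating ```json fences
# and explanatory text before or after the object.
import json
import re

def extract_json(text: str) -> dict:
    match = re.search(r"\{.*\}", text, re.DOTALL)
    if not match:
        return {}
    try:
        return json.loads(match.group(0))
    except json.JSONDecodeError:
        return {}
```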
---
### Phase 6: Report Building
**File**: `src/optimizer/engine.py` → `_build_report()`
```python
def _build_report(self, app_id, context, exec_res, shuffle_res, skew_res, sql_res, config_res, code_res):
recommendations = []
# 1. Extract config recommendations
for rec in config_res.get('recommendations', []):
recommendations.append(Recommendation(
category="Configuration",
issue=rec['reason'],
suggestion=f"Set {rec['config']} to {rec['suggested']}",
evidence=f"Current: {rec['current']}",
impact_level="High"
))
# 2. Extract code recommendations
for rec in code_res.get('code_issues', []):
recommendations.append(Recommendation(
category="Code",
issue=rec['issue'],
suggestion=rec['suggestion'],
evidence=f"Line: {rec['line']}",
impact_level="Medium"
))
# 3. Build analysis objects
skew_analysis = [
SkewAnalysis(
is_skewed=True,
skew_ratio=s['skew_ratio'],
max_duration=s['max_duration'],
median_duration=s['median_duration'],
stage_id=s['stageId']
)
for s in skew_res.get('skewed_stages', [])
]
# 4. Extract spill metrics from actual stage data
spill_analysis = []
stage_map = {s['stageId']: s for s in context['stages']}
for s in shuffle_res.get('spill_issues', []):
stage_data = stage_map.get(s['stageId'], {})
spill_analysis.append(SpillAnalysis(
has_spill=True,
total_disk_spill=stage_data.get('diskBytesSpilled', 0),
total_memory_spill=stage_data.get('memoryBytesSpilled', 0),
stage_id=s['stageId']
))
# 5. Return complete report
return OptimizationReport(
app_id=app_id,
skew_analysis=skew_analysis,
spill_analysis=spill_analysis,
resource_analysis=[],
partitioning_analysis=[],
join_analysis=[],
recommendations=recommendations
)
```
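The `Recommendation` objects created above behave like plain dataclasses with a `to_dict()` used later for JSON output. A sketch of that shape, with field names taken from this document rather than verified against the models module:

```python
# Illustrative shape of the Recommendation record built in _build_report.
from dataclasses import asdict, dataclass

@dataclass
class Recommendation:
    category: str
    issue: str
    suggestion: str
    evidence: str
    impact_level: str

    def to_dict(self) -> dict:
        return asdict(self)

rec = Recommendation(
    category="Configuration",
    issue="Reduce GC pressure",
    suggestion="Set spark.executor.memory to 4g",
    evidence="Current: 1g",
    impact_level="High",
)
```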
---
### Phase 7: Output
**CLI Mode**:
```python
# src/main.py
report = engine.analyze_application(app_id, code_path=args.jobCode)
# Save to file
with open(args.output, 'w') as f:
json.dump(report.to_dict(), f, indent=2)
print(f"✅ Report saved to {args.output}")
```
**MCP Server Mode**:
```python
# src/server.py
# Tools are exposed for external systems to call
# Each tool returns JSON directly
```
---
## Complete Data Flow Example
### Example: Analyzing a Skewed Job
```
1. USER RUNS:
python3 spark_optimize.py --appId app_123 --jobCode job_skew.py
2. CONTEXT GATHERING:
SparkHistoryClient fetches:
- Stages: [{stageId: 0, executorRunTime: 10000, ...}, ...]
- Stage Details: {taskMetricsDistributions: {quantiles: [0.05, 0.5, 0.95],
executorRunTime: [100, 500, 5000]}}
- Jobs, Executors, SQL, RDD, Environment
- Code: "df.repartition(10)..."
3. AGENT ANALYSIS:
ExecutionAnalysisAgent:
→ Prompt: "Analyze stages: [{stageId: 0, executorRunTime: 10000}]"
→ LLM: "Stage 0 is slow, possible bottleneck"
→ Output: {"bottlenecks": [{"stageId": 0, "issue": "Long duration"}]}
SkewDetectionAgent:
→ Prompt: "Quantiles show 95th=5000, median=500, ratio=10.0"
→ LLM: "High skew detected, ratio > 2.0"
→ Output: {"skewed_stages": [{"stageId": 0, "skew_ratio": 10.0, ...}]}
ConfigRecommendationAgent:
→ Prompt: "Environment shows AQE disabled, skew detected"
→ LLM: "Enable AQE to handle skew automatically"
→ Output: {"recommendations": [{"config": "spark.sql.adaptive.enabled",
"suggested": "true"}]}
CodeRecommendationAgent:
→ Prompt: "Code has 'repartition(10)' and 'collect()'"
→ LLM: "Hardcoded partitions, dangerous collect()"
→ Output: {"code_issues": [{"line": "20", "issue": "Hardcoded repartition"}]}
4. REPORT BUILDING:
Combine all agent outputs into OptimizationReport:
{
"app_id": "app_123",
"skew_analysis": [{is_skewed: true, skew_ratio: 10.0, ...}],
"recommendations": [
{category: "Configuration", suggestion: "Enable AQE"},
{category: "Code", suggestion: "Use dynamic partitioning"}
]
}
5. OUTPUT:
Save to reports/analysis.json
Print "✅ Report saved"
```
---
## Error Handling Flow
```
Any API Call
↓
Try-Catch Block
↓
├─ Network Error → Log error, return empty/default
├─ JSON Parse Error → Log error, return empty/default
├─ 429 Rate Limit → Exponential backoff retry
└─ Other Error → Log error, return empty/default
↓
Continue with partial data (graceful degradation)
```
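The non-retryable branches above can be captured in one wrapper: any failure is logged and a caller-supplied default is returned, so the engine continues with partial data. The decorator name is illustrative; rate-limit retries are handled separately inside `LLMClient`:

```python
# Graceful-degradation wrapper: log the error, return a safe default.
import functools
import logging

logger = logging.getLogger("optimizer")

def degrade_gracefully(default):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            except Exception as exc:
                logger.error("%s failed: %s", fn.__name__, exc)
                return default  # continue with partial data
        return wrapper
    return decorator

@degrade_gracefully(default=[])
def get_stages(app_id):
    # Simulated network failure for demonstration
    raise ConnectionError("History Server unreachable")
```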
---
## Configuration Flow
```
Environment Variables (.env)
↓
src/config.py → OptimizerConfig
↓
Validates settings
↓
Used by:
- SparkHistoryClient (timeout, URL)
- LLMClient (API key, model, retries)
- OptimizationEngine (max stages to analyze)
```
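A sketch of how `src/config.py` might map `.env` variables onto a validated config object; the environment-variable names and fields here are assumptions, not the real ones:

```python
# Illustrative OptimizerConfig: read env vars, validate, hand to clients.
import os
from dataclasses import dataclass

@dataclass
class OptimizerConfig:
    history_url: str
    request_timeout: int
    max_stages: int

    @classmethod
    def from_env(cls) -> "OptimizerConfig":
        cfg = cls(
            history_url=os.getenv("SPARK_HISTORY_URL", "http://localhost:18080"),
            request_timeout=int(os.getenv("REQUEST_TIMEOUT", "30")),
            max_stages=int(os.getenv("MAX_STAGES_TO_ANALYZE", "5")),
        )
        # Validate settings before any client uses them
        if cfg.max_stages < 1:
            raise ValueError("MAX_STAGES_TO_ANALYZE must be >= 1")
        return cfg
```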
---
## Key Design Principles
1. **Stateless**: No state stored between requests
2. **Graceful Degradation**: Partial data is better than failure
3. **LLM-Driven**: All analysis done by AI, no hardcoded rules
4. **Modular**: Each agent is independent
5. **Testable**: Mocks for all external dependencies
6. **Production-Ready**: Error handling, logging, retries
This flow yields a robust, LLM-driven analysis of Spark applications with minimal user intervention.