# Agentic Spark Job Optimization System - Production Ready
## Overview
A fully agentic system in which LLM-powered agents analyze Spark applications and produce configuration and code optimization recommendations. All analysis comes from agent reasoning over Spark History Server data rather than from hard-coded calculations.
## Features
- ✅ **MCP Server**: Exposes Spark History Server data as MCP tools
- ✅ **Specialized Agents**: 6 expert agents for different analysis domains
- ✅ **Production-Ready**: Comprehensive error handling, logging, and configuration
- ✅ **Extensive Testing**: 11 unit tests, 7 edge case tests, and integration tests
- ✅ **Golden Test Cases**: 7 test jobs covering common performance issues
## Quick Start
### Installation
```bash
pip install -r requirements.txt
cp .env.example .env
# Edit .env with your GEMINI_API_KEY
```
### Run Analysis
```bash
export GEMINI_API_KEY=your-key-here
python3 spark_optimize.py \
  --appId application_1234567890_0001 \
  --historyUrl http://localhost:18080 \
  --jobCode path/to/job.py \
  --output reports/analysis.json
```
## Architecture
### Components
1. **Spark History MCP Server** (`src/server.py`)
   - Exposes 10 MCP tools for Spark metrics
   - Stateless, read-only design
2. **Optimization Engine** (`src/optimizer/engine.py`)
   - Orchestrates agent analysis
   - Gathers context from the Spark History Server
   - Builds structured optimization reports
3. **Specialized Agents** (`src/optimizer/agents.py`)
   - `ExecutionAnalysisAgent`: Long-running stages, executor imbalance
   - `ShuffleSpillAgent`: Shuffle efficiency, memory pressure
   - `SkewDetectionAgent`: Data skew detection
   - `SQLPlanAgent`: SQL plan inefficiencies
   - `ConfigRecommendationAgent`: Spark configuration tuning
   - `CodeRecommendationAgent`: Code-level optimizations
### Data Flow
```
Spark History Server → SparkHistoryClient → OptimizationEngine → Agents → Report
```
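In code, that flow amounts to: fetch context once, then fan it out to every agent and merge the findings into a single report. The toy sketch below shows only that shape; `run_pipeline` and the fake stand-ins are illustrative, not the real `OptimizationEngine` API:

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List

@dataclass
class Report:
    app_id: str
    recommendations: List[dict] = field(default_factory=list)

def run_pipeline(app_id: str,
                 fetch_context: Callable[[str], Dict],
                 agents: List[Callable[[Dict], List[dict]]]) -> Report:
    context = fetch_context(app_id)  # SparkHistoryClient gathers stages, executors, SQL
    report = Report(app_id)
    for agent in agents:             # each agent performs one LLM-backed analysis pass
        report.recommendations.extend(agent(context))
    return report

# Toy stand-ins so the sketch runs without a History Server or an LLM:
fake_fetch = lambda app_id: {"stages": [], "executors": []}
fake_agent = lambda ctx: [{"category": "Configuration", "issue": "demo",
                           "suggestion": "demo", "impact_level": "Low"}]
print(run_pipeline("application_123", fake_fetch, [fake_agent]).recommendations)
```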
## Test Coverage
### Unit Tests (11 tests)
- `tests/test_optimizer.py`: Core functionality
  - Client methods with mocked HTTP (sketched after this list)
  - Agent orchestration
  - Metric extraction
  - Spill detection
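A hedged, self-contained sketch of the mocked-HTTP style used for the client tests; `TinyHistoryClient` here is a stand-in for illustration, not the real `SparkHistoryClient` (which may expose different methods):

```python
import unittest
from unittest import mock

import requests

class TinyHistoryClient:
    """Illustrative stand-in that hits the standard History Server REST API."""
    def __init__(self, base_url):
        self.base_url = base_url

    def get_stages(self, app_id):
        resp = requests.get(f"{self.base_url}/api/v1/applications/{app_id}/stages")
        resp.raise_for_status()
        return resp.json()

class TestTinyHistoryClient(unittest.TestCase):
    @mock.patch("requests.get")
    def test_get_stages_parses_json(self, mock_get):
        # No network: the mocked response returns canned stage metrics
        mock_get.return_value.json.return_value = [{"stageId": 0, "status": "COMPLETE"}]
        client = TinyHistoryClient("http://localhost:18080")
        self.assertEqual(client.get_stages("application_123")[0]["stageId"], 0)

if __name__ == "__main__":
    unittest.main()
```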
### Edge Case Tests (7 tests)
- `tests/test_edge_cases.py`: Error handling
  - Empty stages
  - Malformed LLM responses
  - Network errors
  - Missing data
  - High GC overhead
### Golden Test Jobs (7 jobs)
- `examples/job_skew.py`: Data skew (90% of keys land in one partition; see the sketch after this list)
- `examples/job_spill.py`: Memory spill (low memory config)
- `examples/job_cartesian.py`: Cartesian join (1M rows)
- `examples/job_broadcast_misuse.py`: Broadcast on large table
- `examples/job_small_files.py`: Small file explosion
- `examples/job_missing_predicate.py`: Missing predicate pushdown
- `examples/job_gc_pressure.py`: GC pressure from many objects
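For instance, `examples/job_skew.py` boils down to something like the following (a hedged sketch assuming a local PySpark session; the file's actual contents may differ):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("job_skew").getOrCreate()

# ~90% of rows share key 0, so one shuffle partition receives almost all the data.
df = spark.range(1_000_000).withColumn(
    "key",
    F.when(F.rand() < 0.9, F.lit(0)).otherwise((F.rand() * 1000).cast("int")),
)
df.groupBy("key").count().collect()  # the skewed aggregation the agents should flag
spark.stop()
```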
### Run All Tests
```bash
export GEMINI_API_KEY=your-key-here
python3 tests/test_optimizer.py
python3 tests/test_edge_cases.py
```
## Configuration
### Environment Variables
```bash
# Required
GEMINI_API_KEY=your-api-key
# Optional (with defaults)
SPARK_OPT_HISTORY_URL=http://localhost:18080
SPARK_OPT_TIMEOUT=30
SPARK_OPT_MODEL=gemini-2.0-flash-exp
SPARK_OPT_MAX_RETRIES=5
SPARK_OPT_RETRY_DELAY=5.0
SPARK_OPT_MAX_STAGES=5
SPARK_OPT_CODE_ANALYSIS=true
SPARK_OPT_LOG_LEVEL=INFO
```
See `.env.example` for full configuration options.
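If you consume the same variables from your own tooling, a loader along these lines works. This is an independent sketch (the `Settings` name and shape are illustrative, not this repo's config module):

```python
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class Settings:
    gemini_api_key: str
    history_url: str
    timeout: int
    model: str
    max_retries: int
    retry_delay: float
    max_stages: int
    code_analysis: bool
    log_level: str

def load_settings() -> Settings:
    """Read the documented SPARK_OPT_* variables, applying the listed defaults."""
    return Settings(
        gemini_api_key=os.environ["GEMINI_API_KEY"],  # required; KeyError if unset
        history_url=os.getenv("SPARK_OPT_HISTORY_URL", "http://localhost:18080"),
        timeout=int(os.getenv("SPARK_OPT_TIMEOUT", "30")),
        model=os.getenv("SPARK_OPT_MODEL", "gemini-2.0-flash-exp"),
        max_retries=int(os.getenv("SPARK_OPT_MAX_RETRIES", "5")),
        retry_delay=float(os.getenv("SPARK_OPT_RETRY_DELAY", "5.0")),
        max_stages=int(os.getenv("SPARK_OPT_MAX_STAGES", "5")),
        code_analysis=os.getenv("SPARK_OPT_CODE_ANALYSIS", "true").lower() == "true",
        log_level=os.getenv("SPARK_OPT_LOG_LEVEL", "INFO"),
    )
```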
## Production Deployment
See `docs/DEPLOYMENT.md` for comprehensive deployment guide including:
- Resource requirements
- Rate limiting configuration
- Monitoring and health checks
- Scaling strategies
- Troubleshooting
## API Reference
### CLI
```bash
python3 spark_optimize.py \
  --appId <application-id> \
  --historyUrl <spark-history-url> \
  [--jobCode <path-to-code>] \
  [--output <output-file>] \
  [--verbose]
```
### MCP Server
```bash
python3 -m src.server
```
Exposes the following tools (a registration sketch follows the list):
- `get_application_summary(app_id)`
- `get_jobs(app_id)`
- `get_stages(app_id)`
- `get_stage_details(app_id, stage_id)`
- `get_executors(app_id)`
- `get_sql_executions(app_id)`
- `get_sql_plan(app_id, execution_id)`
- `get_rdd_storage(app_id)`
- `get_environment(app_id)`
- `get_event_timeline(app_id)`
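Each tool is a thin wrapper over the Spark History Server REST API. As a rough illustration of the shape, here is a hedged sketch that assumes the official `mcp` Python SDK's `FastMCP` helper; the actual `src/server.py` may register tools differently:

```python
import requests
from mcp.server.fastmcp import FastMCP

HISTORY_URL = "http://localhost:18080"  # SPARK_OPT_HISTORY_URL in practice
mcp = FastMCP("spark-history")

@mcp.tool()
def get_application_summary(app_id: str) -> dict:
    """Fetch high-level info for one application from the History Server."""
    # GET /api/v1/applications/[app-id] is the standard REST endpoint
    resp = requests.get(f"{HISTORY_URL}/api/v1/applications/{app_id}", timeout=30)
    resp.raise_for_status()
    return resp.json()

if __name__ == "__main__":
    mcp.run()  # serves over stdio by default
```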
### Python API
```python
from src.optimizer.engine import OptimizationEngine
from src.client import SparkHistoryClient
from src.llm_client import LLMClient

# Wire up the History Server client and the LLM backend
client = SparkHistoryClient("http://localhost:18080")
llm = LLMClient(api_key="your-key")

# Run the full agent analysis and emit the structured report
engine = OptimizationEngine(client, llm)
report = engine.analyze_application("application_123", code_path="job.py")
print(report.to_dict())
```
## Report Structure
```json
{
  "app_id": "application_123",
  "skew_analysis": [...],
  "spill_analysis": [...],
  "resource_analysis": [...],
  "partitioning_analysis": [...],
  "join_analysis": [...],
  "recommendations": [
    {
      "category": "Configuration|Code",
      "issue": "Description of the issue",
      "suggestion": "Specific recommendation",
      "evidence": "Supporting data",
      "impact_level": "High|Medium|Low"
    }
  ]
}
```
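Because the report is plain JSON, downstream tooling can filter it directly. For example, pulling out only the high-impact recommendations (a small sketch against the schema above):

```python
import json

with open("reports/analysis.json") as f:
    report = json.load(f)

for rec in report["recommendations"]:
    if rec["impact_level"] == "High":
        print(f"[{rec['category']}] {rec['issue']} -> {rec['suggestion']}")
```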
## Key Improvements from Initial Version
1. **Metric Extraction**: Added a `quantiles` parameter to fetch per-task metric distributions (see the sketch after this list)
2. **Spill Detection**: Extract actual disk/memory spill from stage data
3. **Error Handling**: Comprehensive error handling throughout
4. **Logging**: Structured logging with configurable levels
5. **Configuration**: Centralized config with environment variable support
6. **Testing**: 18 tests covering unit, edge cases, and integration
7. **Documentation**: Deployment guide, API docs, troubleshooting
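Item 1 relies on the History Server's `taskSummary` endpoint, which returns per-task metric distributions at requested quantiles. A hedged sketch of the underlying call (the application and stage IDs are placeholders, and the client's actual wrapper may differ):

```python
import requests

# taskSummary?quantiles=... is part of the standard Spark History Server REST API.
url = ("http://localhost:18080/api/v1/applications/application_123"
       "/stages/3/0/taskSummary")  # app / stage 3 / attempt 0 are placeholders
resp = requests.get(url, params={"quantiles": "0.05,0.25,0.5,0.75,0.95"}, timeout=30)
resp.raise_for_status()
summary = resp.json()
# A wide gap between the median and the 0.95 quantile of run time or shuffle
# read size is a typical skew signal.
print(summary.get("executorRunTime"), summary.get("shuffleReadMetrics"))
```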
## Performance
- **Analysis Time**: ~30-60 seconds per application
- **Memory Usage**: ~500MB typical, ~2GB for large apps
- **API Calls**: 6 LLM calls per analysis (one per agent)
- **Rate Limiting**: Auto-retry with exponential backoff (sketch below)
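The backoff behavior is governed by `SPARK_OPT_MAX_RETRIES` and `SPARK_OPT_RETRY_DELAY`. In spirit it looks like the following generic sketch (not the repo's exact retry loop; `llm.generate` in the usage comment is illustrative):

```python
import time

def with_backoff(call, max_retries=5, base_delay=5.0):
    """Retry `call`, doubling the wait after each failure.

    Defaults mirror SPARK_OPT_MAX_RETRIES=5 and SPARK_OPT_RETRY_DELAY=5.0.
    """
    for attempt in range(max_retries):
        try:
            return call()
        except Exception:
            if attempt == max_retries - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))  # waits 5s, 10s, 20s, ...

# e.g. with_backoff(lambda: llm.generate(prompt))
```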
## Limitations
1. **Skew Detection**: Requires quantile data from Spark History Server
2. **Code Analysis**: Limited to static analysis (no execution)
3. **API Dependency**: Requires Gemini API access
4. **Historical Data**: Only analyzes completed applications
## Future Enhancements
- [ ] Real-time analysis of running applications
- [ ] Cost optimization recommendations
- [ ] Regression risk detection
- [ ] Support for Iceberg/Delta-specific rules
- [ ] Web UI for interactive analysis
- [ ] Scheduled batch analysis
## License
[Your License Here]
## Contributors
[Your Team Here]
## Support
For issues and questions:
- GitHub Issues: [Your Repo]
- Documentation: `docs/`
- Examples: `examples/`