Agentic Spark Job Optimization System - Production Ready

Overview

A fully agentic system that uses LLM-powered agents to analyze Spark applications and recommend configuration and code optimizations. The recommendations come from agent reasoning over Spark History metrics rather than from calculations hard-coded in the tool.

Features

  • MCP Server: Exposes Spark History Server data as MCP tools

  • Specialized Agents: 6 expert agents for different analysis domains

  • Production-Ready: Comprehensive error handling, logging, and configuration

  • Extensive Testing: 11 unit tests, 7 edge-case tests, and integration tests

  • Golden Test Cases: 7 test jobs covering common performance issues

Quick Start

Installation

pip install -r requirements.txt
cp .env.example .env  # Edit .env with your GEMINI_API_KEY

Run Analysis

export GEMINI_API_KEY=your-key-here
python3 spark_optimize.py \
  --appId application_1234567890_0001 \
  --historyUrl http://localhost:18080 \
  --jobCode path/to/job.py \
  --output reports/analysis.json

Architecture

Components

  1. Spark History MCP Server (src/server.py)

    • Exposes 10 MCP tools for Spark metrics

    • Stateless, read-only design

  2. Optimization Engine (src/optimizer/engine.py)

    • Orchestrates agent analysis

    • Gathers context from Spark History Server

    • Builds structured optimization reports

  3. Specialized Agents (src/optimizer/agents.py)

    • ExecutionAnalysisAgent: Long-running stages, executor imbalance

    • ShuffleSpillAgent: Shuffle efficiency, memory pressure

    • SkewDetectionAgent: Data skew detection

    • SQLPlanAgent: SQL plan inefficiencies

    • ConfigRecommendationAgent: Spark configuration tuning

    • CodeRecommendationAgent: Code-level optimizations
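
All six agents share the same shape: gather metrics into a prompt, make one LLM call, and parse structured findings out of the response. A minimal sketch of that shared interface (class and method names here are illustrative, not the actual src/optimizer/agents.py API):

from abc import ABC, abstractmethod
import json

class BaseAgent(ABC):
    """Illustrative base class; the real agents live in src/optimizer/agents.py."""

    def __init__(self, llm):
        self.llm = llm  # LLM client used to reason over the metrics

    @abstractmethod
    def build_prompt(self, context: dict) -> str:
        """Turn Spark History metrics into a domain-specific prompt."""

    def analyze(self, context: dict) -> list:
        # One LLM call per agent, six per analysis (see Performance below)
        response = self.llm.generate(self.build_prompt(context))
        try:
            return json.loads(response)
        except (json.JSONDecodeError, TypeError):
            return []  # malformed LLM output is an explicit edge case in the tests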

Data Flow

Spark History Server → SparkHistoryClient → OptimizationEngine → Agents → Report

Test Coverage

Unit Tests (11 tests)

  • tests/test_optimizer.py: Core functionality

    • Client methods with mocked HTTP

    • Agent orchestration

    • Metric extraction

    • Spill detection
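
These client tests stub the HTTP layer so no live History Server is needed. A sketch of the pattern, assuming the client calls requests.get directly and exposes a get_stages method (both assumptions; the real tests in tests/test_optimizer.py may differ):

from unittest.mock import MagicMock, patch

from src.client import SparkHistoryClient

def test_get_stages_with_mocked_http():
    fake = MagicMock()
    fake.status_code = 200
    fake.json.return_value = [{"stageId": 1, "status": "COMPLETE"}]
    with patch("requests.get", return_value=fake):  # no network access
        client = SparkHistoryClient("http://localhost:18080")
        stages = client.get_stages("application_123")  # method name assumed
    assert stages[0]["stageId"] == 1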

Edge Case Tests (7 tests)

  • tests/test_edge_cases.py: Error handling

    • Empty stages

    • Malformed LLM responses

    • Network errors

    • Missing data

    • High GC overhead
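
For example, the malformed-response case can be exercised by stubbing the LLM client entirely (the generate method and the agent constructor signature are assumptions, not the actual API):

from unittest.mock import MagicMock

from src.optimizer.agents import SkewDetectionAgent

def test_malformed_llm_response_degrades_gracefully():
    llm = MagicMock()
    llm.generate.return_value = "sorry, no JSON here"  # unparseable output
    agent = SkewDetectionAgent(llm)  # constructor signature assumed
    # The agent should return no findings rather than raise
    assert agent.analyze({"stages": []}) == []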

Golden Test Jobs (7 jobs)

  • examples/job_skew.py: Data skew (90% of keys in one partition; sketched below)

  • examples/job_spill.py: Memory spill (low memory config)

  • examples/job_cartesian.py: Cartesian join (1M rows)

  • examples/job_broadcast_misuse.py: Broadcast on large table

  • examples/job_small_files.py: Small file explosion

  • examples/job_missing_predicate.py: Missing predicate pushdown

  • examples/job_gc_pressure.py: GC pressure from many objects
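
As an illustration of what these golden jobs contain, a minimal PySpark sketch of the skew pattern exercised by examples/job_skew.py (the actual file may differ):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("job_skew_sketch").getOrCreate()

# Roughly 90% of rows share key 0, so one shuffle partition does most of the work
df = spark.range(1_000_000).withColumn(
    "key",
    F.when(F.rand() < 0.9, F.lit(0)).otherwise((F.rand() * 1000).cast("int")),
)
df.groupBy("key").count().show()
spark.stop()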

Run All Tests

export GEMINI_API_KEY=your-key-here
python3 tests/test_optimizer.py
python3 tests/test_edge_cases.py

Configuration

Environment Variables

# Required
GEMINI_API_KEY=your-api-key

# Optional (with defaults)
SPARK_OPT_HISTORY_URL=http://localhost:18080
SPARK_OPT_TIMEOUT=30
SPARK_OPT_MODEL=gemini-2.0-flash-exp
SPARK_OPT_MAX_RETRIES=5
SPARK_OPT_RETRY_DELAY=5.0
SPARK_OPT_MAX_STAGES=5
SPARK_OPT_CODE_ANALYSIS=true
SPARK_OPT_LOG_LEVEL=INFO

See .env.example for full configuration options.
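
A minimal sketch of reading these variables with their documented defaults (the loader itself is illustrative; the project's centralized config module may differ):

import os

def load_config() -> dict:
    """Illustrative loader mirroring the defaults documented above."""
    return {
        "history_url": os.getenv("SPARK_OPT_HISTORY_URL", "http://localhost:18080"),
        "timeout": int(os.getenv("SPARK_OPT_TIMEOUT", "30")),
        "model": os.getenv("SPARK_OPT_MODEL", "gemini-2.0-flash-exp"),
        "max_retries": int(os.getenv("SPARK_OPT_MAX_RETRIES", "5")),
        "retry_delay": float(os.getenv("SPARK_OPT_RETRY_DELAY", "5.0")),
        "max_stages": int(os.getenv("SPARK_OPT_MAX_STAGES", "5")),
        "code_analysis": os.getenv("SPARK_OPT_CODE_ANALYSIS", "true").lower() == "true",
        "log_level": os.getenv("SPARK_OPT_LOG_LEVEL", "INFO"),
        "gemini_api_key": os.environ["GEMINI_API_KEY"],  # required, so fail loudly
    }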

Production Deployment

See docs/DEPLOYMENT.md for a comprehensive deployment guide, including:

  • Resource requirements

  • Rate limiting configuration

  • Monitoring and health checks

  • Scaling strategies

  • Troubleshooting

API Reference

CLI

python3 spark_optimize.py \
  --appId <application-id> \
  --historyUrl <spark-history-url> \
  [--jobCode <path-to-code>] \
  [--output <output-file>] \
  [--verbose]

MCP Server

python3 -m src.server

Exposes tools:

  • get_application_summary(app_id)

  • get_jobs(app_id)

  • get_stages(app_id)

  • get_stage_details(app_id, stage_id)

  • get_executors(app_id)

  • get_sql_executions(app_id)

  • get_sql_plan(app_id, execution_id)

  • get_rdd_storage(app_id)

  • get_environment(app_id)

  • get_event_timeline(app_id)
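
A sketch of calling one of these tools over stdio using the reference MCP Python SDK (assuming the server is launched with the command above; tool and argument names are taken from the list):

import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

params = StdioServerParameters(command="python3", args=["-m", "src.server"])

async def main():
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(
                "get_application_summary",
                {"app_id": "application_1234567890_0001"},
            )
            print(result)

asyncio.run(main())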

Python API

from src.optimizer.engine import OptimizationEngine
from src.client import SparkHistoryClient
from src.llm_client import LLMClient

client = SparkHistoryClient("http://localhost:18080")
llm = LLMClient(api_key="your-key")
engine = OptimizationEngine(client, llm)

report = engine.analyze_application("application_123", code_path="job.py")
print(report.to_dict())

Report Structure

{ "app_id": "application_123", "skew_analysis": [...], "spill_analysis": [...], "resource_analysis": [...], "partitioning_analysis": [...], "join_analysis": [...], "recommendations": [ { "category": "Configuration|Code", "issue": "Description of the issue", "suggestion": "Specific recommendation", "evidence": "Supporting data", "impact_level": "High|Medium|Low" } ] }

Key Improvements from Initial Version

  1. Metric Extraction: Added the quantiles parameter to fetch task metric distributions (see the sketch after this list)

  2. Spill Detection: Extract actual disk/memory spill from stage data

  3. Error Handling: Comprehensive error handling throughout

  4. Logging: Structured logging with configurable levels

  5. Configuration: Centralized config with environment variable support

  6. Testing: 18 tests covering unit, edge cases, and integration

  7. Documentation: Deployment guide, API docs, troubleshooting
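
The quantiles improvement (item 1) maps onto the Spark History Server's taskSummary REST endpoint, which returns per-quantile task metric distributions for a stage attempt:

import requests

# Task metric distributions for stage 3, attempt 0
url = (
    "http://localhost:18080/api/v1/applications/"
    "application_1234567890_0001/stages/3/0/taskSummary"
)
resp = requests.get(url, params={"quantiles": "0.05,0.25,0.5,0.75,0.95"}, timeout=30)
resp.raise_for_status()
summary = resp.json()
# A large gap between the median and the top quantile is a skew signal
print(summary["quantiles"], summary["executorRunTime"])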

Performance

  • Analysis Time: ~30-60 seconds per application

  • Memory Usage: ~500MB typical, ~2GB for large apps

  • API Calls: 6 LLM calls per analysis (one per agent)

  • Rate Limiting: Auto-retry with exponential backoff
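
The retry behavior can be pictured as follows (illustrative; the real logic is presumably driven by SPARK_OPT_MAX_RETRIES and SPARK_OPT_RETRY_DELAY):

import time

def call_with_backoff(fn, max_retries=5, base_delay=5.0):
    """Illustrative wrapper: waits 5s, 10s, 20s, ... between failed attempts."""
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception:
            if attempt == max_retries - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))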

Limitations

  1. Skew Detection: Requires quantile data from Spark History Server

  2. Code Analysis: Limited to static analysis (no execution)

  3. API Dependency: Requires Gemini API access

  4. Historical Data: Only analyzes completed applications

Future Enhancements

  • Real-time analysis of running applications

  • Cost optimization recommendations

  • Regression risk detection

  • Support for Iceberg/Delta-specific rules

  • Web UI for interactive analysis

  • Scheduled batch analysis

License

[Your License Here]

Contributors

[Your Team Here]

Support

For issues and questions:

  • GitHub Issues: [Your Repo]

  • Documentation: docs/

  • Examples: examples/
