Spark History MCP Server

by ravipesala

Agentic Spark Job Optimization System - Production Ready

Overview

A fully agentic system that uses LLM-powered agents to analyze Spark applications and recommend configuration and code optimizations, relying on LLM reasoning over collected metrics rather than hard-coded calculations.

Features

  • MCP Server: Exposes Spark History Server data as MCP tools

  • Specialized Agents: 6 expert agents for different analysis domains

  • Production-Ready: Comprehensive error handling, logging, and configuration

  • Extensive Testing: 11+ unit tests, 7 edge case tests, integration tests

  • Golden Test Cases: 7 test jobs covering common performance issues

Quick Start

Installation

pip install -r requirements.txt
cp .env.example .env
# Edit .env with your GEMINI_API_KEY

Run Analysis

export GEMINI_API_KEY=your-key-here
python3 spark_optimize.py \
  --appId application_1234567890_0001 \
  --historyUrl http://localhost:18080 \
  --jobCode path/to/job.py \
  --output reports/analysis.json

Architecture

Components

  1. Spark History MCP Server (src/server.py)

    • Exposes 10 MCP tools for Spark metrics

    • Stateless, read-only design

  2. Optimization Engine (src/optimizer/engine.py)

    • Orchestrates agent analysis

    • Gathers context from Spark History Server

    • Builds structured optimization reports

  3. Specialized Agents (src/optimizer/agents.py)

    • ExecutionAnalysisAgent: Long-running stages, executor imbalance

    • ShuffleSpillAgent: Shuffle efficiency, memory pressure

    • SkewDetectionAgent: Data skew detection

    • SQLPlanAgent: SQL plan inefficiencies

    • ConfigRecommendationAgent: Spark configuration tuning

    • CodeRecommendationAgent: Code-level optimizations
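Each agent follows the same shape: it receives the gathered Spark context and returns a list of findings. A minimal sketch of that shape (the class and field names here are illustrative, not the actual definitions in src/optimizer/agents.py):

```python
from dataclasses import dataclass


@dataclass
class Finding:
    issue: str
    evidence: str
    impact_level: str  # "High" | "Medium" | "Low"


class ShuffleSpillAgent:
    """Flags stages that spilled shuffle data to disk (sketch)."""

    def analyze(self, context: dict) -> list[Finding]:
        findings = []
        for stage in context.get("stages", []):
            spilled = stage.get("diskBytesSpilled", 0)
            if spilled > 0:
                findings.append(Finding(
                    issue=f"Stage {stage['stageId']} spilled shuffle data to disk",
                    evidence=f"{spilled} bytes spilled",
                    # Treat >= 1 GiB of spill as high impact in this sketch
                    impact_level="High" if spilled >= 1 << 30 else "Medium",
                ))
        return findings
```

In the real system the agents additionally call the LLM to reason over these metrics; the sketch only shows the input/output contract.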

Data Flow

Spark History Server → SparkHistoryClient → OptimizationEngine → Agents → Report
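In outline, the flow above amounts to: gather context once from the history server, fan each agent out over it, and merge the findings into a report. An illustrative sketch (simplified; the actual logic lives in src/optimizer/engine.py):

```python
def analyze_application(client, agents, app_id):
    """Fetch context once, run every agent over it, merge findings."""
    context = {
        "summary": client.get_application_summary(app_id),
        "stages": client.get_stages(app_id),
        "executors": client.get_executors(app_id),
    }
    report = {"app_id": app_id, "recommendations": []}
    for agent in agents:
        report["recommendations"].extend(agent.analyze(context))
    return report
```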

Test Coverage

Unit Tests (11 tests)

  • tests/test_optimizer.py: Core functionality

    • Client methods with mocked HTTP

    • Agent orchestration

    • Metric extraction

    • Spill detection

Edge Case Tests (7 tests)

  • tests/test_edge_cases.py: Error handling

    • Empty stages

    • Malformed LLM responses

    • Network errors

    • Missing data

    • High GC overhead

Golden Test Jobs (7 jobs)

  • examples/job_skew.py: Data skew (90% keys to one partition)

  • examples/job_spill.py: Memory spill (low memory config)

  • examples/job_cartesian.py: Cartesian join (1M rows)

  • examples/job_broadcast_misuse.py: Broadcast on large table

  • examples/job_small_files.py: Small file explosion

  • examples/job_missing_predicate.py: Missing predicate pushdown

  • examples/job_gc_pressure.py: GC pressure from many objects
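The skew pattern behind examples/job_skew.py can be illustrated without Spark: when ~90% of rows share one key, a hash shuffle sends nearly all the work to a single partition. A plain-Python sketch of that distribution (function names are illustrative):

```python
import random
from collections import Counter


def skewed_keys(n_rows, hot_fraction=0.9, n_cold_keys=100, seed=42):
    """Key 0 receives ~90% of rows; the rest spread over cold keys."""
    rng = random.Random(seed)
    return [0 if rng.random() < hot_fraction else rng.randint(1, n_cold_keys)
            for _ in range(n_rows)]


def partition_sizes(keys, n_partitions=8):
    """Rows per partition after a hash shuffle on the key."""
    return Counter(k % n_partitions for k in keys)
```

With 10,000 rows, one partition ends up with roughly 90% of them while the other seven split the remainder, which is exactly the imbalance the SkewDetectionAgent looks for in stage task-time quantiles.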

Run All Tests

export GEMINI_API_KEY=your-key-here
python3 tests/test_optimizer.py
python3 tests/test_edge_cases.py

Configuration

Environment Variables

# Required
GEMINI_API_KEY=your-api-key

# Optional (with defaults)
SPARK_OPT_HISTORY_URL=http://localhost:18080
SPARK_OPT_TIMEOUT=30
SPARK_OPT_MODEL=gemini-2.0-flash-exp
SPARK_OPT_MAX_RETRIES=5
SPARK_OPT_RETRY_DELAY=5.0
SPARK_OPT_MAX_STAGES=5
SPARK_OPT_CODE_ANALYSIS=true
SPARK_OPT_LOG_LEVEL=INFO

See .env.example for full configuration options.
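A minimal sketch of how these variables could be read with their documented defaults (the project's actual config loader may differ):

```python
import os


def load_config():
    """Read SPARK_OPT_* variables, falling back to the documented defaults."""
    env = os.environ
    return {
        "history_url": env.get("SPARK_OPT_HISTORY_URL", "http://localhost:18080"),
        "timeout": int(env.get("SPARK_OPT_TIMEOUT", "30")),
        "model": env.get("SPARK_OPT_MODEL", "gemini-2.0-flash-exp"),
        "max_retries": int(env.get("SPARK_OPT_MAX_RETRIES", "5")),
        "retry_delay": float(env.get("SPARK_OPT_RETRY_DELAY", "5.0")),
        "max_stages": int(env.get("SPARK_OPT_MAX_STAGES", "5")),
        "code_analysis": env.get("SPARK_OPT_CODE_ANALYSIS", "true").lower() == "true",
        "log_level": env.get("SPARK_OPT_LOG_LEVEL", "INFO"),
    }
```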

Production Deployment

See docs/DEPLOYMENT.md for comprehensive deployment guide including:

  • Resource requirements

  • Rate limiting configuration

  • Monitoring and health checks

  • Scaling strategies

  • Troubleshooting

API Reference

CLI

python3 spark_optimize.py \
  --appId <application-id> \
  --historyUrl <spark-history-url> \
  [--jobCode <path-to-code>] \
  [--output <output-file>] \
  [--verbose]

MCP Server

python3 -m src.server

Exposes tools:

  • get_application_summary(app_id)

  • get_jobs(app_id)

  • get_stages(app_id)

  • get_stage_details(app_id, stage_id)

  • get_executors(app_id)

  • get_sql_executions(app_id)

  • get_sql_plan(app_id, execution_id)

  • get_rdd_storage(app_id)

  • get_environment(app_id)

  • get_event_timeline(app_id)
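These tools wrap data that the Spark History Server already exposes over its monitoring REST API under /api/v1. For reference, fetching the same stage data directly looks roughly like this (base URL assumed to be a local history server):

```python
# Build the REST URL the get_stages tool corresponds to.
BASE = "http://localhost:18080/api/v1"


def stages_url(app_id):
    return f"{BASE}/applications/{app_id}/stages"


# With a running history server, the JSON payload can be fetched with e.g.:
#   import json, urllib.request
#   stages = json.load(urllib.request.urlopen(stages_url("application_1234567890_0001")))
```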

Python API

from src.optimizer.engine import OptimizationEngine
from src.client import SparkHistoryClient
from src.llm_client import LLMClient

client = SparkHistoryClient("http://localhost:18080")  # Spark History Server endpoint
llm = LLMClient(api_key="your-key")                    # Gemini-backed LLM client
engine = OptimizationEngine(client, llm)

report = engine.analyze_application("application_123", code_path="job.py")
print(report.to_dict())

Report Structure

{
  "app_id": "application_123",
  "skew_analysis": [...],
  "spill_analysis": [...],
  "resource_analysis": [...],
  "partitioning_analysis": [...],
  "join_analysis": [...],
  "recommendations": [
    {
      "category": "Configuration|Code",
      "issue": "Description of the issue",
      "suggestion": "Specific recommendation",
      "evidence": "Supporting data",
      "impact_level": "High|Medium|Low"
    }
  ]
}
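Because the report is plain JSON, downstream tooling can filter it easily, e.g. keeping only high-impact recommendations from a saved report (a small sketch against the structure shown above):

```python
def high_impact(report: dict) -> list[dict]:
    """Return only recommendations tagged with impact_level 'High'."""
    return [r for r in report.get("recommendations", [])
            if r.get("impact_level") == "High"]


report = {
    "app_id": "application_123",
    "recommendations": [
        {"category": "Configuration", "issue": "...", "suggestion": "...",
         "evidence": "...", "impact_level": "High"},
        {"category": "Code", "issue": "...", "suggestion": "...",
         "evidence": "...", "impact_level": "Low"},
    ],
}
# high_impact(report) keeps only the first entry.
```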

Key Improvements from Initial Version

  1. Metric Extraction: Added quantiles parameter to fetch task distribution

  2. Spill Detection: Extract actual disk/memory spill from stage data

  3. Error Handling: Comprehensive error handling throughout

  4. Logging: Structured logging with configurable levels

  5. Configuration: Centralized config with environment variable support

  6. Testing: 18 tests covering unit, edge cases, and integration

  7. Documentation: Deployment guide, API docs, troubleshooting
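Improvement 1 corresponds to the history server's taskSummary endpoint, which accepts a quantiles query parameter for per-task metric distributions. The URL shape (per Spark's monitoring REST API):

```python
def task_summary_url(base, app_id, stage_id, attempt=0,
                     quantiles="0.05,0.25,0.5,0.75,0.95"):
    """URL for per-task metric quantiles of one stage attempt."""
    return (f"{base}/api/v1/applications/{app_id}/stages/"
            f"{stage_id}/{attempt}/taskSummary?quantiles={quantiles}")
```

Comparing, say, the median task duration to the 0.95 quantile of the same stage is what lets the SkewDetectionAgent reason about imbalance without re-running the job.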

Performance

  • Analysis Time: ~30-60 seconds per application

  • Memory Usage: ~500MB typical, ~2GB for large apps

  • API Calls: 6 LLM calls per analysis (one per agent)

  • Rate Limiting: Auto-retry with exponential backoff
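The retry behavior described above follows the usual exponential-backoff pattern: wait base_delay, then double the wait after each failure, up to max_retries attempts. A generic sketch (not the project's actual retry code):

```python
import time


def with_retries(call, max_retries=5, base_delay=5.0):
    """Invoke call(); on failure, sleep base_delay * 2**attempt and retry."""
    for attempt in range(max_retries):
        try:
            return call()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts; surface the last error
            time.sleep(base_delay * (2 ** attempt))
```

With the documented defaults (5 retries, 5.0 s base delay) the waits are 5, 10, 20, and 40 seconds before giving up.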

Limitations

  1. Skew Detection: Requires quantile data from Spark History Server

  2. Code Analysis: Limited to static analysis (no execution)

  3. API Dependency: Requires Gemini API access

  4. Historical Data: Only analyzes completed applications

Future Enhancements

  • Real-time analysis of running applications

  • Cost optimization recommendations

  • Regression risk detection

  • Support for Iceberg/Delta-specific rules

  • Web UI for interactive analysis

  • Scheduled batch analysis

License

[Your License Here]

Contributors

[Your Team Here]

Support

For issues and questions:

  • GitHub Issues: [Your Repo]

  • Documentation: docs/

  • Examples: examples/
