import json
import logging
from typing import Any, Dict

from ..llm_client import LLMClient


class BaseAgent:
    """Base class for LLM-backed analysis agents.

    Subclasses implement _build_prompt() to describe their analysis task;
    analyze() sends that prompt to the LLM and parses the JSON reply.
    """

    def __init__(self, llm_client: LLMClient):
        self.llm_client = llm_client
        self.agent_name = "BaseAgent"

    def analyze(self, context: Dict[str, Any]) -> Dict[str, Any]:
        prompt = self._build_prompt(context)
        response_text = self.llm_client.generate_recommendation(prompt)
        return self._parse_response(response_text)

    def _build_prompt(self, context: Dict[str, Any]) -> str:
        raise NotImplementedError

    def _parse_response(self, text: str) -> Dict[str, Any]:
        """Try to parse JSON from the LLM response."""
        try:
            # Strip any markdown code fences wrapping the JSON payload.
            cleaned = text.strip()
            if cleaned.startswith("```json"):
                cleaned = cleaned[7:]
            if cleaned.startswith("```"):
                cleaned = cleaned[3:]
            if cleaned.endswith("```"):
                cleaned = cleaned[:-3]
            return json.loads(cleaned.strip())
        except json.JSONDecodeError:
            logging.warning(
                f"{self.agent_name} failed to produce valid JSON. Returning raw text."
            )
            return {"raw_analysis": text, "error": "JSON parse error"}


class ExecutionAnalysisAgent(BaseAgent):
    def __init__(self, llm_client: LLMClient):
        super().__init__(llm_client)
        self.agent_name = "ExecutionAnalysisAgent"

    def _build_prompt(self, context: Dict[str, Any]) -> str:
        return f"""
You are an Expert Spark Execution Analyst.
Analyze the following job execution metrics to identify long-running stages, executor imbalance, and excessive overhead.
Context:
- Jobs: {json.dumps(context.get('jobs', []), default=str)}
- Stages: {json.dumps(context.get('stages', []), default=str)}
- Executors: {json.dumps(context.get('executors', []), default=str)}
Output a JSON object with this structure:
{{
  "bottlenecks": [
    {{ "stageId": int, "issue": "...", "severity": "high|medium|low" }}
  ],
  "imbalance_detected": bool,
  "overhead_analysis": "..."
}}
"""


class ShuffleSpillAgent(BaseAgent):
    def __init__(self, llm_client: LLMClient):
        super().__init__(llm_client)
        self.agent_name = "ShuffleSpillAgent"

    def _build_prompt(self, context: Dict[str, Any]) -> str:
        return f"""
You are an Expert Spark Shuffle & Spill Analyst.
Analyze the metrics for shuffle efficiency and memory pressure.
Context:
- Stage Details (with spill metrics): {json.dumps(context.get('stages', []), default=str)}
Output a JSON object:
{{
  "spill_issues": [
    {{ "stageId": int, "memorySpill": "...", "diskSpill": "...", "recommendation": "..." }}
  ],
  "partitioning_issues": "..."
}}
"""


class SkewDetectionAgent(BaseAgent):
    def __init__(self, llm_client: LLMClient):
        super().__init__(llm_client)
        self.agent_name = "SkewDetectionAgent"

    def _build_prompt(self, context: Dict[str, Any]) -> str:
        return f"""
You are an Expert Skew Detector for Spark.
Detect data skew based on task duration variance and partition sizes.
Context:
- Stage Metrics: {json.dumps(context.get('stages', []), default=str)}
- Task Distribution (quantiles): {json.dumps(context.get('task_distribution', {}), default=str)}
Analyze the task distribution metrics. If executorRunTime quantiles show high variance
(e.g., 95th percentile >> median), this indicates skew.
Calculate skew_ratio as: (95th percentile duration) / (median duration)
If skew_ratio > 2.0, mark as skewed.
Output a JSON object:
{{
  "skewed_stages": [
    {{
      "stageId": int,
      "skewType": "duration|size",
      "details": "...",
      "skew_ratio": float,
      "max_duration": float,
      "median_duration": float
    }}
  ],
  "suggested_mitigations": ["..."]
}}
"""


class SQLPlanAgent(BaseAgent):
    def __init__(self, llm_client: LLMClient):
        super().__init__(llm_client)
        self.agent_name = "SQLPlanAgent"

    def _build_prompt(self, context: Dict[str, Any]) -> str:
        return f"""
You are an Expert Spark SQL Plan Analyst.
Review the SQL plans for inefficiencies like Cartesian joins, missing predicates, or bad join strategies.
Context:
- SQL Executions: {json.dumps(context.get('sql', []), default=str)}
- Logical/Physical Plans: {json.dumps(context.get('plans', {}), default=str)}
Output a JSON object:
{{
  "inefficient_joins": [],
  "missing_predicates": [],
  "aqe_opportunities": "..."
}}
"""


class ConfigRecommendationAgent(BaseAgent):
    def __init__(self, llm_client: LLMClient):
        super().__init__(llm_client)
        self.agent_name = "ConfigRecommendationAgent"

    def _build_prompt(self, context: Dict[str, Any]) -> str:
        return f"""
You are an Expert Spark Configuration Tuner.
Based on the analyzed bottlenecks and environment, suggest better configurations.
Context:
- Current Environment: {json.dumps(context.get('environment', {}), default=str)}
- Identified Bottlenecks: {json.dumps(context.get('bottlenecks', []), default=str)}
- Resource Usage: {json.dumps(context.get('resources', {}), default=str)}
Output a JSON object:
{{
  "recommendations": [
    {{ "config": "spark.conf.key", "current": "...", "suggested": "...", "reason": "..." }}
  ]
}}
"""


class CodeRecommendationAgent(BaseAgent):
    def __init__(self, llm_client: LLMClient):
        super().__init__(llm_client)
        self.agent_name = "CodeRecommendationAgent"

    def _build_prompt(self, context: Dict[str, Any]) -> str:
        code_snippet = context.get('code', '')
        if not code_snippet:
            return "No code provided."
        # Truncate long snippets so the prompt stays within token limits.
        truncated_code = code_snippet[:5000]
        return f"""
You are an Expert Spark Code Reviewer.
Review the provided Spark job code for semantic inefficiencies (not style).
Look for: loops containing actions, suboptimal API usage (e.g. groupByKey), hardcoded partitions.
Code:
```
{truncated_code}
```
Output a JSON object:
{{
  "code_issues": [
    {{ "line": "...", "issue": "...", "suggestion": "..." }}
  ]
}}
"""