import unittest
from unittest.mock import MagicMock, patch
import json
from src.optimizer.engine import OptimizationEngine
from src.client import SparkHistoryClient
from src.llm_client import LLMClient
class TestEndToEnd(unittest.TestCase):
    """End-to-end check of OptimizationEngine against mocked collaborators.

    The Spark History client and the LLM client are replaced by ``MagicMock``
    objects created with ``spec=`` the real client classes, so accessing an
    attribute the real class does not define raises ``AttributeError``.
    """

    def setUp(self):
        """Create fresh spec'd mocks and the engine under test for each test."""
        spark_stub = MagicMock(spec=SparkHistoryClient)
        llm_stub = MagicMock(spec=LLMClient)
        self.mock_spark = spark_stub
        self.mock_llm = llm_stub
        self.engine = OptimizationEngine(spark_stub, llm_stub)

    def test_analysis_flow(self):
        """Analyze a fake application and check the single config recommendation.

        Every Spark History fetcher is stubbed to return an empty payload, and
        the LLM mock hands back one scripted JSON reply per call, in order.
        """
        # Arrange: empty Spark History payloads for each fetcher the engine
        # may call. A new empty list is created on every loop iteration.
        for fetcher in (
            "get_jobs",
            "get_stages",
            "get_executors",
            "get_sql_metrics",
            "get_rdd_storage",
        ):
            getattr(self.mock_spark, fetcher).return_value = []
        self.mock_spark.get_environment.return_value = {}

        # One JSON reply per agent. A list side_effect is consumed one item per
        # call, so this order must match the order in which the engine invokes
        # generate_recommendation (assumed: one call per agent as listed).
        scripted_replies = [
            # ExecutionAnalysisAgent
            '{"bottlenecks": [{"stageId": 1, "issue": "Long Stage", "severity": "high"}], "imbalance_detected": false, "overhead_analysis": "ok"}',
            # ShuffleSpillAgent
            '{"spill_issues": [], "partitioning_issues": "ok"}',
            # SkewDetectionAgent
            '{"skewed_stages": [], "suggested_mitigations": []}',
            # SQLPlanAgent
            '{"inefficient_joins": [], "missing_predicates": [], "aqe_opportunities": "none"}',
            # ConfigRecommendationAgent
            '{"recommendations": [{"config": "spark.sql.shuffle.partitions", "current": "200", "suggested": "auto", "reason": "Small data"}]}',
            # CodeRecommendationAgent
            '{"code_issues": []}',
        ]
        self.mock_llm.generate_recommendation.side_effect = scripted_replies

        # Act
        report = self.engine.analyze_application("app-123", job_code="val x = 1")

        # Assert: only the ConfigRecommendationAgent reply carries a
        # recommendation, so exactly one is expected in the report.
        self.assertEqual(report.app_id, "app-123")
        recommendations = report.recommendations
        self.assertEqual(len(recommendations), 1)
        only_rec = recommendations[0]
        self.assertEqual(only_rec.category, "Configuration")
        self.assertEqual(
            only_rec.suggestion,
            "Set spark.sql.shuffle.partitions to auto",
        )
        print("\n✅ End-to-End Verification Passed!")
if __name__ == '__main__':
    # Run this module's tests when the file is executed directly as a script.
    unittest.main()