import unittest
from unittest.mock import MagicMock, patch
import json
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.optimizer.engine import OptimizationEngine
from src.client import SparkHistoryClient
from src.llm_client import LLMClient
from src.models import StageMetric
class TestAdvancedScenarios(unittest.TestCase):
    """Test advanced performance scenarios.

    Each test stubs the Spark History client and the LLM client, then
    verifies that ``OptimizationEngine.analyze_application`` surfaces the
    expected recommendations for a specific anti-pattern (cache leaks,
    actions in loops, groupByKey shuffles, combined issues).
    """

    def setUp(self):
        # spec= ensures the mocks reject calls to methods the real
        # clients do not expose.
        self.mock_spark = MagicMock(spec=SparkHistoryClient)
        self.mock_llm = MagicMock(spec=LLMClient)
        self.engine = OptimizationEngine(self.mock_spark, self.mock_llm)

    def _mock_spark_history(self, stages=None, rdd_storage=None):
        """Stub every Spark History endpoint with empty data.

        Tests pass ``stages`` / ``rdd_storage`` to inject scenario-specific
        metrics; all other endpoints return empty collections.
        """
        self.mock_spark.get_jobs.return_value = []
        self.mock_spark.get_stages.return_value = [] if stages is None else stages
        self.mock_spark.get_executors.return_value = []
        self.mock_spark.get_sql_metrics.return_value = []
        self.mock_spark.get_rdd_storage.return_value = (
            [] if rdd_storage is None else rdd_storage
        )
        self.mock_spark.get_environment.return_value = {}
        self.mock_spark.get_stage_details.return_value = {}

    def test_caching_without_unpersist(self):
        """Test detection of caching without unpersist (memory leak)."""
        self._mock_spark_history(rdd_storage=[
            # Multiple cached RDDs indicating potential leak
            MagicMock(id=1, name="rdd_1", memoryUsed=100000000, numCachedPartitions=10),
            MagicMock(id=2, name="rdd_2", memoryUsed=100000000, numCachedPartitions=10),
            MagicMock(id=3, name="rdd_3", memoryUsed=100000000, numCachedPartitions=10),
        ])
        # One canned JSON response per analysis pass, in engine call order.
        self.mock_llm.generate_recommendation.side_effect = [
            '{"bottlenecks": [], "imbalance_detected": false, "overhead_analysis": "ok"}',
            '{"spill_issues": [], "partitioning_issues": "ok"}',
            '{"skewed_stages": [], "suggested_mitigations": []}',
            '{"inefficient_joins": [], "missing_predicates": [], "aqe_opportunities": "none"}',
            '{"recommendations": [{"config": "spark.executor.memory", "current": "1g", "suggested": "4g", "reason": "Multiple cached RDDs detected"}]}',
            '{"code_issues": [{"line": "15", "issue": "Cache without unpersist", "suggestion": "Add unpersist() calls"}]}'
        ]
        report = self.engine.analyze_application("app-cache-leak", job_code="df.cache()")
        # Should detect caching issues
        self.assertGreater(len(report.recommendations), 0)

    def test_actions_in_loop(self):
        """Test detection of actions inside loops."""
        code = """
for i in range(10):
    filtered = df.filter(col("value") > i)
    count = filtered.count()  # Action in loop!
"""
        self._mock_spark_history()
        self.mock_llm.generate_recommendation.side_effect = [
            '{"bottlenecks": [], "imbalance_detected": false, "overhead_analysis": "ok"}',
            '{"spill_issues": [], "partitioning_issues": "ok"}',
            '{"skewed_stages": [], "suggested_mitigations": []}',
            '{"inefficient_joins": [], "missing_predicates": [], "aqe_opportunities": "none"}',
            '{"recommendations": []}',
            '{"code_issues": [{"line": "3", "issue": "Action inside loop", "suggestion": "Cache DataFrame before loop and use transformations"}]}'
        ]
        report = self.engine.analyze_application("app-loop", job_code=code)
        # Should detect action in loop
        code_issues = [r for r in report.recommendations if r.category == "Code"]
        self.assertGreater(len(code_issues), 0)

    def test_groupbykey_usage(self):
        """Test detection of groupByKey instead of reduceByKey."""
        code = """
rdd = sc.parallelize(data)
grouped = rdd.groupByKey()  # BAD: shuffles all data
sums = grouped.mapValues(sum)
"""
        self._mock_spark_history(stages=[
            StageMetric(
                stageId=0, name="groupByKey", status="COMPLETE",
                numTasks=100, numActiveTasks=0, numCompleteTasks=100, numFailedTasks=0,
                executorRunTime=50000, executorCpuTime=40000000000,
                jvmGcTime=100, resultSerializationTime=10,
                inputBytes=1024000, outputBytes=512000,
                shuffleReadBytes=10240000,  # Large shuffle
                shuffleWriteBytes=10240000,
                diskBytesSpilled=0, memoryBytesSpilled=0,
                peakExecutionMemory=1024000
            )
        ])
        self.mock_llm.generate_recommendation.side_effect = [
            '{"bottlenecks": [{"stageId": 0, "issue": "Large shuffle", "severity": "high"}], "imbalance_detected": false, "overhead_analysis": "ok"}',
            '{"spill_issues": [], "partitioning_issues": "Large shuffle detected"}',
            '{"skewed_stages": [], "suggested_mitigations": []}',
            '{"inefficient_joins": [], "missing_predicates": [], "aqe_opportunities": "none"}',
            '{"recommendations": []}',
            '{"code_issues": [{"line": "2", "issue": "groupByKey usage", "suggestion": "Use reduceByKey or aggregateByKey instead"}]}'
        ]
        report = self.engine.analyze_application("app-groupbykey", job_code=code)
        # Should recommend reduceByKey
        self.assertGreater(len(report.recommendations), 0)

    def test_multiple_issues_combined(self):
        """Test detection of multiple issues in one job."""
        code = """
df = spark.range(1000000)
df.cache()  # Never unpersisted
for i in range(10):
    df.filter(col("id") > i).count()  # Action in loop
grouped = df.groupBy("id").count()
grouped.collect()  # Collect large result
"""
        self._mock_spark_history(rdd_storage=[
            MagicMock(id=1, name="cached_df", memoryUsed=100000000, numCachedPartitions=10)
        ])
        self.mock_llm.generate_recommendation.side_effect = [
            '{"bottlenecks": [], "imbalance_detected": false, "overhead_analysis": "ok"}',
            '{"spill_issues": [], "partitioning_issues": "ok"}',
            '{"skewed_stages": [], "suggested_mitigations": []}',
            '{"inefficient_joins": [], "missing_predicates": [], "aqe_opportunities": "none"}',
            '{"recommendations": []}',
            '{"code_issues": [{"line": "2", "issue": "Cache without unpersist", "suggestion": "Add unpersist"}, {"line": "4", "issue": "Action in loop", "suggestion": "Avoid actions in loops"}, {"line": "6", "issue": "collect() on large data", "suggestion": "Use show() or write to storage"}]}'
        ]
        report = self.engine.analyze_application("app-multiple", job_code=code)
        # Should detect multiple issues
        code_issues = [r for r in report.recommendations if r.category == "Code"]
        self.assertGreaterEqual(len(code_issues), 3)
class TestRecommendationValidation(unittest.TestCase):
    """Test that recommendations are valid and actionable."""

    def setUp(self):
        # Strict mocks (spec=) so unexpected client calls fail loudly.
        self.mock_spark = MagicMock(spec=SparkHistoryClient)
        self.mock_llm = MagicMock(spec=LLMClient)
        self.engine = OptimizationEngine(self.mock_spark, self.mock_llm)

    def test_recommendations_have_required_fields(self):
        """Test that all recommendations have required fields."""
        # All Spark History endpoints return empty data for this scenario.
        empty_list_endpoints = (
            self.mock_spark.get_jobs,
            self.mock_spark.get_stages,
            self.mock_spark.get_executors,
            self.mock_spark.get_sql_metrics,
            self.mock_spark.get_rdd_storage,
        )
        for endpoint in empty_list_endpoints:
            endpoint.return_value = []
        self.mock_spark.get_environment.return_value = {}
        self.mock_spark.get_stage_details.return_value = {}

        # Canned LLM responses, one per analysis pass in engine call order;
        # only the configuration pass yields a recommendation.
        self.mock_llm.generate_recommendation.side_effect = [
            '{"bottlenecks": [], "imbalance_detected": false, "overhead_analysis": "ok"}',
            '{"spill_issues": [], "partitioning_issues": "ok"}',
            '{"skewed_stages": [], "suggested_mitigations": []}',
            '{"inefficient_joins": [], "missing_predicates": [], "aqe_opportunities": "none"}',
            '{"recommendations": [{"config": "spark.executor.memory", "current": "1g", "suggested": "4g", "reason": "Increase memory"}]}',
            '{"code_issues": []}'
        ]

        report = self.engine.analyze_application("app-test")

        # Every recommendation must carry the full set of required fields
        # and use only the known category / impact vocabularies.
        required_fields = ("category", "issue", "suggestion", "evidence", "impact_level")
        for recommendation in report.recommendations:
            for field_name in required_fields:
                self.assertIsNotNone(getattr(recommendation, field_name))
            self.assertIn(recommendation.category, ["Configuration", "Code"])
            self.assertIn(recommendation.impact_level, ["High", "Medium", "Low"])
if __name__ == '__main__':
    # SECURITY: a live Gemini API key was previously hard-coded here.
    # Never commit secrets to source control — that key must be considered
    # compromised and rotated. The key is now read from the caller's
    # environment; we only warn when it is absent so non-LLM tests still run.
    if not os.environ.get('GEMINI_API_KEY'):
        print("WARNING: GEMINI_API_KEY is not set; tests that hit the "
              "LLM client for real may fail.", file=sys.stderr)
    unittest.main(verbosity=2)