from dataclasses import dataclass, asdict
from typing import Any, Dict, List, Optional


@dataclass
class SparkApp:
    id: str
    name: str
    attempts: List[Dict[str, Any]]

    def to_dict(self):
        return asdict(self)
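

# A minimal sketch of loading SparkApp records from the Spark History Server
# REST API (GET /api/v1/applications). The base URL is an assumption; the
# response entries carry id, name, and attempts, matching the fields above.
def fetch_applications(history_server: str = "http://localhost:18080") -> List[SparkApp]:
    import json
    import urllib.request

    url = f"{history_server}/api/v1/applications"
    with urllib.request.urlopen(url) as resp:  # Stdlib only; no requests dependency
        payload = json.load(resp)
    return [
        SparkApp(id=app["id"], name=app["name"], attempts=app.get("attempts", []))
        for app in payload
    ]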


@dataclass
class JobMetric:
    jobId: int
    name: str
    submissionTime: Optional[str]
    completionTime: Optional[str]
    status: str
    numSimpleStages: int  # Count of stages in this job
    stageIds: List[int]

    def to_dict(self):
        return asdict(self)
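

# Hedged helper: wall-clock duration of a job in seconds. The History Server
# emits timestamps like "2024-01-01T12:00:00.000GMT"; stripping the "GMT"
# suffix before parsing is an assumption about that exact format.
def job_duration_seconds(job: JobMetric) -> Optional[float]:
    from datetime import datetime

    if not job.submissionTime or not job.completionTime:
        return None  # Job never submitted or still running
    fmt = "%Y-%m-%dT%H:%M:%S.%f"
    start = datetime.strptime(job.submissionTime.replace("GMT", ""), fmt)
    end = datetime.strptime(job.completionTime.replace("GMT", ""), fmt)
    return (end - start).total_seconds()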


@dataclass
class StageMetric:
    stageId: int
    name: str
    status: str
    numTasks: int
    numActiveTasks: int
    numCompleteTasks: int
    numFailedTasks: int
    executorRunTime: int  # Sum of executor run time across tasks (ms)
    executorCpuTime: int  # Sum of executor CPU time across tasks (ns)
    jvmGcTime: int  # Sum of JVM GC time (ms)
    resultSerializationTime: int  # Sum of result serialization time (ms)
    inputBytes: int
    outputBytes: int
    shuffleReadBytes: int
    shuffleWriteBytes: int
    diskBytesSpilled: int
    memoryBytesSpilled: int
    peakExecutionMemory: int

    def to_dict(self):
        return asdict(self)
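

# Sketch of mapping one entry from GET /api/v1/applications/{app_id}/stages
# onto StageMetric. The key names mirror Spark's REST StageData; .get(..., 0)
# hedges against metrics that may be absent in older Spark versions.
def parse_stage(raw: Dict[str, Any]) -> StageMetric:
    return StageMetric(
        stageId=raw["stageId"],
        name=raw.get("name", ""),
        status=raw.get("status", "UNKNOWN"),
        numTasks=raw.get("numTasks", 0),
        numActiveTasks=raw.get("numActiveTasks", 0),
        numCompleteTasks=raw.get("numCompleteTasks", 0),
        numFailedTasks=raw.get("numFailedTasks", 0),
        executorRunTime=raw.get("executorRunTime", 0),
        executorCpuTime=raw.get("executorCpuTime", 0),
        jvmGcTime=raw.get("jvmGcTime", 0),
        resultSerializationTime=raw.get("resultSerializationTime", 0),
        inputBytes=raw.get("inputBytes", 0),
        outputBytes=raw.get("outputBytes", 0),
        shuffleReadBytes=raw.get("shuffleReadBytes", 0),
        shuffleWriteBytes=raw.get("shuffleWriteBytes", 0),
        diskBytesSpilled=raw.get("diskBytesSpilled", 0),
        memoryBytesSpilled=raw.get("memoryBytesSpilled", 0),
        peakExecutionMemory=raw.get("peakExecutionMemory", 0),
    )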


@dataclass
class SkewAnalysis:
    is_skewed: bool
    skew_ratio: float  # max_duration / median_duration
    max_duration: float
    median_duration: float
    stage_id: int

    def to_dict(self):
        return asdict(self)
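

# A minimal skew heuristic over per-task durations for one stage: skewed if
# the slowest task runs far longer than the median. The 5x threshold is an
# assumption, not something prescribed by Spark.
def detect_skew(stage_id: int, task_durations_ms: List[float],
                threshold: float = 5.0) -> SkewAnalysis:
    import statistics

    max_d = max(task_durations_ms) if task_durations_ms else 0.0
    median_d = statistics.median(task_durations_ms) if task_durations_ms else 0.0
    ratio = (max_d / median_d) if median_d > 0 else 0.0
    return SkewAnalysis(
        is_skewed=ratio >= threshold,
        skew_ratio=ratio,
        max_duration=max_d,
        median_duration=median_d,
        stage_id=stage_id,
    )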


@dataclass
class SpillAnalysis:
    has_spill: bool
    total_disk_spill: int
    total_memory_spill: int
    stage_id: int

    def to_dict(self):
        return asdict(self)
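

# Sketch: derive SpillAnalysis directly from a StageMetric. Any nonzero spill
# means Spark ran out of execution memory and wrote intermediate data to disk.
def analyze_spill(stage: StageMetric) -> SpillAnalysis:
    return SpillAnalysis(
        has_spill=stage.diskBytesSpilled > 0 or stage.memoryBytesSpilled > 0,
        total_disk_spill=stage.diskBytesSpilled,
        total_memory_spill=stage.memoryBytesSpilled,
        stage_id=stage.stageId,
    )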


@dataclass
class ResourceAnalysis:
    stage_id: int
    executor_utilization: float  # CpuTime / RunTime
    gc_overhead: float  # GcTime / RunTime
    serialization_overhead: float  # SerTime / RunTime
    memory_pressure: bool  # Based on peak memory or excessive spill

    def to_dict(self):
        return asdict(self)
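

# Sketch of the ratios documented above. Note the unit mismatch in Spark's
# metrics: executorCpuTime is reported in nanoseconds while executorRunTime,
# jvmGcTime, and resultSerializationTime are milliseconds, hence the 1e6
# division. The 10% GC threshold for memory pressure is an assumption.
def analyze_resources(stage: StageMetric) -> ResourceAnalysis:
    run_ms = stage.executorRunTime or 1  # Guard against division by zero
    cpu_ms = stage.executorCpuTime / 1e6
    gc_overhead = stage.jvmGcTime / run_ms
    return ResourceAnalysis(
        stage_id=stage.stageId,
        executor_utilization=cpu_ms / run_ms,
        gc_overhead=gc_overhead,
        serialization_overhead=stage.resultSerializationTime / run_ms,
        memory_pressure=gc_overhead > 0.10 or stage.memoryBytesSpilled > 0,
    )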


@dataclass
class PartitioningAnalysis:
    stage_id: int
    avg_partition_size: float  # ShuffleBytes / NumTasks
    is_suboptimal: bool  # Too small or too large
    recommendation_type: str  # "Coalesce", "IncreasePartitions", "none"

    def to_dict(self):
        return asdict(self)
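

# Partition-sizing heuristic: compare average shuffle-read bytes per task to
# a target window. The 1 MiB / 1 GiB bounds are assumptions; common guidance
# aims for partitions on the order of 100-200 MiB.
def analyze_partitioning(stage: StageMetric,
                         min_bytes: int = 1 << 20,
                         max_bytes: int = 1 << 30) -> PartitioningAnalysis:
    tasks = stage.numTasks or 1
    avg = stage.shuffleReadBytes / tasks
    if stage.shuffleReadBytes > 0 and avg < min_bytes:
        rec = "Coalesce"  # Many tiny partitions: reduce their count
    elif avg > max_bytes:
        rec = "IncreasePartitions"  # Oversized partitions: split the work
    else:
        rec = "none"
    return PartitioningAnalysis(
        stage_id=stage.stageId,
        avg_partition_size=avg,
        is_suboptimal=rec != "none",
        recommendation_type=rec,
    )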


@dataclass
class JoinAnalysis:
    stage_id: int
    is_inefficient: bool
    join_type: str
    reason: str

    def to_dict(self):
        return asdict(self)
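

# A name-based heuristic only: stage names may embed the physical join
# operator (e.g. "SortMergeJoin"). Treating a join stage that also spills as
# inefficient is an assumption, not a definitive diagnosis.
def analyze_join(stage: StageMetric) -> Optional[JoinAnalysis]:
    lowered = stage.name.lower()
    if "join" not in lowered:
        return None  # Not a join stage, as far as the name tells us
    spills = stage.diskBytesSpilled > 0 or stage.memoryBytesSpilled > 0
    return JoinAnalysis(
        stage_id=stage.stageId,
        is_inefficient=spills,
        join_type="SortMergeJoin" if "sortmergejoin" in lowered else "unknown",
        reason="Join stage spilled to disk" if spills else "",
    )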


@dataclass
class Recommendation:
    category: str
    issue: str
    suggestion: str
    evidence: str
    impact_level: str

    def to_dict(self):
        return asdict(self)
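

# Sketch: turn a positive SkewAnalysis into a human-readable Recommendation.
# The wording and impact level are assumptions for illustration; the config
# key spark.sql.adaptive.skewJoin.enabled is a real AQE setting.
def recommend_for_skew(skew: SkewAnalysis) -> Optional[Recommendation]:
    if not skew.is_skewed:
        return None
    return Recommendation(
        category="Data Skew",
        issue=f"Stage {skew.stage_id} has skewed task durations",
        suggestion="Enable AQE skew-join handling "
                   "(spark.sql.adaptive.skewJoin.enabled) or salt hot keys",
        evidence=f"max/median task duration ratio = {skew.skew_ratio:.1f}",
        impact_level="high",
    )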


@dataclass
class OptimizationReport:
    app_id: str
    skew_analysis: List[SkewAnalysis]
    spill_analysis: List[SpillAnalysis]
    resource_analysis: List[ResourceAnalysis]
    partitioning_analysis: List[PartitioningAnalysis]
    join_analysis: List[JoinAnalysis]
    recommendations: List[Recommendation]

    def to_dict(self):
        # asdict() recurses into nested dataclasses inside lists, so this is
        # equivalent to converting each analysis list element by hand.
        return asdict(self)
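

# Usage sketch: serialize a report to JSON. json.dumps works here because
# to_dict() yields only plain dicts, lists, and scalars.
# e.g. print(report_to_json(OptimizationReport("app-1", [], [], [], [], [], [])))
def report_to_json(report: OptimizationReport) -> str:
    import json
    return json.dumps(report.to_dict(), indent=2)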


@dataclass
class ExecutorMetric:
    id: str
    hostPort: str
    rddBlocks: int
    memoryUsed: int
    diskUsed: int
    totalCores: int
    maxTasks: int
    activeTasks: int
    failedTasks: int
    completedTasks: int
    totalDuration: int
    totalGCTime: int
    totalInputBytes: int
    totalShuffleRead: int
    totalShuffleWrite: int

    def to_dict(self):
        return asdict(self)
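

# Per-executor GC pressure check, assuming totalDuration and totalGCTime are
# both milliseconds as in Spark's ExecutorSummary. The 10% threshold is an
# assumption.
def gc_heavy_executors(executors: List[ExecutorMetric],
                       threshold: float = 0.10) -> List[str]:
    return [
        e.id for e in executors
        if e.totalDuration > 0 and e.totalGCTime / e.totalDuration > threshold
    ]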


@dataclass
class SQLMetric:
    id: int
    description: str
    submissionTime: str
    completionTime: str
    duration: int
    jobIds: List[int]
    status: str

    def to_dict(self):
        return asdict(self)


@dataclass
class RDDMetric:
    id: int
    name: str
    numPartitions: int
    numCachedPartitions: int
    storageLevel: str
    memoryUsed: int
    diskUsed: int

    def to_dict(self):
        return asdict(self)
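

# Cache-efficiency sketch: a persisted RDD whose cached-partition fraction is
# low is likely being evicted or only partially materialized. The 0.9 cutoff
# is an assumption.
def poorly_cached_rdds(rdds: List[RDDMetric],
                       min_fraction: float = 0.9) -> List[RDDMetric]:
    return [
        r for r in rdds
        if r.numPartitions > 0
        and r.numCachedPartitions / r.numPartitions < min_fraction
    ]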