from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
import time
import pickle
def main():
    """Run two Spark jobs that deliberately exhibit performance pathologies.

    Scenario 3 of the demo suite: GC overhead, Python-UDF serialization
    overhead, and poor executor utilization. Each job below is intentionally
    inefficient so that downstream monitoring checks have something to flag.
    """
    spark = (
        SparkSession.builder
        .appName("Ex_Overhead_Resource")
        .getOrCreate()
    )

    @udf(StringType())
    def napping_udf(value):
        # Stall 10 ms per record: wall-clock time balloons while CPU time
        # stays near zero, which is exactly what a low-utilization check sees.
        time.sleep(0.01)
        return str(value)

    @udf(StringType())
    def churning_udf(value):
        # Allocate a pile of short-lived strings per record to pressure the
        # garbage collector; being a Python UDF, every row also pays the
        # JVM<->Python serialization round trip.
        junk = [str(n) * 100 for n in range(100)]
        return str(len(junk))

    # Small dataset on purpose — the per-record overhead is the point.
    rows = spark.range(0, 10000)  # 10k rows

    print("Triggering Utilization Issue (Sleep)...")
    # Expected to trip the "Executor Utilization" check (low CPU%).
    rows.withColumn("sleep", napping_udf(col("id"))).count()

    print("Triggering GC/Serialization Issue...")
    # Expected to trip "GC Overhead" (allocation churn) and
    # "Serialization" (Python UDF overhead) checks.
    rows.select(churning_udf(col("id"))).count()

    spark.stop()


if __name__ == "__main__":
    main()