from pyspark.sql import SparkSession
import pyspark.sql.functions as F


def main():
    """
    FIXED: Applying recommendations from skew job analysis.

    Original issues:
    - Hardcoded repartition(10)
    - collect() on grouped data
    - Manual data creation loop
    """
    spark = SparkSession.builder.appName("SkewedJobFixed").getOrCreate()

    # FIXED: Use Spark-native data creation instead of a driver-side loop
    num_skewed = 100000
    num_uniform = 10000
    # Every skewed row shares the single hot key id=1; uniform rows get distinct ids beyond it
    skewed_df = spark.range(num_skewed).withColumn("id", F.lit(1).cast("long")).withColumn("val", F.lit("skewed"))
    uniform_df = spark.range(num_uniform).withColumn("id", F.col("id") + num_skewed).withColumn("val", F.lit("uniform"))
    df = skewed_df.union(uniform_df)
    # FIXED: Dynamic partitioning based on cluster resources instead of a hardcoded repartition(10)
    df = df.repartition(spark.sparkContext.defaultParallelism)
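    # Hedged aside (not from the original job): repartition() above only resizes the
    # input stage; the groupBy shuffle below still uses spark.sql.shuffle.partitions
    # (default 200) unless AQE coalesces it. One optional sketch, assuming the same
    # cluster sizing applies, is to align that config with the default parallelism:
    #
    # spark.conf.set(
    #     "spark.sql.shuffle.partitions",
    #     str(spark.sparkContext.defaultParallelism),
    # )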
    # Enable AQE; skewJoin splits oversized partitions in skewed joins (it does not
    # rebalance the groupBy aggregation below, but AQE still coalesces shuffle partitions)
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

    # Force a shuffle with a heavily skewed key distribution
    grouped = df.groupBy("id").count()

    # FIXED: Don't collect(); write to storage, or use show() for debugging
    print(f"Total groups: {grouped.count()}")
    grouped.show(10)  # Only show a small sample
    # Or write the full result to storage:
    # grouped.write.mode("overwrite").parquet("/tmp/skew_output")
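    # Another hedged option (an assumption, not from the original analysis): if rows
    # really are needed on the driver, toLocalIterator() streams one partition at a
    # time instead of materializing the whole result like collect():
    #
    # for row in grouped.toLocalIterator():
    #     process(row)  # 'process' is a hypothetical per-row callback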
print("Skew job finished (fixed)")
spark.stop()


if __name__ == "__main__":
    main()