from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, when
def main():
    # 1. Skew Detection & Inefficient Joins
    spark = SparkSession.builder \
        .appName("Ex_Skew_And_Join") \
        .config("spark.sql.shuffle.partitions", "10") \
        .getOrCreate()
print("Generating Skewed Data...")
# Create dataset where key '0' is 95% of data
df_skew = spark.range(0, 100000).select(
(col("id") % 1000 == 0).cast("int").alias("key"), # 0 or 1, but mostly 0? No wait.
# Let's make key 0 very frequent
(col("id") < 95000).cast("int").alias("skew_key"), # 0 for first 95k, 1 for rest? No.
lit("A" * 100).alias("payload")
).withColumn("key", lit(0)) # Actually let's just make everything 0 for massive skew in one partition
# Better skew generation:
# Key 0: 50,000 rows
# Keys 1-50,000: 1 row each
# Huge generic dataframe
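    # Sanity check (illustrative addition, not part of the original flow):
    # counting rows per key makes the 95/5 split visible before any join runs.
    df_skew.groupBy("key").count().orderBy(col("count").desc()).show(5)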
    # Extreme case: every row in both DataFrames shares key=0.
    df_large = spark.range(0, 100000).withColumn("key", lit(0))
    df_small = spark.range(0, 1000).withColumn("key", lit(0))

    print("Executing Inefficient Join (Skewed)...")
    # This join funnels all rows to a single reducer because both sides only have key=0.
    # It should trigger "Skew Detection" and "Inefficient Join" findings.
    joined = df_large.join(df_small, "key")
    joined.count()
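    # Mitigation sketch (illustrative addition): df_small is tiny, so a broadcast
    # hint sidesteps the skewed shuffle entirely. Alternatively, enabling AQE's
    # skew-join handling before the query runs lets Spark split the hot partition:
    #   spark.conf.set("spark.sql.adaptive.enabled", "true")
    #   spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
    from pyspark.sql.functions import broadcast
    df_large.join(broadcast(df_small), "key").count()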
    spark.stop()
if __name__ == "__main__":
    main()