from pyspark.sql import SparkSession
import pyspark.sql.functions as F


def main():
"""
FIXED: Avoid actions in loops by using transformations.
Compute once, use multiple times.
"""
spark = SparkSession.builder.appName("ActionsFixedJob").getOrCreate()
df = spark.range(1000000).toDF("id")
df = df.withColumn("value", F.rand() * 100)
# FIXED: Cache if reusing
df.cache()
df.count() # Materialize cache
    # FIXED: build all filtered DataFrames first as lazy transformations;
    # each count below then scans the cached data instead of recomputing df
    filtered_dfs = [df.filter(F.col("value") > i * 10) for i in range(10)]

    all_counts = []
    for i, filtered in enumerate(filtered_dfs):
        count = filtered.count()  # One action per threshold, but served from the cache
        all_counts.append(count)
        print(f"Iteration {i}: {count}")
    # FIXED: Single aggregation for multiple metrics
    stats = df.agg(
        F.count("*").alias("total"),
        F.max("value").alias("max_val"),
        F.min("value").alias("min_val"),
    ).collect()[0]
    print(f"Total: {stats['total']}, Max: {stats['max_val']}, Min: {stats['min_val']}")

    df.unpersist()
    spark.stop()


if __name__ == "__main__":
    main()