Big Data: Spark Optimization Techniques - Part 2
Let's go through each of the use cases we discussed in Part 1 of this series, with a real-world example and its corresponding solution:
1. Inefficient Shuffling
Example: You have a Spark job that performs a join operation between two large tables, causing excessive shuffling and slow performance.
Solution:
-> Repartitioning: Increase the number of partitions so the join data is spread more evenly across executors.
df_large = df_large.repartition(100)
df_joined = df_large.join(df_small, "join_key")
-> Broadcast Joins: Use a broadcast join when joining a large DataFrame with a small one; the small table is shipped to every executor, avoiding a shuffle of the large side.
from pyspark.sql.functions import broadcast
df_joined = df_large.join(broadcast(df_small), "key")
-> Skew Handling: Isolate the skewed key and repartition it separately so the hot data is spread across more tasks.
skewed_df = df_large.filter(df_large.key == 'skewed_key')
non_skewed_df = df_large.filter(df_large.key != 'skewed_key')
processed_skewed = skewed_df.repartition(200)
result = processed_skewed.union(non_skewed_df).join(df_small, "key")
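On Spark 3.x, Adaptive Query Execution (AQE) can also detect and split skewed join partitions automatically, often removing the need for the manual split above. A minimal sketch; the toy DataFrames exist only to make it runnable:
from pyspark.sql import SparkSession

# Assumes Spark 3.0+ where Adaptive Query Execution is available
spark = (
    SparkSession.builder
    .appName("aqe_skew_join_sketch")
    .config("spark.sql.adaptive.enabled", "true")           # enable AQE
    .config("spark.sql.adaptive.skewJoin.enabled", "true")  # let AQE split skewed join partitions
    .getOrCreate()
)

# Toy DataFrames just to make the sketch runnable
df_large = spark.range(1000000).withColumnRenamed("id", "key")
df_small = spark.range(100).withColumnRenamed("id", "key")

df_result = df_large.join(df_small, "key")
df_result.explain()  # the physical plan starts with AdaptiveSparkPlan when AQE is active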
2. Inadequate Memory Allocation
Example: A Spark job processing a large dataset fails due to an out-of-memory error.
Solution:
-> Executor Memory:
spark-submit --executor-memory 4G --driver-memory 4G my_spark_job.py
-> Off-Heap Memory:
spark-submit --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=4G my_spark_job.py
-> Garbage Collection Tuning: Pass JVM garbage-collector options (G1GC here) to the executors via spark-submit.
spark-submit --conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=4" my_spark_job.py
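The same memory-related settings can also be supplied when the SparkSession is created, which is handy in notebooks. A minimal sketch; note that in client mode the driver memory still has to be set at submit time, before the driver JVM starts:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("memory_tuning_sketch")
    .config("spark.executor.memory", "4g")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "4g")
    .config("spark.executor.extraJavaOptions",
            "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=4")
    .getOrCreate()
)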
3. Suboptimal Data Storage Formats
Example: Reading and writing large CSV files is slow and takes up a lot of storage space.
Solution:
-> Parquet/ORC: Use columnar formats such as Parquet or ORC instead of CSV.
df = spark.read.csv("large_dataset.csv")
df.write.parquet("large_dataset.parquet")
-> Compression: Apply a compression codec such as Snappy when writing.
df.write.parquet("large_dataset.parquet", compression="snappy")
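On top of the file format and codec, partitioning the output by a column that is frequently filtered on lets readers skip entire directories. A minimal sketch, assuming a hypothetical event_date column on the df above:
# "event_date" is an illustrative column name; partition the output by it
df.write.partitionBy("event_date").parquet("large_dataset_partitioned.parquet", compression="snappy")

# Later reads only scan the matching partition directories
df_one_day = spark.read.parquet("large_dataset_partitioned.parquet").filter("event_date = '2023-01-01'")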
4. Poorly Tuned Spark Configurations
Example: A Spark job runs with default configurations, resulting in underutilized resources and slow processing.
Solution:
-> Dynamic Allocation: Enable dynamic allocation so executors are added and removed based on the workload (see the programmatic sketch after this list).
spark-submit --conf spark.dynamicAllocation.enabled=true my_spark_job.py
-> Parallelism:
spark-submit --conf spark.sql.shuffle.partitions=200 --conf spark.default.parallelism=200 my_spark_job.py
-> Speculative Execution:
spark-submit --conf spark.speculation=true my_spark_job.py
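Alternatively, these tuning options can be bundled into the SparkSession builder instead of the spark-submit command line. A minimal sketch; the min/max executor counts are illustrative, and spark.dynamicAllocation.shuffleTracking.enabled assumes Spark 3.0+:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("tuned_configs_sketch")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")    # illustrative lower bound
    .config("spark.dynamicAllocation.maxExecutors", "20")   # illustrative upper bound
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")  # avoids needing the external shuffle service
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.default.parallelism", "200")
    .config("spark.speculation", "true")
    .getOrCreate()
)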
5. Suboptimal Data Serialization
Example: Serialization overhead slows down Spark job performance.
Solution:
-> Kryo Serializer:
spark-submit --conf spark.serializer=org.apache.spark.serializer.KryoSerializer my_spark_job.py
-> Custom Registrations:
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "MyKryoRegistrator")  # a custom JVM (Scala/Java) registrator class
spark = SparkSession.builder.config(conf=conf).getOrCreate()
6. Inefficient DataFrame/Dataset Operations
Example: Frequent recomputation of a large dataset during transformations.
Solution:
-> Cache/Persist:
df = df.cache() # or df.persist()
df.count() # Materialize the cache
-> Avoid UDFs: Use built-in functions instead.
from pyspark.sql.functions import col
df = df.withColumn("new_col", col("existing_col") * 2)
-> Predicate Pushdown: Filter as early as possible so the data source can skip irrelevant data.
df = df.filter(col("date") >= "2023-01-01")  # Filter early
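To verify that a filter is actually pushed down to a columnar source such as Parquet, inspect the physical plan. A minimal sketch with a hypothetical events.parquet path and date column:
from pyspark.sql.functions import col

df = spark.read.parquet("events.parquet").filter(col("date") >= "2023-01-01")
df.explain()  # the Parquet scan node should list the filter under "PushedFilters"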
7. Large Joins and Aggregations
Example: A Spark job performs a join between two large tables, resulting in performance bottlenecks.
Solution:
-> Join Hints:
from pyspark.sql.functions import broadcast
df_joined = df_large.join(broadcast(df_small), "key")
-> Skew Handling:
df_skewed = df_large.filter(df_large.key == "skewed_value").repartition(200)
df_normal = df_large.filter(df_large.key != "skewed_value")
df_combined = df_skewed.union(df_normal)
df_result = df_combined.join(df_small, "key")
-> Map-Side Aggregation: Use built-in aggregations, which combine values partially on the map side before the shuffle (see the RDD sketch below).
from pyspark.sql.functions import sum as spark_sum
df_agg = df.groupBy("key").agg(spark_sum("value"))
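For context, DataFrame aggregations like the one above already perform partial (map-side) aggregation automatically. At the RDD level, the same idea is the difference between reduceByKey, which combines values within each partition before the shuffle, and groupByKey, which ships every record. A minimal sketch with toy data:
rdd = spark.sparkContext.parallelize([("a", 1), ("b", 2), ("a", 3)])
sums = rdd.reduceByKey(lambda x, y: x + y)  # partial sums are computed on each partition before the shuffle
print(sums.collect())  # [('a', 4), ('b', 2)] (order may vary)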
8. Straggler Tasks
Example: Some tasks take significantly longer to complete due to data skew.
Solution:
-> Data Skew:
df_skewed = df.filter(df["key"] == "skewed_value").repartition(200)
df_non_skewed = df.filter(df["key"] != "skewed_value")
df_combined = df_skewed.union(df_non_skewed)
-> Speculative Execution:
spark-submit --conf spark.speculation=true my_spark_job.py
9. Checkpointing and Lineage Issues
Example: A long-running job fails after hours due to lineage graph recomputation.
Solution:
-> Checkpointing:
spark.sparkContext.setCheckpointDir("./checkpoint_dir")  # on a real cluster, use reliable storage such as an HDFS path
df = df.checkpoint()
-> Persist Intermediate Results:
df_intermediate = df_intermediate.persist()
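If the cached data might not fit in executor memory, a storage level that spills to disk is the safer choice, and the cache should be released once it is no longer needed. A minimal sketch reusing df_intermediate from above:
from pyspark import StorageLevel

df_intermediate = df_intermediate.persist(StorageLevel.MEMORY_AND_DISK)
df_intermediate.count()      # materialize the cache
# ... downstream transformations that reuse df_intermediate ...
df_intermediate.unpersist()  # free executor memory/disk once it is no longer needed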
10. Resource Contention
Example: Multiple Spark jobs running concurrently on the same cluster, competing for resources.
Solution:
-> YARN/Mesos/Kubernetes:
spark-submit --master yarn --deploy-mode cluster my_spark_job.py
-> Queue Configurations:
spark-submit --master yarn --queue priority my_spark_job.py
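Within a single Spark application, the FAIR scheduler can additionally keep one long-running job from starving the others. A minimal sketch; the pool name priority_pool is illustrative:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("fair_scheduling_sketch")
    .config("spark.scheduler.mode", "FAIR")
    .getOrCreate()
)

# Jobs submitted from this thread run in the named scheduler pool
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "priority_pool")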
NOTE: Kindly note that all the spark-submit commands above need to be run from a bash (or similar) terminal on a machine where Spark is installed and spark-submit is on the PATH.
Conclusion:
These real-world examples and solutions illustrate how to tackle common Spark performance tuning scenarios effectively. Kindly comment if you have any suggestions or additional information, and I will be happy to update the article. Happy Learning!!