PySpark Series: Zero to Hero (Q&A)
Day-2

Below are everyday scenarios you will face while working with PySpark.
Exercise 1: Counting Words in a Text File
Problem: Calculate the frequency of each word in a text file using PySpark.
Solution:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("WordCount").getOrCreate()
lines = spark.read.text("file.txt").rdd.map(lambda r: r[0])
words = lines.flatMap(lambda x: x.split(" "))
word_counts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
word_counts.collect()
Exercise 2: Filtering DataFrames
Problem: Filter a DataFrame to keep rows where a specific column meets a condition (e.g., column “age” > 30).
Solution:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("FilterDF").getOrCreate()
df = spark.read.csv("data.csv", header=True, inferSchema=True)
filtered_df = df.filter(col("age") > 30)
filtered_df.show()
Exercise 3: Joining DataFrames
Problem: Perform an inner join between two DataFrames based on a common key.
Solution:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("FilterDF").getOrCreate()
df1 = spark.read.csv("data.csv", header=True, inferSchema=True)
df2 = spark.read.csv("data_updated.csv", header=True, inferSchema=True)
joined_df = df1.join(df2, df1.key == df2.key, "inner")
joined_df.show()
Exercise 4: Grouping and Aggregating Data
Problem: Calculate the average salary per department from an employee DataFrame.
Solution:
from pyspark.sql.functions import avg
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("FilterDF").getOrCreate()
df = spark.read.csv("data.csv", header=True, inferSchema=True)
avg_salary_df = df.groupBy("department").agg(avg("salary").alias("avg_salary"))
avg_salary_df.show()
# Note: groupBy().agg() returns only the grouping column and the aggregated
# column (here the department-wise average salary). If you need the other
# columns as well, join the aggregated result back to the original DataFrame,
# as shown in the sketch below.
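A minimal sketch of that re-join, reusing df and avg_salary_df from the snippet above:
df_with_avg = df.join(avg_salary_df, on="department", how="left")  # every row keeps its columns plus avg_salary
df_with_avg.show()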
Exercise 5: Handling Missing Values
Problem: Replace null values in a DataFrame column with a default value.
Solution:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("FilterDF").getOrCreate()
df = spark.read.csv("data.csv", header=True, inferSchema=True)
filled_df = df.fillna({"age": 0, "salary": 0})
filled_df.show()
Exercise 6: Sorting DataFrames
Problem: Sort a DataFrame by multiple columns, one ascending and another descending.
Solution:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("SortDF").getOrCreate()
df = spark.read.csv("data.csv", header=True, inferSchema=True)
sorted_df = df.orderBy(col("age").asc(), col("salary").desc())
sorted_df.show()
Exercise 7: Applying User-defined Functions (UDFs)
Problem: Apply a UDF to a DataFrame column to perform a custom transformation (important to know, but built-in functions should be preferred whenever one exists).
Solution:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
spark = SparkSession.builder.appName("UDFExample").getOrCreate()
df = spark.read.csv("data.csv", header=True, inferSchema=True)
def convert_case(s):
    return s.upper() if s is not None else None  # guard against nulls
convert_case_udf = udf(convert_case, StringType())
df = df.withColumn("upper_name", convert_case_udf(col("name")))
df.show()
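For comparison, the same transformation with the built-in upper() function; this is the preferred approach because it stays inside Spark's optimizer and avoids Python serialization overhead:
from pyspark.sql.functions import upper, col
df = df.withColumn("upper_name", upper(col("name")))  # built-in equivalent of the UDF above
df.show()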
Exercise 8: Working with Window Functions
Problem: Rank rows within each partition of a DataFrame based on a specific column.
Solution:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("WindowFunctions").getOrCreate()
df = spark.read.csv("data.csv", header=True, inferSchema=True)
window_spec = Window.partitionBy("department").orderBy(col("salary").desc())
ranked_df = df.withColumn("rank", rank().over(window_spec))
ranked_df.show()
Exercise 9: Pivot and Unpivot Operations
Problem: Pivot a DataFrame to convert rows into columns based on a categorical column.
Solution:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("FilterDF").getOrCreate()
df = spark.read.csv("data.csv", header=True, inferSchema=True)
pivoted_df = df.groupBy("department").pivot("category").sum("value")
pivoted_df.show()
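Unpivoting (turning those category columns back into rows) is not shown above. A minimal sketch using the SQL stack() expression, assuming the pivoted DataFrame ended up with two hypothetical category columns named 'A' and 'B' (Spark 3.4 and later also provide a built-in DataFrame.unpivot()):
# 'A' and 'B' are placeholder column names; replace them with your actual pivoted columns.
unpivoted_df = pivoted_df.selectExpr(
    "department",
    "stack(2, 'A', A, 'B', B) as (category, value)"
)
unpivoted_df.show()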
Exercise 10: Performing SQL Queries
Problem: Execute SQL queries on PySpark DataFrames.
Solution:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("FilterDF").getOrCreate()
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df.createOrReplaceTempView("employees")
result_df = spark.sql("SELECT * FROM employees WHERE age > 30")
result_df.show()
Exercise 11: Handling Large Datasets
Problem: Optimize the performance of PySpark jobs for large datasets.
Solution:
-> Use appropriate partitioning (repartition() or coalesce() on the columns you filter or join on).
-> Cache or persist intermediate DataFrames that are reused across multiple actions.
-> Avoid unnecessary shuffles (wide transformations such as groupBy and join trigger them).
-> Use broadcast joins when one DataFrame is small enough to fit in executor memory.
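A minimal sketch combining a few of these ideas (the file names, join key, and partition count are purely illustrative):
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
spark = SparkSession.builder.appName("OptimizeLargeJob").getOrCreate()
large_df = spark.read.parquet("large_data.parquet")  # hypothetical large dataset
small_df = spark.read.csv("lookup.csv", header=True, inferSchema=True)  # hypothetical small lookup table
large_df = large_df.repartition(200, "key")  # partition by the join key to control shuffle granularity
large_df.cache()  # cache if it will be reused across several actions
joined_df = large_df.join(broadcast(small_df), on="key", how="inner")  # broadcast the small side
joined_df.show()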
Exercise 12: Combining DataFrames (Union and UnionAll)
Problem: Concatenate two DataFrames vertically (union operation).
Solution:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("FilterDF").getOrCreate()
df1 = spark.read.csv("data.csv", header=True, inferSchema=True)
df2 = spark.read.csv("data_updated.csv", header=True, inferSchema=True)
union_df = df1.union(df2)
union_df.show()
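Two related notes: in recent Spark versions unionAll() is simply an alias of union() (neither removes duplicates, like SQL UNION ALL), and union() matches columns by position. If the two DataFrames have the same columns in a different order, unionByName() is usually the safer choice:
union_by_name_df = df1.unionByName(df2)  # aligns columns by name instead of position
union_by_name_df.show()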
Exercise 13: Using Broadcast Variables
Problem: Improve the efficiency of joins by broadcasting smaller DataFrames.
Solution:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
spark = SparkSession.builder.appName("BroadcastJoin").getOrCreate()
df1 = spark.read.csv("data.csv", header=True, inferSchema=True)
df2 = spark.read.csv("data_updated.csv", header=True, inferSchema=True)
joined_df = df1.join(broadcast(df2), df1.key == df2.key, "inner")
joined_df.show()
Exercise 14: Handling Complex Data Types
Problem: Extract elements from an array column and create new columns.
Solution:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col
spark = SparkSession.builder.appName("ExplodeArray").getOrCreate()
df = spark.read.csv("data.csv", header=True, inferSchema=True)
exploded_df = df.withColumn("exploded_values", explode(col("values")))
exploded_df.show()
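Note that explode() needs an array (or map) column, and a plain CSV read will not produce one. A common pattern, assuming "values" arrives as a delimited string such as "a;b;c", is to build the array with split() first:
from pyspark.sql.functions import split
array_df = df.withColumn("values_array", split(col("values"), ";"))  # string -> array<string>
exploded_df = array_df.withColumn("exploded_values", explode(col("values_array")))
exploded_df.show()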
Exercise 15: Writing to External Storage
Problem: Save a DataFrame to a CSV file.
Solution:
df.write.csv("/full_path_to_external_storage/output.csv", header=True)
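A slightly fuller sketch: Spark writes a directory of part files at the given path, so mode() and coalesce() are often added (coalesce(1) is only sensible for small outputs):
df.coalesce(1).write.mode("overwrite").csv("/full_path_to_external_storage/output.csv", header=True)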
Conclusion
Mastering these intermediate-level PySpark exercises will significantly enhance your data processing skills with Apache Spark. Practice them to become proficient at handling large-scale data analytics tasks efficiently. Feel free to comment if any of the solutions need further explanation.
Happy Learning !!