PySpark Series: Zero to Hero QnA

Rahul Tiwari
Jul 12, 2024


Day-2

Below are everyday scenarios you will face while working with PySpark.

Exercise 1: Counting Words in a Text File
Problem: Calculate the frequency of each word in a text file using PySpark.

Solution:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WordCount").getOrCreate()

# Read the file as a DataFrame of lines, then drop down to an RDD of strings
lines = spark.read.text("file.txt").rdd.map(lambda r: r[0])
# Split each line into words, pair each word with 1, and sum the counts per word
words = lines.flatMap(lambda x: x.split(" "))
word_counts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
word_counts.collect()
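The same count can also be expressed with the DataFrame API, which keeps the work inside Spark SQL's optimizer; a minimal sketch, assuming the same file.txt:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col

spark = SparkSession.builder.appName("WordCountDF").getOrCreate()

# spark.read.text exposes each line in a column named "value";
# split it on spaces, explode into one word per row, then count per word
words_df = spark.read.text("file.txt").select(explode(split(col("value"), " ")).alias("word"))
word_counts_df = words_df.groupBy("word").count()
word_counts_df.show()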

Exercise 2: Filtering DataFrames
Problem: Filter a DataFrame to keep rows where a specific column meets a condition (e.g., column “age” > 30).

Solution:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("FilterDF").getOrCreate()

df = spark.read.csv("data.csv", header=True, inferSchema=True)
filtered_df = df.filter(col("age") > 30)
filtered_df.show()

Exercise 3: Joining DataFrames
Problem: Perform an inner join between two DataFrames based on a common key.

Solution:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JoinDF").getOrCreate()

df1 = spark.read.csv("data.csv", header=True, inferSchema=True)
df2 = spark.read.csv("data_updated.csv", header=True, inferSchema=True)
# Inner join keeps only the rows whose key appears in both DataFrames
joined_df = df1.join(df2, df1.key == df2.key, "inner")
joined_df.show()
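One design note: joining on the expression df1.key == df2.key keeps both key columns in the result. If the join column has the same name in both DataFrames, passing just the column name deduplicates it:

# Joining on the column name keeps a single "key" column in the output
joined_df = df1.join(df2, "key", "inner")
joined_df.show()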

Exercise 4: Grouping and Aggregating Data
Problem: Calculate the average salary per department from an employee DataFrame.

Solution:

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

spark = SparkSession.builder.appName("GroupAgg").getOrCreate()

df = spark.read.csv("data.csv", header=True, inferSchema=True)

# Group by department and compute the mean salary, with a readable alias
avg_salary_df = df.groupBy("department").agg(avg("salary").alias("avg_salary"))
avg_salary_df.show()

# Note: this returns only the average salary per department. If you also need the
# other columns, re-join the aggregated result with the original DataFrame.
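A minimal sketch of that re-join, using the df and avg_salary_df from above:

# Attach avg_salary to every employee row by joining back on department
df_with_avg = df.join(avg_salary_df, on="department", how="left")
df_with_avg.show()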

Exercise 5: Handling Missing Values
Problem: Replace null values in a DataFrame column with a default value.

Solution:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("FillNA").getOrCreate()

df = spark.read.csv("data.csv", header=True, inferSchema=True)

filled_df = df.fillna({"age": 0, "salary": 0})
filled_df.show()

Exercise 6: Sorting DataFrames
Problem: Sort a DataFrame by multiple columns, one ascending and another descending.

Solution:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("SortDF").getOrCreate()

df = spark.read.csv("data.csv", header=True, inferSchema=True)
# Sort by age ascending, then by salary descending within equal ages
sorted_df = df.orderBy(col("age").asc(), col("salary").desc())
sorted_df.show()

Exercise 7: Applying User-defined Functions (UDFs)
Problem: Apply a UDF to a DataFrame column to perform a custom transformation (important to know, but built-in functions should be preferred whenever they exist).

Solution:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# Plain Python function wrapped as a UDF; guard against null values
def convert_case(s):
    return s.upper() if s is not None else None

convert_case_udf = udf(convert_case, StringType())
# df is assumed to be an existing DataFrame with a string column "name"
df = df.withColumn("upper_name", convert_case_udf(col("name")))
df.show()
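As the problem statement notes, built-in functions should be preferred when they exist, since UDFs run row-by-row in Python and block Catalyst optimizations. The same transformation with the built-in upper:

from pyspark.sql.functions import upper, col

# Built-in column function; no Python serialization overhead
df = df.withColumn("upper_name", upper(col("name")))
df.show()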

Exercise 8: Working with Window Functions
Problem: Rank rows within each partition of a DataFrame based on a specific column.

Solution:

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WindowRank").getOrCreate()

df = spark.read.csv("data.csv", header=True, inferSchema=True)
# Rank rows within each department by salary, highest first
window_spec = Window.partitionBy("department").orderBy(col("salary").desc())
ranked_df = df.withColumn("rank", rank().over(window_spec))
ranked_df.show()
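A common follow-up is keeping only the top-ranked rows per partition; a short usage example with the ranked_df from above:

# Keep only the highest-paid employee(s) in each department
top_earners_df = ranked_df.filter(col("rank") == 1)
top_earners_df.show()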

Exercise 9: Pivot and Unpivot Operations
Problem: Pivot a DataFrame to convert rows into columns based on a categorical column.

Solution:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PivotDF").getOrCreate()

df = spark.read.csv("data.csv", header=True, inferSchema=True)
pivoted_df = df.groupBy("department").pivot("category").sum("value")
pivoted_df.show()
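The exercise title also mentions unpivot. There is no unpivot call in older Spark releases (recent versions add DataFrame.unpivot), but the SQL stack expression does the job. A minimal sketch, assuming the pivot produced two hypothetical columns catA and catB:

# stack(n, label1, col1, label2, col2, ...) turns columns back into (category, value) rows
unpivoted_df = pivoted_df.selectExpr(
    "department",
    "stack(2, 'catA', catA, 'catB', catB) as (category, value)"
)
unpivoted_df.show()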

Exercise 10: Performing SQL Queries
Problem: Execute SQL queries on PySpark DataFrames.

Solution:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SQLQuery").getOrCreate()

df = spark.read.csv("data.csv", header=True, inferSchema=True)
df.createOrReplaceTempView("employees")
result_df = spark.sql("SELECT * FROM employees WHERE age > 30")
result_df.show()

Exercise 11: Handling Large Datasets
Problem: Optimize the performance of PySpark jobs for large datasets.

Solution:
-> Use appropriate partitioning (repartition or coalesce to match the data and cluster size).
-> Cache intermediate DataFrames that are reused across multiple actions.
-> Avoid unnecessary shuffles from wide transformations such as groupBy and join.
-> Use broadcast joins for smaller DataFrames (a brief sketch of these techniques is shown below).
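A minimal sketch combining these ideas; the file paths, column names, and partition count are illustrative assumptions, not tuned values:

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("LargeDataset").getOrCreate()

large_df = spark.read.parquet("large_dataset.parquet")
small_df = spark.read.csv("lookup.csv", header=True, inferSchema=True)

# Repartition on the join key so downstream operations shuffle less data
large_df = large_df.repartition(200, "key")

# Cache a DataFrame that will be reused by several actions
large_df.cache()

# Broadcast the small lookup table to every executor instead of shuffling the large side
joined_df = large_df.join(broadcast(small_df), "key", "inner")
joined_df.show()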

Exercise 12: Combining DataFrames (Union and UnionAll)
Problem: Concatenate two DataFrames vertically (union operation).

Solution:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("UnionDF").getOrCreate()

df1 = spark.read.csv("data.csv", header=True, inferSchema=True)
df2 = spark.read.csv("data_updated.csv", header=True, inferSchema=True)

# union() stacks df2's rows under df1's by column position; both DataFrames must share the same schema.
# Note: it behaves like SQL UNION ALL (duplicates are kept); add .distinct() to deduplicate.
union_df = df1.union(df2)
union_df.show()

Exercise 13: Using Broadcast Variables
Problem: Improve the efficiency of joins by broadcasting smaller DataFrames.

Solution:

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("BroadcastJoin").getOrCreate()

df1 = spark.read.csv("data.csv", header=True, inferSchema=True)
df2 = spark.read.csv("data_updated.csv", header=True, inferSchema=True)
# Hint Spark to ship the smaller df2 to every executor, avoiding a shuffle of df1
joined_df = df1.join(broadcast(df2), df1.key == df2.key, "inner")
joined_df.show()

Exercise 14: Handling Complex Data Types
Problem: Extract elements from an array column and create new columns.

Solution:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col

spark = SparkSession.builder.appName("ComplexTypes").getOrCreate()

df = spark.read.csv("data.csv", header=True, inferSchema=True)
# CSV columns are plain strings, so build an array column first by splitting
# the delimited string, then explode it into one row per element
array_df = df.withColumn("values_array", split(col("values"), ","))
exploded_df = array_df.withColumn("exploded_values", explode(col("values_array")))
exploded_df.show()
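The problem also asks for new columns rather than new rows; indexing the array with getItem does that. A short sketch using the array_df from above (the element positions are illustrative):

# Pull the first and second array elements into their own columns
indexed_df = array_df.withColumn("first_value", col("values_array").getItem(0)) \
                     .withColumn("second_value", col("values_array").getItem(1))
indexed_df.show()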

Exercise 15: Writing to External Storage
Problem: Save a DataFrame to a CSV file.

Solution:

# Note: Spark writes a directory of part files at this path, not a single CSV file
df.write.csv("/full_path_to_external_storage/output.csv", header=True)
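If the target already exists or a single output file is needed, a write mode and coalesce help; a minimal sketch (the output path is illustrative):

# Overwrite any previous output and collapse to one partition so a single part file is written
df.coalesce(1).write.mode("overwrite").csv("/full_path_to_external_storage/output_dir", header=True)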

Conclusion

Mastering these intermediate-level PySpark exercises will significantly enhance your data processing skills with Apache Spark. Practice them to become proficient at handling large-scale data analytics tasks efficiently. Feel free to comment if you would like any of the solutions explained in more detail.

Happy Learning !!
