Optimise an Already Optimised Heavy Spark Job with Long Lineage.

Sujit J Fulse
5 min readJan 27, 2024

--

Upon receiving the initial requirement to write a Spark job , you inquired about the volume of data that the job would be processing. The response indicated that it would be processing Terabytes (Tbs) of data. 😳😳

This information prompted you to contemplate implementing various optimisations and writing highly efficient code for the Spark job. However, despite your efforts, the job failed during performance testing. As a result, you find yourself drained from exploring all the optimisation techniques you discovered through online research. Now, the question arises: what should be your next course of action? 😒 😏

What have you already covered ?

👍 Serialisation
👍 File Formats
👍 Broadcasting
👍 Enabling AQE
👍 Avoid UDF’s (User Defined Functions),
👍 Data Skewness
👍 Level of Parallelism (coalesce , repartition,Shuffle Partition)
👍 assigning resources to spark jobs ( #executors, #driver resources)

in addition to above, also u tried

👍Replace withColumn with Select
👍Replace join and aggregations with Window functions

Still, job is not finishing within given time or failing with exception ? Lets try something new in next section.

Lets explore below option

✌️ Breaking the Lineage ✌️

Long-Lineage Bottleneck in Apache Spark

Every time we make changes to a DataFrame, the query plan expands. As the query plan grows larger, the performance takes a significant hit, leading to bottlenecks.

For instance, let’s consider a scenario where I need to extract data from different sources and apply various transformations such as filtering, exploding, and joining. After that, I need to combine(union) a subset of data from these transformations, perform additional processing (such as removing certain rows based on specific criteria that require windowing functions), and then proceed with other processing stages. Finally, I save the final output to a destination HDFS path.

such spark job with long DAG, would take a considerable amount of time. However, by saving the intermediate dataframes to HDFS and utilising them for subsequent queries, we can significantly improve the job’s execution time.

It’s not recommended to string together numerous transformations in a lineage, particularly when you have to handle a massive amount of data with limited resources. Therefore, it’s better to split the lineage.

Solution :

a) Checkpoint: Checkpointing is an essential step in Spark where the execution plan is shortened by storing intermediate staged data in HDFS or local file system. regular checkpointing writes data in HDFS. This feature proves to be extremely beneficial for data algorithms that involve repetitive iterations. The saved checkpoint files can be utilised in future job execution steps.

val newDF = existingDataframe.checkpoint();

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.localCheckpoint.html

b) Local Checkpoint: local checkpointing writes data in executors storage and not on hdfs. Executor’s local file system is used.

val newDF = existingDataframe.localCheckpoint();

c) Writing data to HDFS : it will be same as checkpoint(a), but here we will be manually writing data in hdfs in our favourite file format and reading it again.

df.write.parquet("/tmp/output/mydata.parquet")
val parqDF = spark.read.parquet("/tmp/output/mydata.parquet")

d) Split the spark job : (Brahmastra — The last resort to utilise. )

You can split spark job vertically or horizontally. Instead of initiating a single large spark job that will consume a significant portion of cluster resources for a considerable amount of time, why not launch multiple smaller spark jobs instead?

Vertically splitting — it will be same as above three options. We will split the big spark job into two smaller spark jobs. Your code will be divided into two separate code files at a specific point. The first job will run and store partially processed data into HDFS, which will then serve as the input for the second job. The output of the second job will be final output. Rest assured, we are not compromising on the infrastructure cost. Now, let’s discuss the comparison between running a single job versus running multiple jobs (two smaller spark jobs).

i) Performance —If a single big spark job takes 4 hours to run, we will schedule two jobs to run either in sequence or in parallel. The first job will be responsible for creating intermediate staging data, while the second job will process this data to generate the final output. The ultimate goal is to divide the single big job into two (multiple) smaller jobs in order to truncate the DAG. In fact, if you handle data dependencies effectively, you can run these two jobs in parallel, with the first job continuously creating data for the second job. In case of failure, there is no point in restarting the job from starting again. with smaller jobs , you can relaunch only the failed one.

ii) Cluster Resources — If single big spark job is launched with 100 executors, then if your jobs runs one after another then still you will be utilising 100 executors. Therefore, you will not be consuming any additional resources.

Vertically split job. The job was split vertically, with action “J” being the final step. However, we ended up splitting it at action “f”.

Horizontal splitting — This concept is based on the database sharding technique. Instead of dividing the code vertically, we will execute the entire code on a specific portion of the data. It’s not the code that is divided, but rather the data. To split the data, we first need to analyze the dependencies and gather all the necessary data into one group. There are various techniques that can be used, such as range-based, hash-based, or custom methods.

Let’s take an example to better understand this concept. Imagine you have data stored in a file or table, categorized by different countries. You have a single spark job that is running and analyzing this data. Now, if there is no relationship between two countries and you have a total of 100 countries to process the data for, then we can launch 4 spark jobs, with each job processing 25 countries. We will use the same code file for all the jobs, but we will pass an additional command line argument to provide a hint on the specific data that each job needs to process. By doing this, we are not breaking the DAG, but rather reducing the data load on each job. Lets talk more on comparison single job vs multiple job instances processing part of data. (Data Splitting)

i) Performance : If a single large Spark job takes 4 hours to run, then ideally, multiple instances of Spark jobs processing parts of the same data will finish quickly since we are reducing the overall data size. Also, consider the scenario in which 100 executors are required to launch, but the cluster has limited resources and can only launch 50 executors. In this case, you will still be able to run your spark job.

ii) Cluster Resources — If a single large spark job is launched with 100 executors, then when multiple instances of the same job are launched with a subset of data, the cluster resources will still be shared equally. For example, you can launch 4 spark jobs with 25 executors each, or you can launch 2 spark job instances with 50 executors each.

references :

https://www.pdl.cmu.edu/PDL-FTP/Storage/CMU-PDL-18-101.pdf

--

--

Sujit J Fulse
Sujit J Fulse

Written by Sujit J Fulse

I am Lead Data Engineer. I have experience in building end to end data pipeline. please connect me https://www.linkedin.com/in/sujit-j-fulse

Responses (3)