#StackBounty: #apache-spark #pyspark #apache-spark-sql Join computed twice on PySpark, maybe I don't understand laziness?

Bounty: 50

It has been a long time since I last used Spark; I'm back on it with Spark 3.1, and here is my issue:
I have 20M rows left-joined to 400M rows. The original code is:

from pyspark.sql.functions import unix_timestamp

times = [50000, 20000, 10000, 1000]

for time in times:
    join = df_a.join(
        df_b,
        [
            df_a["a"] == df_b["a"],
            (unix_timestamp(df_a["date"]) - unix_timestamp(df_b["date"])) / 3600 > 5,
            df_a["task"] > df_b["task"] - time,
        ],
        "left",
    )

Knowing that the condition for each iteration (the time variable) contains the next one, I thought I could make the DataFrame lighter before comparing it against each value, so I coded this:

times = [50000, 20000, 10000, 1000]

# Do the widest join only once (time = 50000); the smaller values are subsets of it
join_df = df_a.join(
    df_b,
    [
        df_a["a"] == df_b["a"],
        (unix_timestamp(df_a["date"]) - unix_timestamp(df_b["date"])) / 3600 > 5,
        df_a["task"] > df_b["task"] - 50000,
    ],
    "left",
)

join_df.checkpoint()  # Save the current state as a cleaned DataFrame

for time in times:
    step_join = join_df.where(df_a["task"] > df_b["task"] - time)
    # Make calculations and store the result for this iteration...
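
To double-check what each iteration actually executes, the physical plan of one filtered step can be printed; a minimal sketch, assuming the names above:

# If the pre-joined data were being reused, the plan would show a scan of already
# materialized data instead of a fresh SortMergeJoin for every iteration.
step_join.explain(True)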

When looking at the visual SQL diagram on the Spark history server, it seems that my improved (?) solution with the second join is not used: the whole left join is performed again on each iteration, without making use of the cleaner, lighter DataFrame.

My final idea was to use the new DataFrame for the next iteration so that each filter would be lighter. Are my thoughts correct? Am I missing something?
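
For reference, an eager checkpoint() does not modify the DataFrame it is called on; it returns a new DataFrame backed by the materialized data, and only that returned DataFrame has its lineage truncated. A minimal sketch of that pattern (the checkpoint directory path here is just an example, not from the original setup):

# Example checkpoint directory; adjust to your storage
spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")

# checkpoint() materializes the DataFrame and returns a NEW DataFrame whose
# lineage starts from the saved data; the original join_df is left untouched,
# so the returned value has to be the one used for the later filters.
join_df = join_df.checkpoint(eager=True)

Without using the returned DataFrame, every where() still carries the full join in its lineage, and lazy evaluation means Spark recomputes that join for each action.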

Here is what it looks like (the job is still running): the SortMergeJoin in the middle is the one decoupled from the filter, and the second "Filter" only filters a little more, but on the left and right you can see that the SortMergeJoin is being computed again instead of reusing the previously calculated one.

[Image: Spark SQL diagram showing the SortMergeJoin repeated on each branch]

And this is what the processing looks like: the same calculations each time, plus the filter.

[Image: Spark history server view of the repeated processing]

In the last run I had to remove the checkpoint, because with ~55B rows in the join result it was hard to store the data (>100 TB).

My cluster configuration, for 30 instances (64 vCores, 488 GB RAM each) plus the driver:

        "spark.executor.instances", "249").config("spark.executor.memoryOverhead", "10240").config(
        "spark.executor.memory", "87g").config("spark.executor.cores", "12").config("spark.driver.cores", "12").config(
        "spark.default.parallelism", "5976").config("spark.sql.adaptive.enabled", "true").config(
        "spark.sql.adaptive.skewJoin.enabled", "true").config("spark.sql.shuffle.partitions", "3100").config(
        "spark.yarn.driver.memoryOverhead", "10240").config("spark.sql.autoBroadcastJoinThreshold", "2100").config(
        "spark.sql.legacy.timeParserPolicy", "LEGACY").getOrCreate()

I'm using the Excel calculator from this site to tune everything except spark.sql.shuffle.partitions: https://www.c2fo.io/c2fo/spark/aws/emr/2016/07/06/apache-spark-config-cheatsheet/ (now using 10 executors per node).

I tried using .cache() on the join; it's still slower than the 4 parallel joins, and the first join is much slower.
Note that .cache() is fine for a subset, but for a 100 TB join result it would be slower because it will spill the cache to disk.
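
In case an explicit storage level makes that trade-off easier to reason about, a small sketch (the choice of DISK_ONLY here is just an assumption for illustration, not a recommendation for this job):

from pyspark import StorageLevel

# Keep the pre-joined result on disk only, so it does not compete for executor memory;
# re-reading it may still be cheaper than recomputing the SortMergeJoin.
join_df = join_df.persist(StorageLevel.DISK_ONLY)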
thanks!

