#StackBounty: #apache-spark #pyspark #apache-spark-sql Join computed twice on pyspark, maybe I dont understand lazyness?

Bounty: 50

It's been too long since I last used Spark; I'm on it again with Spark 3.1, and here is my issue:
I have 20M rows left-joined to 400M rows. The original code is:

times = [50000, 20000, 10000, 1000]
for time in times:
    join = df_a.join(
        df_b,
        (df_a["a"] == df_b["a"])
        & ((unix_timestamp(df_a["date"]) - unix_timestamp(df_b["date"])) / 3600 > 5)
        # the comparison operator was garbled in the original post; an inequality
        # is assumed, since each larger `time` bound contains the smaller ones
        & (df_a["task"] > df_b["task"] - time),
        "left")

Knowing that each iteration (each time value) contains the next, I thought about making the DataFrame lighter before comparing against each value, so I wrote this:

times = [50000, 20000, 10000, 1000]

# join once with the loosest bound (time = 50000); alias df_b's columns so both
# "task" columns remain addressable after the join
df_b2 = df_b.select(df_b["a"], df_b["date"].alias("date_b"), df_b["task"].alias("task_b"))

join = df_a.join(
    df_b2,
    (df_a["a"] == df_b2["a"])
    & ((unix_timestamp(df_a["date"]) - unix_timestamp(df_b2["date_b"])) / 3600 > 5)
    & (df_a["task"] > df_b2["task_b"] - 50000),
    "left")

join_df = join.checkpoint()  # save the current state as a cleaned DataFrame

for time in times:
    step_join = join_df.where(join_df["task"] > join_df["task_b"] - time)
    # Make calculations and store the result for this iteration...
    # Make calculations and Store result for the iteration...

Looking at the visual SQL diagram on the Spark History Server, it seems that my (supposedly) improved second version is not being used: the whole left join is executed again on each iteration instead of reusing the cleaner, lighter DataFrame.

My final idea was to use the new DataFrame for the next iteration so that each filter would be lighter. Are my thoughts correct? Am I missing something?

Here is how it looks (this is a still-running job). The SortMergeJoin in the middle is the join decoupled from the filter, and the second "Filter" only filters a little more, but on the left and right you can see that the SortMergeJoin is computed again instead of reusing the previously calculated result.

[image]

And this is how the processing looks: the same calculations each time, plus the filter.

[image]

In the end I had to remove the checkpoint, because with 55B rows in the join it was hard to store the data (>100 TB).

My cluster configuration is 30 instances of 64 vCores / 488 GB RAM each, plus the driver:

spark = (SparkSession.builder
    .config("spark.executor.instances", "249")
    .config("spark.executor.memoryOverhead", "10240")
    .config("spark.executor.memory", "87g")
    .config("spark.executor.cores", "12")
    .config("spark.driver.cores", "12")
    .config("spark.default.parallelism", "5976")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    .config("spark.sql.shuffle.partitions", "3100")
    .config("spark.yarn.driver.memoryOverhead", "10240")
    .config("spark.sql.autoBroadcastJoinThreshold", "2100")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate())

I'm using the Excel calculator on this site for tuning everything except spark.sql.shuffle.partitions: https://www.c2fo.io/c2fo/spark/aws/emr/2016/07/06/apache-spark-config-cheatsheet/ I am now using 10 executors per node.

I tried using .cache() on the join; it's still slower than the 4 parallel joins, and the first join is much slower.
Note that .cache() is fine for a subset, but for a 100 TB join result it would be slower because it would spill the cache to disk.

Get this bounty!!!

#StackBounty: #apache-spark #apache-spark-sql Parallelize window function or speed up window function in spark without changing the win…

Bounty: 50

I have a set of window functions in a Spark query which, among other things, partition on user_num. One of these user_nums has far more rows than the others. That partition is computed in a single task, which has a much higher shuffle read and remote shuffle read, and ultimately takes a huge amount of time.

SELECT LAG(e) OVER (PARTITION BY user_num, a, date ORDER BY time) AS aa
FROM table

Is there any setting, or any way, to have this run in multiple tasks or otherwise shrink this amount of time, that would require no or minimal changes to the logic of the window functions?

I.e., could I cache at a certain point, increase the number of partitions, increase executor memory, etc.?


#StackBounty: #apache-spark #hive #apache-spark-sql #kerberos How to use Apache Spark to query Hive table with Kerberos?

Bounty: 50

I am attempting to use Scala with Apache Spark locally to query Hive table which is secured with Kerberos. I have no issues connecting and querying the data programmatically without Spark. However, the problem comes when I try to connect and query in Spark.

My code when run locally without spark:


    System.setProperty("kerberos.keytab", keytab)
    System.setProperty("kerberos.principal", principal)
    System.setProperty("java.security.krb5.conf", krb5Conf)
    System.setProperty("java.security.auth.login.config", jaasConf)

    val conf = new Configuration
    conf.set("hadoop.security.authentication", "Kerberos")

    UserGroupInformation.createProxyUser("user", UserGroupInformation.getLoginUser)
    UserGroupInformation.loginUserFromKeytab(user, keytab)

    if (UserGroupInformation.isLoginKeytabBased) {
      UserGroupInformation.getLoginUser.checkTGTAndReloginFromKeytab()
    } else if (UserGroupInformation.isLoginTicketBased) {
      UserGroupInformation.getLoginUser.reloginFromTicketCache()
    }

    val con = DriverManager.getConnection("jdbc:hive2://hdpe-hive.company.com:10000", user, password)
    val rs = con.prepareStatement("select * from table limit 5").executeQuery()

Does anyone know how I could include the keytab, krb5.conf and jaas.conf into my Spark initialization function so that I am able to authenticate with Kerberos to get the TGT?

My Spark initialization function:

val conf = new SparkConf().setAppName("mediumData")
  .set("spark.driver.host", "localhost")
  .set("spark.ui.enabled", "true") // enable Spark UI
val sparkSession = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()

I do not have files such as hive-site.xml, core-site.xml.
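For the Spark side, the usual route is to pass the keytab and principal to Spark itself and make the Kerberos and Hive client configs visible to it. A sketch of the relevant settings (paths, principal, and realm are placeholders; spark.kerberos.* is the Spark 3.x spelling, while Spark 2.x on YARN uses spark.yarn.keytab / spark.yarn.principal):

```
spark.kerberos.keytab            /path/to/user.keytab
spark.kerberos.principal         user@EXAMPLE.COM
spark.driver.extraJavaOptions    -Djava.security.krb5.conf=/path/to/krb5.conf
spark.executor.extraJavaOptions  -Djava.security.krb5.conf=/path/to/krb5.conf
```

Also note that enableHiveSupport() needs a hive-site.xml (and usually core-site.xml) on the classpath or in SPARK_CONF_DIR to locate the secured metastore; without them, Spark silently falls back to a local metastore instead of querying the cluster.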

Thank you!


#StackBounty: #java #apache-spark #apache-spark-sql Update value in struct type column in java spark

Bounty: 50

I want the ability to update a value in a nested dataset. For this I have created a nested Dataset in Spark. It has the schema structure below:


 |-- field_a: string (nullable = false)
 |-- field_b: struct (nullable = true)
 |    |-- field_d: struct (nullable = false)
 |    |    |-- field_not_to_update: string (nullable = true)
 |    |    |-- field_to_update: string (nullable = false)
 |-- field_c: string (nullable = false)

Now I want to update the value of field_to_update in the dataset. I have tried

aFooData.withColumn("field_b.field_d.field_to_update", lit("updated_val"))

Also tried,

aFooData.foreach(new ClassWithForEachFunction());

where ClassWithForEachFunction implements ForeachFunction<Row> and has a method public void call(Row aRow) to update the field_to_update attribute. I tried the same with a lambda as well, but it was throwing a Task not serializable exception, so I had to go the long way.

None of them have been fruitful so far: foreach gives me back the same Dataset, and withColumn adds a new top-level column literally named field_b.field_d.field_to_update. Any other suggestions?


#StackBounty: #pandas #dataframe #pyspark #apache-spark-sql #pandas-groupby Make groupby.apply more efficient or convert to spark

Bounty: 200


I am using pandas groupby.apply with my own custom function. However, I have noticed that the function is very, very slow. Can someone help me convert this code to run on Spark dataframes?

Adding simple example for people to use:

import pandas as pd
import operator

df = pd.DataFrame({
    'Instruments': ['A', 'B', 'A', 'B', 'A', 'C', 'C', 'B'],
    'Sers': ['Wind', 'Tool', 'Wind', 'Wind', 'Tool', 'Tool', 'Tool', 'Wind'],
    'Sounds': [42, 21, 34, 56, 43, 61, 24, 23]
})

def get_stats(data_frame):

    # For each grouped data_frame, cut off all Sounds greater than the 99th percentile
    cutoff_99 = data_frame[data_frame.Sounds <= data_frame.Sounds.quantile(.99)]

    # Based on the total number of records, select the most abundant Sers
    sers_to_use = max((cutoff_99.Sers.value_counts() / cutoff_99.shape[0]).to_dict().items(),
                      key=operator.itemgetter(1))[0]

    # Give me the average sound of the selected Sers
    avg_sounds_of_sers_to_use = cutoff_99.loc[cutoff_99["Sers"] == sers_to_use].Sounds.mean()

    # Pre-allocate lists
    cool = []
    mean_sounds = []
    ratios = []
    _difference = []

    for i in cutoff_99.Sers.unique():
        # add each unique Sers of that dataframe
        cool.append(i)

        # get the mean sound of that Sers
        sers_mean_sounds = (cutoff_99.loc[cutoff_99["Sers"] == i].Sounds).mean()

        # add each mean sound for each Sers
        mean_sounds.append(sers_mean_sounds)

        # get the ratio of the Sers to use vs. the current Sers; add all of the ratios to the list
        ratios.append(avg_sounds_of_sers_to_use / sers_mean_sounds)

        # get the percent difference and add it to a list
        _difference.append(
            abs(avg_sounds_of_sers_to_use - sers_mean_sounds)
            / ((avg_sounds_of_sers_to_use + sers_mean_sounds) / 2)
            * 100)

    # return a series with these lists/values.
    return pd.Series({
        'Cools': cool,
        'Chosen_Sers': sers_to_use,
        'Average_Sounds_99_Percent': mean_sounds,
        'Mean_Ratios': ratios,
        'Percent_Differences': _difference
    })
I call the function as follows in pandas:

df.groupby('Instruments').apply(get_stats)


#StackBounty: #apache-spark #pyspark #apache-spark-sql #pyspark-dataframes How to generate incremental sub_id not unique using Pyspark

Bounty: 50

My goal is to create a random id and an incremental sub_id; you will find a more detailed explanation of my problem below.

Initial dataframe:

df = spark.createDataFrame([
                            (1, 110, 'aaa', 'walk', 'work'),
                            (2, 110, 'aaa', 'walk', 'work'),
                            (3, 110, 'aaa', 'bus', 'work'),
                            (4, 110, 'aaa', 'bus', 'work'),
                            (5, 110, 'aaa', 'walk', 'work'),
                            (6, 110, 'bbb', 'walk', 'home'),
                            (7, 110, 'bbb', 'bus', 'home'),
                            (8, 110, 'bbb', 'bus', 'home'),
                            (9, 110, 'bbb', 'walk', 'home')],
                           ['idx', 'u_uuid', 'p_uuid', 'mode', 'dest'])



To generate the trip_id column (which can be random), I used:

df_trip = df.withColumn("trip_id", F.rank().over(Window.orderBy('u_uuid', 'p_uuid', 'dest'))).sort('idx')


To generate subtrip_id for each trip_id, I used:

df_subtrip = df_trip.withColumn("subtrip_id", F.row_number().over(Window.partitionBy(['p_uuid', 'u_uuid', 'dest', 'mode']).orderBy('idx')))


Oops! This isn't what I'm looking for; the problem is that I couldn't create an incremental sub_id like the one below.

What I'm looking for:

[image: expected output, where subtrip_id increments within a trip each time the mode changes]


#StackBounty: #apache-spark #apache-spark-sql #apache-spark-dataset Spark SQL alternatives to groupby/pivot/agg/collect_list using fold…

Bounty: 50

I have a Spark DataFrame consisting of three columns:

 id | col1 | col2 
 x  |  p1  |  a1  
 x  |  p2  |  b1
 y  |  p2  |  b2
 y  |  p2  |  b3
 y  |  p3  |  c1

After applying df.groupBy("id").pivot("col1").agg(collect_list("col2")) I am getting the following dataframe (aggDF):

| id|  p1|      p2|  p3|
|  x|[a1]|    [b1]|  []|
|  y|  []|[b2, b3]|[c1]|

Then I find the names of the columns other than the id column.

val cols = aggDF.columns.filter(x => x != "id")

After that I use cols.foldLeft(aggDF)((df, x) => df.withColumn(x, when(size(col(x)) > 0, col(x)).otherwise(lit(null)))) to replace empty arrays with null. The performance of this code becomes poor as the number of columns increases. Additionally, I have the names of the string columns, val stringColumns = Array("p1","p3"). I want to get the following final dataframe:

| id|  p1|      p2|  p3|
|  x| a1 |    [b1]|null|
|  y|null|[b2, b3]| c1 |

Is there any better solution to this problem in order to achieve the final dataframe?


#StackBounty: #postgresql #apache-spark #pyspark #apache-spark-sql #bigdata Writing more than 50 millions from Pyspark df to PostgresSQ…

Bounty: 300

What would be the most efficient way to insert millions of records, say 50 million, from a Spark dataframe into Postgres tables?
I have done this from Spark to MSSQL in the past by making use of bulk copy and the batch size option, which was successful too.

Is there something similar that can be done here for Postgres?

Adding the code I have tried and the time it took to run the process:

def inserter():
    start = timer()
    df.write.format("jdbc") \
        .option("numPartitions", 5).option("batchsize", 200000) \
        .option("url", "jdbc:postgresql://xyz.com:5435/abc_db") \
        .option("dbtable", "public.full_load").option("user", "root") \
        .option("password", "password").save()
    end = timer()

So I ran the above approach for 10 million records, with the 5 parallel connections specified in numPartitions, and also tried a batch size of 200k.

The total time it took for the process was 0:14:05.760926 (fourteen minutes and five seconds).

Is there any other efficient approach which would reduce the time?

What would be an efficient or optimal batch size to use? Will increasing my batch size do the job quicker? Or would opening multiple connections (i.e. > 5) make the process quicker?

On average, 14 minutes for 10 million records is not bad, but I'm looking for people out there who have done this before and can help answer this question.


#StackBounty: #apache-spark #apache-spark-sql #bigdata #spark-streaming Best approach to check if Spark streaming jobs are hanging

Bounty: 400

I have a Spark streaming application which basically gets a trigger message from Kafka that kicks off batch processing which could potentially take up to 2 hours.

There have been incidents where some of the jobs hung indefinitely and didn't complete within the usual time, and currently there is no way to figure out a job's status without checking the Spark UI manually. I want a way to tell whether the currently running Spark jobs are hanging or not: basically, if a job hangs for more than 30 minutes, I want to notify the users so they can take action. What options do I have?

I see I can use metrics from the driver and executors. If I were to choose the most important one, it would be the last received batch's records: when StreamingMetrics.streaming.lastReceivedBatch_records == 0, it probably means the Spark streaming job has stopped or failed.

But in my scenario I will receive only one streaming trigger event, which then kicks off processing that may take up to 2 hours, so I won't be able to rely on the records-received metric.

Is there a better way? TIA
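One possibility, sketched under the assumption that a thread on the PySpark driver can poll sc.statusTracker(): record when each job first appears active, and alert once it has been active longer than the timeout. The notification itself is left as a print placeholder.

```python
import time


def check_hanging(tracker, seen, now, timeout_s):
    """Return ids of jobs that have been active for longer than timeout_s.

    tracker: anything exposing getActiveJobsIds(), e.g. sc.statusTracker().
    seen: dict mapping jobId -> first timestamp the job was observed active.
    """
    active = set(tracker.getActiveJobsIds())
    hanging = []
    for jid in active:
        seen.setdefault(jid, now)
        if now - seen[jid] > timeout_s:
            hanging.append(jid)
    # forget jobs that have finished
    for jid in list(seen):
        if jid not in active:
            del seen[jid]
    return sorted(hanging)


def watchdog(sc, timeout_s=30 * 60, poll_s=60):
    """Poll forever; alert on jobs that exceed the timeout (placeholder alert)."""
    seen = {}
    while True:
        for jid in check_hanging(sc.statusTracker(), seen, time.time(), timeout_s):
            print(f"ALERT: Spark job {jid} has been running for over {timeout_s}s")
        time.sleep(poll_s)
```

Running watchdog(sc) in a daemon thread keeps the check out of the processing path; the print would be replaced by whatever notification channel the users have.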


#StackBounty: #pyspark #apache-spark-sql How to remove duplicates from a spark data frame while retaining the latest?

Bounty: 50

I'm using Spark to load JSON files from Amazon S3. I would like to remove duplicates based on two columns of the dataframe, retaining the newest (I have a timestamp column). What would be the best way to do it? Please note that the duplicates may be spread across partitions. Can I remove duplicates retaining the last record without shuffling? I'm dealing with 1 TB of data.
