#StackBounty: #apache-spark #pyspark #pyarrow #apache-arrow Is it possible to generate large DataFrame in a distributed way in pyspark …

Bounty: 50

The problem boils down to the following: I want to generate a DataFrame in pyspark using an existing parallelized collection of inputs and a function which, given one input, can generate a relatively large batch of rows. In the example below I want to generate a 10^12-row dataframe using e.g. 1000 executors:

def generate_data(one_integer):
  import numpy as np
  from pyspark.sql import Row
  M = 10000000 # number of values to generate per seed, e.g. 10M
  np.random.seed(one_integer)
  np_array = np.random.random_sample(M) # generates an array of M random values
  row_type = Row("seed", "n", "x")
  return [row_type(one_integer, i, float(np_array[i])) for i in range(M)]

N = 100000 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = spark.sparkContext.parallelize(list_of_integers)
row_rdd = list_of_integers_rdd.flatMap(generate_data)
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType
my_schema = StructType([
       StructField("seed", IntegerType()),
       StructField("n", IntegerType()),
       StructField("x", FloatType())])
df = spark.createDataFrame(row_rdd, schema=my_schema)

(I don’t really want to study the distribution of random numbers for a given seed – this is just an example I was able to come up with to illustrate a situation where a large dataframe is not loaded from a warehouse, but generated by code)

The code above does pretty much exactly what I want. The problem is that it does it in a very inefficient way – at the expense of creating a Python Row object for each row and then converting the Python Row objects into Spark’s internal columnar representation.

Is there a way I can convert a batch of rows that is already in columnar representation (e.g. one or a few numpy arrays, like np_array above) just by letting Spark know that these are the columns of a batch of values?

E.g. I can write code to generate an RDD where each element is a pyarrow.RecordBatch or a pandas.DataFrame, but I can’t find a way to convert either of these into a Spark DataFrame without creating an RDD of pyspark Row objects in the process.

There are at least a dozen articles with examples of how to use pyarrow + pandas to convert a local (to the driver) pandas dataframe to a Spark dataframe efficiently, but that is not an option for me because I need the data to actually be generated in a distributed way on the executors rather than generating one pandas dataframe on the driver and sending it to the executors.

UPD.
I’ve found one way to avoid the creation of Row objects – using an RDD of Python tuples. As expected it is still way too slow, but a bit faster than using Row objects. Still, this is not really what I’m looking for (which is a truly efficient way of passing columnar data to Spark from Python).

I also measured the time to do certain operations on one machine (a crude method with quite a bit of variation in the measured times, but still representative in my opinion). The dataset in question is 10M rows, 3 cols (one column is a constant integer, another is an integer range from 0 to 10M-1, and the third is a floating-point value generated using np.random.random_sample):

  • Locally generate pandas dataframe (10M rows): ~440-450ms
  • Locally generate python list of spark.sql.Row objects (10M rows): ~12-15s
  • Locally generate python list of tuples representing rows (10M rows): ~3.4-3.5s

Generate Spark dataframe using just 1 executor and 1 initial seed value:

  • using spark.createDataFrame(row_rdd, schema=my_schema): ~70-80s
  • using spark.createDataFrame(tuple_rdd, schema=my_schema): ~40-45s
  • (non-distributed creation) using spark.createDataFrame(pandas_df, schema=my_schema): ~0.4-0.5s (without pandas df generation itself which takes roughly same time) – with spark.sql.execution.arrow.enabled set to true.

The example of a local-to-driver pandas dataframe converted to a Spark dataframe in ~1s for 10M rows gives me reason to believe the same should be possible with dataframes generated on executors. However, the fastest I can achieve now is ~40s for 10M rows using an RDD of Python tuples.

So the question still stands – is there a way to generate a large Spark dataframe in a distributed way efficiently in pyspark?
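
One pattern worth sketching here (not from the question itself; it assumes Spark 3.0+, where DataFrame.mapInPandas is available, with Arrow enabled) is to parallelize only the seeds and let each task emit its batch as a pandas DataFrame, so the data stays columnar and goes through Arrow instead of per-row Row objects:

import numpy as np
import pandas as pd

M = 10_000_000  # rows generated per seed

def generate_batches(pdf_iter):
    # pdf_iter yields pandas DataFrames containing a "seed" column
    for pdf in pdf_iter:
        for seed in pdf["seed"]:
            np.random.seed(seed)
            yield pd.DataFrame({
                "seed": np.full(M, seed, dtype=np.int32),
                "n": np.arange(M, dtype=np.int32),
                "x": np.random.random_sample(M).astype(np.float32),
            })

seeds = spark.range(100_000).toDF("seed").repartition(1000)
df = seeds.mapInPandas(generate_batches, schema="seed int, n int, x float")

Whether this reaches pandas-like speed depends on the Arrow batch settings, but it avoids materializing Python Row objects entirely.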


Get this bounty!!!

#StackBounty: #pandas #apache-spark #pyspark #windowing Clean way to identify runs on a PySpark DF ArrayType column

Bounty: 100

Given a PySpark DataFrame of the form:

+----+--------+
|time|messages|
+----+--------+
| t01|    [m1]|
| t03|[m1, m2]|
| t04|    [m2]|
| t06|    [m3]|
| t07|[m3, m1]|
| t08|    [m1]|
| t11|    [m2]|
| t13|[m2, m4]|
| t15|    [m2]|
| t20|    [m4]|
| t21|      []|
| t22|[m1, m4]|
+----+--------+

I’d like to refactor it to compress runs containing the same message (the order of the output doesn’t matter much, but it is sorted here for clarity):

+----------+--------+-------+
|start_time|end_time|message|
+----------+--------+-------+
|       t01|     t03|     m1|
|       t07|     t08|     m1|
|       t22|     t22|     m1|
|       t03|     t04|     m2|
|       t11|     t15|     m2|
|       t06|     t07|     m3|
|       t13|     t13|     m4|
|       t20|     t20|     m4|
|       t22|     t22|     m4|
+----------+--------+-------+

(i.e. treat the messages column as a sequence and identify the start and end of “runs” for each message).

Is there a clean way to make this transformation in Spark? Currently, I’m dumping this as a 6 GB TSV and processing it imperatively.

I’m open to the possibility of toPandas-ing this and accumulating on the driver if Pandas has a clean way to do this aggregation.
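
A minimal sketch of the classic “gaps and islands” approach, which may be what is wanted here (an assumption, not from the question): give every timestamp row a global position, explode the messages, and group consecutive positions per message. It assumes the time values sort correctly as shown.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# global position of each timestamp row (single-partition window; fine for a
# sketch, but worth revisiting for very large inputs)
pos = df.withColumn("pos", F.row_number().over(Window.orderBy("time")))

exploded = pos.select("time", "pos", F.explode("messages").alias("message"))

# rows of the same run share the same value of (pos - row_number within message)
w = Window.partitionBy("message").orderBy("pos")
runs = exploded.withColumn("grp", F.col("pos") - F.row_number().over(w))

result = (runs.groupBy("message", "grp")
              .agg(F.min("time").alias("start_time"),
                   F.max("time").alias("end_time"))
              .select("start_time", "end_time", "message"))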


Get this bounty!!!

#StackBounty: #java #apache-spark #apache-spark-sql Update value in struct type column in java spark

Bounty: 50

I want the ability to update a value in a nested dataset. For this I have created a nested Dataset in Spark. It has the below schema structure:

root
 |-- field_a: string (nullable = false)
 |-- field_b: struct (nullable = true)
 |    |-- field_d: struct (nullable = false)
 |    |    |-- field_not_to_update: string (nullable = true)
 |    |    |-- field_to_update: string (nullable = false)
 |-- field_c: string (nullable = false)

Now I want to update the value of field_to_update in the dataset. I have tried:

aFooData.withColumn("field_b.field_d.field_to_update", lit("updated_val"))

Also tried,

aFooData.foreach(new ClassWithForEachFunction());

where ClassWithForEachFunction implements ForEachFunction<Row> and has a method public void call(Row aRow) to update the field_to_update attribute. I tried the same with a lambda as well, but it was throwing a Task not serializable exception, so I had to go with the longer approach.

Neither of them has been fruitful so far: foreach gives me back the same Dataset, and withColumn gives me a new top-level column literally named field_b.field_d.field_to_update. Any other suggestions?
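
For reference, a minimal PySpark sketch of the usual workaround (the idea carries over to Java with the same API): rebuild the nested struct column, replacing only the leaf that should change. Column names follow the schema above.

from pyspark.sql import functions as F

updated = aFooData.withColumn(
    "field_b",
    F.struct(
        F.struct(
            F.col("field_b.field_d.field_not_to_update").alias("field_not_to_update"),
            F.lit("updated_val").alias("field_to_update"),
        ).alias("field_d")
    ),
)

In Java the same thing is expressed with functions.struct(...) and Column.alias(...); the key point is that the whole field_b struct is reconstructed rather than addressed by a dotted column name.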


Get this bounty!!!

#StackBounty: #apache-spark #pyspark #count #rdd #reduce pyspark rdd taking the max frequency with the least age

Bounty: 50

I have an rdd like the following:

[{'age': 2.18430371791803,
  'code': u'"315.320000"',
  'id': u'"00008RINR"'},
 {'age': 2.80033330216659,
  'code': u'"315.320000"',
  'id': u'"00008RINR"'},
 {'age': 2.8222365762732,
  'code': u'"315.320000"',
  'id': u'"00008RINR"'},
 {...}]

I am trying to reduce each id to just 1 record by taking the highest frequency code using code like:

(rdd.map(lambda x: (x["id"], [(x["age"], x["code"])]))
    .reduceByKey(lambda x, y: x + y)
    .map(lambda x: [i[1] for i in x[1]])
    .map(lambda x: [max(zip((x.count(i) for i in set(x)), set(x)))]))

There is one problem with this implementation: it doesn’t consider age, so if, for example, one id had multiple codes with a frequency of 2, it would take the last code.

To illustrate this issue, please consider this reduced id:

(u'"000PZ7S2G"',
 [(4.3218651186303, u'"388.400000"'),
  (4.34924421126357, u'"388.400000"'),
  (4.3218651186303, u'"389.900000"'),
  (4.34924421126357, u'"389.900000"'),
  (13.3667102491139, u'"794.310000"'),
  (5.99897016368982, u'"995.300000"'),
  (6.02634923989903, u'"995.300000"'),
  (4.3218651186303, u'"V72.19"'),
  (4.34924421126357, u'"V72.19"'),
  (13.3639723398581, u'"V81.2"'),
  (13.3667102491139, u'"V81.2"')])

my code would output:

[(2, u'"V81.2"')]

when I would like for it to output:

[(2, u'"388.400000"')]

because although the frequency is the same for both of these codes, code 388.400000 has a lesser age and appears first.

by adding this line after the .reduceByKey():

.map(lambda x: (x[0], [i for i in x[1] if i[0] == min(x[1])[0]]))

I’m able to filter out those with an age greater than the minimum age, but then I’m only considering the codes at the minimum age, rather than all codes, when calculating their frequency. I can’t apply the same/similar logic after [max(zip((x.count(i) for i in set(x)), set(x)))] because set(x) is the set of x[1], which doesn’t consider the age.

I should add that I don’t want to just take the first code with the highest frequency; I’d like to take the highest-frequency code with the least age, or the code that appears first, if this is possible using only RDD operations.

The equivalent code in SQL would be something like:

SELECT id, code, age, cnt
FROM (SELECT id, code, MIN(age) AS age, COUNT(*) AS cnt,
             ROW_NUMBER() OVER (PARTITION BY id ORDER BY COUNT(*) DESC) AS seqnum
      FROM tbl
      GROUP BY id, code
     ) da
WHERE seqnum = 1

I’d really appreciate any help with this.
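
A minimal sketch (not from the question) of one way to express this with only RDD transformations: count occurrences and track the minimum age per (id, code) pair, then keep, per id, the entry with the highest count, breaking ties by the smallest age (any remaining ties are broken arbitrarily rather than by first appearance):

per_code = (rdd.map(lambda r: ((r["id"], r["code"]), (1, r["age"])))
               .reduceByKey(lambda a, b: (a[0] + b[0], min(a[1], b[1]))))

best = (per_code
        .map(lambda kv: (kv[0][0], (kv[1][0], kv[1][1], kv[0][1])))  # id -> (count, min_age, code)
        .reduceByKey(lambda a, b: max(a, b, key=lambda t: (t[0], -t[1]))))

Using the aggregated (count, min_age) pair as the ranking key mirrors the ORDER BY count(*) DESC in the SQL, with the minimum age as the tie-breaker the question asks for.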


Get this bounty!!!

#StackBounty: #scala #apache-spark #apache-kafka #spark-structured-streaming Spark Streaming – Join on multiple kafka stream operation …

Bounty: 50

I have 3 Kafka streams with 600k+ records each, and Spark streaming takes more than 10 minutes to process simple joins between the streams.

Spark Cluster config:

(screenshot: Spark Master UI)

This is how I’m reading the Kafka streams into temp views in Spark (Scala):

spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "KAFKASERVER")
.option("subscribe", TOPIC1)
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest").load()
.selectExpr("CAST(value AS STRING) as json")
.select( from_json($"json", schema=SCHEMA1).as("data"))
.select($"COL1", $"COL2")
.createOrReplaceTempView("TABLE1")

I join the 3 tables using Spark SQL:

select COL1, COL2 from TABLE1   
JOIN TABLE2 ON TABLE1.PK = TABLE2.PK
JOIN TABLE3 ON TABLE2.PK = TABLE3.PK

Execution of Job:

(screenshot: job execution UI)

Am I missing some configuration in Spark that I have to look into?
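
A couple of settings are commonly worth checking first for join-heavy workloads like this (a hedged sketch in PySpark syntax; the Scala calls are identical, and the values are placeholders to tune rather than recommendations):

# number of shuffle partitions used by the joins (default 200);
# roughly match it to the total number of executor cores
spark.conf.set("spark.sql.shuffle.partitions", "200")

# allow Spark to broadcast a small join side instead of shuffling both sides,
# if any of the three inputs is small enough to fit in executor memory
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024))

Beyond configuration, the physical plan in the SQL tab of the Spark UI will show whether the joins are sort-merge joins with large shuffles, which is usually where the time goes.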


Get this bounty!!!

#StackBounty: #apache-spark #group-by #pyspark #k-means Pyspark: applying kmeans on different groups of a dataframe

Bounty: 50

Using Pyspark, I would like to apply k-means separately to groups of a dataframe and not to the whole dataframe at once. For the moment I use a for loop which iterates over each group, applies k-means and appends the result to another table. But having a lot of groups makes it time-consuming. Could anyone help me, please?
Thanks a lot!

for customer in customer_list:
    temp_df = togroup.filter(col("customer_id") == customer)
    df = assembler.transform(temp_df)
    k = 1
    mtrc = 0  # assumed initialization; not shown in the original
    while k < 5 and mtrc < width:
        k += 1
        kmeans = KMeans(k=k, seed=5, maxIter=20, initSteps=5)
        model = kmeans.fit(df)
        mtrc = 1 - model.computeCost(df) / ttvar
        a = model.transform(df).select(cols)
        allcustomers = allcustomers.union(a)
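
A minimal sketch of one alternative (not from the question): run a separate k-means per customer in parallel with a grouped pandas UDF, using scikit-learn inside each group instead of Spark ML. It assumes Spark 3.0+ (for applyInPandas) and plain numeric feature columns named feat1 and feat2, which are hypothetical:

import pandas as pd
from sklearn.cluster import KMeans

feature_cols = ["feat1", "feat2"]  # hypothetical feature columns

def cluster_one_customer(pdf: pd.DataFrame) -> pd.DataFrame:
    k = min(5, len(pdf))  # simple guard for very small groups
    model = KMeans(n_clusters=k, random_state=5).fit(pdf[feature_cols])
    out = pdf.copy()
    out["cluster"] = model.labels_
    return out

result = (togroup
          .groupBy("customer_id")
          .applyInPandas(cluster_one_customer,
                         schema="customer_id long, feat1 double, feat2 double, cluster int"))

Each customer’s group is clustered in a single task, so the groups run in parallel across the cluster instead of sequentially in a driver-side loop.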


Get this bounty!!!

#StackBounty: #apache-spark #pyspark #apache-spark-sql #pyspark-dataframes How to generate incremental sub_id not unique using Pyspark

Bounty: 50

My goal is to create a random id and an incremental sub_id; you will find a more detailed explanation of my problem below.

Initial dataframe:

df = spark.createDataFrame([
                            (1, 110, 'aaa', 'walk', 'work'),
                            (2, 110, 'aaa', 'walk', 'work'),
                            (3, 110, 'aaa', 'bus', 'work'),
                            (4, 110, 'aaa', 'bus', 'work'),
                            (5, 110, 'aaa', 'walk','work'),
                            (6, 110, 'bbb', 'walk', 'home'),
                            (7, 110, 'bbb', 'bus', 'home'),
                            (8, 110, 'bbb', 'bus',  'home'),
                            (9, 110, 'bbb', 'walk', 'home')
                        ],
                        ['idx', 'u_uuid', 'p_uuid', 'mode', 'dest']
                    )

df.show()

(screenshot of the df.show() output)

To generate the trip_id column (it can be random) I used:

df_trip = df.withColumn("trip_id", F.rank().over(Window.orderBy('u_uuid', 'p_uuid', 'dest'))).sort('idx')

(screenshot of the resulting dataframe with trip_id)

To generate subtrip_id for each trip_id, I used:

df_subtrip = df_trip.withColumn("subtrip_id", F.row_number().over(Window.partitionBy(['p_uuid', 'u_uuid', 'dest', 'mode']).orderBy('idx')))

(screenshot of the resulting dataframe with subtrip_id)

Oops! This isn’t what I’m looking for; the problem is that I couldn’t create an incremental sub_id like this.

What I’m looking for:

(screenshot of the expected output)
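
Since the expected output is only available as a screenshot, the following is a guess at the intent: number sub-trips so that the id increases each time the mode changes within a trip, using lag plus a running sum over a window (a minimal sketch, not a confirmed answer):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("u_uuid", "p_uuid", "dest").orderBy("idx")

df_subtrip = (df_trip
    .withColumn("mode_changed",
                (F.col("mode") != F.lag("mode", 1, "").over(w)).cast("int"))
    .withColumn("subtrip_id", F.sum("mode_changed").over(w)))

Here subtrip_id starts at 1 for each trip and increments whenever mode differs from the previous row in the trip.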


Get this bounty!!!

#StackBounty: #r #apache-spark How to make plots from distributed data from R

Bounty: 50

I’m working with Spark using the R API and have a grasp of how data is processed by Spark, both when only Spark-native functions are used, in which case it is transparent to the user, and when spark_apply() is used, where a better understanding of how the partitions are handled is required.

My doubt is about plots where no aggregation is done. For example, my understanding is that if a group-by is used before a plot, not all of the data will be used. But if I need to make, say, a scatter plot with 100 million dots, where is that data stored at that point? Is it still distributed between all the nodes, or is it on one node only? If the latter, will the cluster get frozen because of this?


Get this bounty!!!

#StackBounty: #apache-spark #apache-spark-sql #apache-spark-dataset Spark SQL alternatives to groupby/pivot/agg/collect_list using fold…

Bounty: 50

I have a Spark DataFrame consisting of three columns:

 id | col1 | col2 
-----------------
 x  |  p1  |  a1  
-----------------
 x  |  p2  |  b1
-----------------
 y  |  p2  |  b2
-----------------
 y  |  p2  |  b3
-----------------
 y  |  p3  |  c1

After applying df.groupBy("id").pivot("col1").agg(collect_list("col2")) I am getting the following dataframe (aggDF):

+---+----+--------+----+
| id|  p1|      p2|  p3|
+---+----+--------+----+
|  x|[a1]|    [b1]|  []|
|  y|  []|[b2, b3]|[c1]|
+---+----+--------+----+

Then I find the names of the columns except the id column.

val cols = aggDF.columns.filter(x => x != "id")

After that I use cols.foldLeft(aggDF)((df, x) => df.withColumn(x, when(size(col(x)) > 0, col(x)).otherwise(lit(null)))) to replace empty arrays with null. The performance of this code becomes poor as the number of columns increases. Additionally, I have the names of the string columns, val stringColumns = Array("p1","p3"). I want to get the following final dataframe:

+---+----+--------+----+
| id|  p1|      p2|  p3|
+---+----+--------+----+
|  x| a1 |    [b1]|null|
|  y|null|[b2, b3]| c1 |
+---+----+--------+----+

Is there any better solution to this problem in order to achieve the final dataframe?
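
For comparison, here is the same idea expressed without a fold, building all the replacement columns in one projection so the analyzer sees a single select instead of one withColumn per column. It is sketched in PySpark for brevity, with the question’s names kept; the Scala version is analogous:

from pyspark.sql import functions as F

other_cols = [c for c in aggDF.columns if c != "id"]
string_columns = {"p1", "p3"}  # from the question's stringColumns

exprs = []
for c in other_cols:
    e = F.when(F.size(F.col(c)) > 0, F.col(c))  # empty array -> null
    if c in string_columns:
        e = e.getItem(0)  # unwrap the single-element array to a scalar
    exprs.append(e.alias(c))

final_df = aggDF.select(F.col("id"), *exprs)

Whether this is materially faster depends on the plan, but it avoids stacking one projection per column, which is usually what hurts as the column count grows.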


Get this bounty!!!

#StackBounty: #postgresql #apache-spark #pyspark #apache-spark-sql #bigdata Writing more than 50 millions from Pyspark df to PostgresSQ…

Bounty: 300

What would be the most efficient way to insert millions of records, say 50 million, from a Spark dataframe into Postgres tables?
I have done this from Spark to MSSQL in the past by making use of the bulk copy and batch size options, which was successful too.

Is there something similar that can be done here for Postgres?

Adding the code I have tried and the time it took to run the process:

from datetime import timedelta
from timeit import default_timer as timer  # imports assumed; not shown in the question

def inserter():
    start = timer()
    (sql_res.write.format("jdbc")
        .option("numPartitions", "5")
        .option("batchsize", "200000")
        .option("url", "jdbc:postgresql://xyz.com:5435/abc_db")
        .option("dbtable", "public.full_load")
        .option("user", "root")
        .option("password", "password")
        .save())
    end = timer()
    print(timedelta(seconds=end - start))

inserter()

So I tried the above approach for 10 million records, with 5 parallel connections as specified in numPartitions, and also tried a batch size of 200k.

The total time it took for the process was 0:14:05.760926 (fourteen minutes and five seconds).

Is there any other efficient approach which would reduce the time?

What would be an efficient or optimal batch size to use? Will increasing my batch size make the job quicker? Or would opening more connections (i.e. > 5) help me make the process quicker?

On average, 14 minutes for 10 million records is not bad, but I’m looking for people out there who have done this before to help answer this question.
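
A minimal sketch of one commonly used alternative (an assumption, not something from the question): bypass JDBC and stream each partition into Postgres with COPY via psycopg2, which tends to be much faster than batched INSERTs. The connection details and table name below are placeholders:

import csv
import io

import psycopg2

def copy_partition(rows):
    # buffer the partition as CSV in memory; chunk this for very large partitions
    buf = io.StringIO()
    writer = csv.writer(buf)
    for row in rows:
        writer.writerow(row)
    buf.seek(0)
    conn = psycopg2.connect(host="xyz.com", port=5435, dbname="abc_db",
                            user="root", password="password")
    with conn, conn.cursor() as cur:
        cur.copy_expert("COPY public.full_load FROM STDIN WITH CSV", buf)
    conn.close()

# one COPY per partition; repartition controls how many concurrent connections open
sql_res.repartition(8).rdd.foreachPartition(copy_partition)

The column order of the dataframe has to match the table definition (or be listed explicitly in the COPY statement), and psycopg2 must be available on the executors.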


Get this bounty!!!