#StackBounty: #apache-spark #pyspark #pyarrow #apache-arrow Is it possible to generate large DataFrame in a distributed way in pyspark …

Bounty: 50

The problem boils down to the following: I want to generate a DataFrame in pyspark using an existing parallelized collection of inputs and a function which, given one input, can generate a relatively large batch of rows. In the example below I want to generate a 10^12-row dataframe using e.g. 1000 executors:

def generate_data(one_integer):
  import numpy as np
  from pyspark.sql import Row
  M = 10000000 # number of values to generate per seed, e.g. 10M
  np.random.seed(one_integer)
  np_array = np.random.random_sample(M) # generates an array of M random values
  row_type = Row("seed", "n", "x")
  return [row_type(one_integer, i, float(np_array[i])) for i in range(M)]

N = 100000 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = spark.sparkContext.parallelize(list_of_integers)
row_rdd = list_of_integers_rdd.flatMap(generate_data)
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType
my_schema = StructType([
       StructField("seed", IntegerType()),
       StructField("n", IntegerType()),
       StructField("x", FloatType())])
df = spark.createDataFrame(row_rdd, schema=my_schema)

(I don’t really want to study the distribution of random numbers for a given seed – this is just an example I came up with to illustrate a situation where a large dataframe is not loaded from a warehouse, but generated by code)

The code above does pretty much exactly what I want. The problem is that it does it in a very inefficient way: it creates a python Row object for each row and then converts those python Row objects into Spark's internal columnar representation.

Is there a way to hand Spark a batch of rows that is already in columnar representation (e.g. one or a few numpy arrays, like np_array above), just by letting Spark know that these are the columns of a batch of values?

E.g. I can write code to generate a python-collection RDD where each element is a pyarrow.RecordBatch or a pandas.DataFrame, but I can’t find a way to convert either of these into a Spark DataFrame without creating an RDD of pyspark Row objects in the process.
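
To make it concrete, here is a rough sketch (untested, just to illustrate what I mean by a columnar batch) of producing one pyarrow.RecordBatch per seed, mirroring generate_data above:

import numpy as np
import pyarrow as pa

def generate_batch(one_integer):
  M = 10000000 # number of values to generate per seed, e.g. 10M
  np.random.seed(one_integer)
  np_array = np.random.random_sample(M)
  # the columns stay as numpy/arrow arrays; no per-row python objects are created
  return pa.RecordBatch.from_arrays(
    [pa.array(np.full(M, one_integer, dtype=np.int32)),
     pa.array(np.arange(M, dtype=np.int32)),
     pa.array(np_array)],
    names=["seed", "n", "x"])

batch_rdd = list_of_integers_rdd.map(generate_batch)
# batch_rdd is now an RDD of pyarrow.RecordBatch objects -- this is the part
# I cannot hand to spark.createDataFrame without going back through Row objects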

There are at least a dozen articles with examples of how to use pyarrow + pandas to efficiently convert a local (to the driver) pandas dataframe into a Spark dataframe, but that is not an option for me because the data needs to be generated in a distributed way on the executors, rather than generating one pandas dataframe on the driver and shipping it to the executors.

UPD.
I’ve found one way to avoid creating Row objects: using an RDD of python tuples. As expected it is still way too slow, but somewhat faster than using Row objects. Still, this is not really what I’m looking for, which is a truly efficient way of passing columnar data from python to Spark.
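
For reference, the tuple-based variant is a sketch along these lines (same logic as generate_data above, just without Row objects):

def generate_data_tuples(one_integer):
  import numpy as np
  M = 10000000 # number of values to generate per seed, e.g. 10M
  np.random.seed(one_integer)
  np_array = np.random.random_sample(M) # generates an array of M random values
  # plain tuples instead of pyspark.sql.Row objects -- still one python object
  # per row, but cheaper to construct and to serialize
  return [(one_integer, i, float(np_array[i])) for i in range(M)]

tuple_rdd = list_of_integers_rdd.flatMap(generate_data_tuples)
df = spark.createDataFrame(tuple_rdd, schema=my_schema)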

I also measured the time to do certain operations on a single machine (a crude measurement with quite a bit of variation, but still representative in my opinion); rough sketches of the measured code follow each list below.
The dataset in question is 10M rows and 3 columns (one column is a constant integer, another is an integer range from 0 to 10M-1, and the third is a floating-point value generated using np.random.random_sample):

  • Locally generate pandas dataframe (10M rows): ~440-450ms
  • Locally generate python list of spark.sql.Row objects (10M rows): ~12-15s
  • Locally generate python list of tuples representing rows (10M rows): ~3.4-3.5s
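
Roughly how I timed the local variants (a sketch, not the exact harness):

import time
import numpy as np
import pandas as pd
from pyspark.sql import Row

M = 10000000
np.random.seed(0)
np_array = np.random.random_sample(M)

start = time.time()
pandas_df = pd.DataFrame({"seed": np.zeros(M, dtype=np.int32),
                          "n": np.arange(M, dtype=np.int32),
                          "x": np_array})
print("pandas dataframe: %.2fs" % (time.time() - start))

row_type = Row("seed", "n", "x")
start = time.time()
row_list = [row_type(0, i, float(np_array[i])) for i in range(M)]
print("list of Row objects: %.2fs" % (time.time() - start))

start = time.time()
tuple_list = [(0, i, float(np_array[i])) for i in range(M)]
print("list of tuples: %.2fs" % (time.time() - start))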

Generating a Spark dataframe using just 1 executor and 1 initial seed value:

  • using spark.createDataFrame(row_rdd, schema=my_schema): ~70-80s
  • using spark.createDataFrame(tuple_rdd, schema=my_schema): ~40-45s
  • (non-distributed creation) using spark.createDataFrame(pandas_df, schema=my_schema): ~0.4-0.5s (not counting the pandas df generation itself, which takes roughly the same time), with spark.sql.execution.arrow.enabled set to true – see the sketch after this list
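
And a sketch of the non-distributed Arrow path from the last bullet (config name as of Spark 2.x; pandas_df is assumed to be the locally generated dataframe from the sketch above, with dtypes lined up to match my_schema):

# enable Arrow-based conversion of driver-local pandas dataframes
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# keep pandas dtypes matching my_schema (IntegerType, IntegerType, FloatType)
# so the Arrow conversion does not have to cast anything
pandas_df_32 = pandas_df.astype({"x": "float32"})

df = spark.createDataFrame(pandas_df_32, schema=my_schema)  # the ~0.4-0.5s case
df.count()  # sanity check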

The example of a local-to-driver pandas dataframe converted to a Spark dataframe in ~1s for 10M rows gives me reason to believe the same should be possible with dataframes generated on the executors. However, the fastest I can achieve now is ~40s for 10M rows using an RDD of python tuples.

So the question still stands: is there a way to generate a large Spark dataframe in a distributed way efficiently in pyspark?

