#StackBounty: #apache-spark #pyspark #google-cloud-storage #google-cloud-dataproc #pyarrow PySpark PandasUDF on GCP – Memory Allocation

Bounty: 200

I am using a pandas UDF to train many ML models on GCP Dataproc (Spark). The main idea is that I have a grouping variable that represents the various sets of data in my data frame, and I run something like this:

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def test_train(grp_df):
    # train a model on grp_df
    # evaluate the model
    # return a pandas DataFrame of metrics matching `schema`
    return metrics

result = df.groupBy('group_id').apply(test_train)

This works fine except when I use the non-sampled data, where errors are returned that appear to be related to memory issues. The messages are cryptic (to me), but if I sample down the data it runs; if I don't, it fails. The error messages are things like:

OSError: Read out of bounds (offset = 631044336, size = 69873416) in
file of size 573373864

or

Container killed by YARN for exceeding memory limits. 24.5 GB of 24
GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead or disabling
yarn.nodemanager.vmem-check-enabled because of YARN-4714.

My question is: how do I set memory in the cluster to get this to work?

I understand that each group of data, and the process being run on it, needs to fit entirely in the memory of the executor. I currently have a 4-worker cluster with the following configuration:

[screenshot of the 4-worker cluster configuration]

If the data in the largest group_id requires, say, 150 GB of memory, it seems I really need each machine to operate on one group_id at a time. That way I at least get 4 times the speed compared to having a single worker or VM.

If I do the following, does this in fact create one executor per machine that has access to all the cores minus one and 180 GB of memory? So that if, in theory, the largest group of data fits on a single VM with this much RAM, this process should work?

spark = SparkSession.builder \
  .appName('test') \
  .config('spark.executor.memory', '180g') \
  .config('spark.executor.cores', '63') \
  .config('spark.executor.instances', '1') \
  .getOrCreate()
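
Not an authoritative fix, but worth noting since the YARN message itself points at overhead: the pandas UDF runs in a Python worker outside the JVM heap, and that memory is counted against the executor's overhead allowance rather than spark.executor.memory. A minimal sketch of how the builder above might be adjusted (Spark 2.3+ property name; the split between heap and overhead here is a placeholder to tune, and the sum still has to fit in the YARN container):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName('test')
         .config('spark.executor.memory', '140g')          # JVM heap per executor (placeholder size)
         .config('spark.executor.memoryOverhead', '40g')   # off-heap room for the Python/Arrow workers (placeholder size)
         .config('spark.executor.cores', '63')
         .config('spark.executor.instances', '1')
         .getOrCreate())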


Get this bounty!!!

#StackBounty: #machine-learning #python #pyspark #lime 'TabularLIME' is not defined Azure DataBricks

Bounty: 50

I have just started working on Azure Databricks.

I am facing an error while running an already created Python notebook.

Here is the code:

import mmlspark
from mmlspark import *

lime = (TabularLIME()
    .setModel(randomForestModel)
    .setPredictionCol("predict")
    .setOutputCol("weights")
    .setInputCol("feat"))
lime_model = lime.fit(eng_train_pipe)
lime_model.save('dbfs:/mnt/aimodels/rf-eng-lag-lime')

Getting this error:

NameError                                 Traceback (most recent call last)
<command-2244283784192060> in <module>
      2 from mmlspark import *
      3 
----> 4 lime = TabularLIME()
      5   .setModel(rfModel)
      6   .setPredictionCol("prediction")

NameError: name 'TabularLIME' is not defined


Get this bounty!!!

#StackBounty: #python #apache-spark #pyspark Use parallelize function over python objects

Bounty: 50

Is it possible in pyspark to use the parallelize function over Python objects? I want to run in parallel over a list of objects, modify them using a function, and then print these objects.

from pyspark.sql import SparkSession

def init_spark(appname):
  spark = SparkSession.builder.appName(appname).getOrCreate()
  sc = spark.sparkContext
  return spark, sc

def run_on_configs_spark(object_list):
  spark, sc = init_spark(appname="analysis")
  p_configs_RDD = sc.parallelize(object_list)
  p_configs_RDD = p_configs_RDD.map(func)
  p_configs_RDD.foreach(print)

def func(obj):
  return do_something(obj)  # placeholder for the real per-object processing

When I run the above code, I encounter the error "AttributeError: Can’t get attribute ‘Object’ on <module ‘pyspark.daemon’ from…>". How can I solve it?

I did the following workaround. But I don’t think it is a good solution in general, and it assumes I can change the constructor of the object.

I converted the object into a dictionary and reconstructed the object from that dictionary.

def init_spark(appname):
  spark = SparkSession.builder.appName(appname).getOrCreate()
  sc = spark.sparkContext
  return spark, sc

def run_on_configs_spark(object_list):
  spark, sc = init_spark(appname="analysis")
  p_configs_RDD = sc.parallelize([x.__dict__ for x in object_list])
  p_configs_RDD = p_configs_RDD.map(func)
  p_configs_RDD.foreach(print)

def func(obj_dict):
  obj = CreateObject(create_from_dict=True, dictionary=obj_dict)
  return do_something(obj)

In the constructor of the Object:

class Object:
    def __init__(self, create_from_dict=False, dictionary=None, **other_params):
        if create_from_dict:
            self.__dict__.update(dictionary)
            return
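
As a small illustration of the round trip described above (reusing the names from these snippets; the normal construction path is elided here just as in the original):

# hypothetical usage of the dict-based reconstruction shown above
original = Object()                      # built through the normal (elided) path
as_dict = original.__dict__              # a plain dict, safe to pickle and parallelize
restored = Object(create_from_dict=True, dictionary=as_dict)
assert restored.__dict__ == original.__dict__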

Are there any better solutions?


Get this bounty!!!

#StackBounty: #python #database #pyspark How can I insert a PySpark dataframe into a database with a snowflake schema?

Bounty: 50

With PySpark I’m computing a dataframe; how can I append this dataframe to my database if the database has a snowflake schema?

How can I specify which way to split my dataframe in order to fit my CSV-like data into multiple joined tables?

My question is not specific to PySpark; the same question could be asked about pandas.


Get this bounty!!!

#StackBounty: #apache-spark #pyspark #pyarrow #apache-arrow Is it possible to generate large DataFrame in a distributed way in pyspark …

Bounty: 50

The problem boils down to the following: I want to generate a DataFrame in pyspark using an existing parallelized collection of inputs and a function which, given one input, can generate a relatively large batch of rows. In the example below I want to generate a 10^12-row dataframe using e.g. 1000 executors:

def generate_data(one_integer):
  import numpy as np
  from pyspark.sql import Row
  M = 10000000 # number of values to generate per seed, e.g. 10M
  np.random.seed(one_integer)
  np_array = np.random.random_sample(M) # generates an array of M random values
  row_type = Row("seed", "n", "x")
  return [row_type(one_integer, i, float(np_array[i])) for i in range(M)]

N = 100000 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = spark.sparkContext.parallelize(list_of_integers)
row_rdd = list_of_integers_rdd.flatMap(generate_data)
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType
my_schema = StructType([
       StructField("seed", IntegerType()),
       StructField("n", IntegerType()),
       StructField("x", FloatType())])
df = spark.createDataFrame(row_rdd, schema=my_schema)

(I don’t really want to study the distribution of random numbers for a given seed – this is just an example I was able to come up with to illustrate a situation where a large dataframe is not loaded from a warehouse, but generated by code)

The code above does pretty much exactly what I want. The problem is that it does it in a very inefficient way – at the expense of creating a Python Row object for each row and then converting the Python Row objects into Spark’s internal columnar representation.

Is there a way I can convert a batch of rows that is already in columnar representation (e.g. one or a few numpy arrays, like np_array above) just by letting Spark know that these are the columns of a batch of values?

E.g. I can write the code to generate a python collection RDD where each element is a pyarrow.RecordBatch or a pandas.DataFrame, but I can’t find a way to convert any of these into a Spark DataFrame without creating an RDD of pyspark Row objects in the process.

There are at least a dozen articles with examples of how I can use pyarrow + pandas to convert a local (to the driver) pandas dataframe to a Spark dataframe efficiently, but that is not an option for me because I need the data to actually be generated in a distributed way on the executors, rather than generating one pandas dataframe on the driver and sending that to the executors.

UPD.
I’ve found one way to avoid the creation of Row objects – using an RDD of python tuples. As expected it is still way too slow, but a bit faster than using Row objects. Still, this is not really what I’m looking for (which is a really efficient way of passing columnar data to Spark from python).
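
For reference, a minimal sketch of what that tuple-based variant might look like, reusing the names from the snippet above (this only avoids the Row objects; it does not solve the columnar-transfer problem):

def generate_tuples(one_integer):
  import numpy as np
  M = 10000000  # number of values to generate per seed, as above
  np.random.seed(one_integer)
  np_array = np.random.random_sample(M)
  # plain Python tuples instead of pyspark.sql.Row objects
  return [(one_integer, i, float(np_array[i])) for i in range(M)]

tuple_rdd = list_of_integers_rdd.flatMap(generate_tuples)
df = spark.createDataFrame(tuple_rdd, schema=my_schema)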

I also measured the time to do certain operations on one machine (a crude measurement with quite a bit of variation, but still representative in my opinion).
The dataset in question is 10M rows, 3 columns (one column is a constant integer, another is an integer range from 0 to 10M-1, and the third is a floating-point value generated using np.random.random_sample):

  • Locally generate pandas dataframe (10M rows): ~440-450ms
  • Locally generate python list of spark.sql.Row objects (10M rows): ~12-15s
  • Locally generate python list of tuples representing rows (10M rows): ~3.4-3.5s

Generate Spark dataframe using just 1 executor and 1 initial seed value:

  • using spark.createDataFrame(row_rdd, schema=my_schema): ~70-80s
  • using spark.createDataFrame(tuple_rdd, schema=my_schema): ~40-45s
  • (non-distributed creation) using spark.createDataFrame(pandas_df, schema=my_schema): ~0.4-0.5s (without the pandas df generation itself, which takes roughly the same time) – with spark.sql.execution.arrow.enabled set to true.

The example with a local-to-driver pandas dataframe converted to a Spark dataframe in ~1s for 10M rows gives me reason to believe the same should be possible with dataframes generated in executors. However, the fastest I can achieve now is ~40s for 10M rows using an RDD of python tuples.

So the question still stands – is there a way to efficiently generate a large Spark dataframe in a distributed way in pyspark?


Get this bounty!!!

#StackBounty: #pandas #apache-spark #pyspark #windowing Clean way to identify runs on a PySpark DF ArrayType column

Bounty: 100

Given a PySpark DataFrame of the form:

+----+--------+
|time|messages|
+----+--------+
| t01|    [m1]|
| t03|[m1, m2]|
| t04|    [m2]|
| t06|    [m3]|
| t07|[m3, m1]|
| t08|    [m1]|
| t11|    [m2]|
| t13|[m2, m4]|
| t15|    [m2]|
| t20|    [m4]|
| t21|      []|
| t22|[m1, m4]|
+----+--------+

I’d like to refactor it to compress runs containing the same message (the order of the output doesn’t matter much, but it is sorted here for clarity):

+----------+--------+-------+
|start_time|end_time|message|
+----------+--------+-------+
|       t01|     t03|     m1|
|       t07|     t08|     m1|
|       t22|     t22|     m1|
|       t03|     t04|     m2|
|       t11|     t15|     m2|
|       t06|     t07|     m3|
|       t13|     t13|     m4|
|       t20|     t20|     m4|
|       t22|     t22|     m4|
+----------+--------+-------+

(i.e. treat the messages column as a sequence and identify the start and end of “runs” for each message).

Is there a clean way to make this transformation in Spark? Currently, I’m dumping this as a 6 GB TSV and processing it imperatively.

I’m open to the possibility of toPandas-ing this and accumulating on the driver if Pandas has a clean way to do this aggregation.
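
On the toPandas idea: a rough sketch of how the run-compression could be done in pandas on the driver, assuming the collected frame fits in memory and pandas >= 0.25 (for explode and named aggregation). The name sdf for the Spark frame is mine:

import pandas as pd

pdf = sdf.toPandas()                       # sdf: the Spark DataFrame above (assumed to fit on the driver)
pdf = pdf.sort_values("time").reset_index(drop=True)
pdf["row"] = pdf.index                     # consecutive row number per timestamp

# one (time, message) pair per row; the empty list at t21 drops out as NaN
exploded = (pdf.explode("messages")
               .dropna(subset=["messages"])
               .rename(columns={"messages": "message"})
               .reset_index(drop=True))

# a new run starts whenever a message skips at least one timestamp row
new_run = exploded.groupby("message")["row"].diff().ne(1)
exploded["run"] = new_run.astype(int).groupby(exploded["message"]).cumsum()

result = (exploded.groupby(["message", "run"], sort=False)
                  .agg(start_time=("time", "first"), end_time=("time", "last"))
                  .reset_index()[["start_time", "end_time", "message"]])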


Get this bounty!!!

#StackBounty: #pandas #dataframe #pyspark #apache-spark-sql #pandas-groupby Make groupby.apply more efficient or convert to spark

Bounty: 200

All,

I am using pandas groupby.apply with my own custom function. However, I have noticed that the function is very, very slow. Can someone help me convert this code so it runs on Spark dataframes?

Adding a simple example for people to use:

import pandas as pd
import operator

df = pd.DataFrame({
    'Instruments': ['A', 'B', 'A', 'B', 'A', 'C', 'C', 'B'],
    'Sers': ['Wind', 'Tool', 'Wind', 'Wind', 'Tool', 'Tool', 'Tool', 'Wind'],
    'Sounds': [42, 21, 34, 56, 43, 61, 24, 23]
})
def get_stats(data_frame):

    # For each grouped data_frame, cutoff all Sounds greater than 99th percentile
    cutoff_99 = data_frame[data_frame.Sounds <= data_frame.Sounds.quantile(.99)]

    # Based on total number of records, select the most-abundant sers
    sers_to_use = max((cutoff_99.Sers.value_counts() / cutoff_99.shape[0]).to_dict().items(), key = operator.itemgetter(1))[0]

    # Give me the average sound of the selected sers
    avg_sounds_of_sers_to_use = cutoff_99.loc[cutoff_99["Sers"] == sers_to_use].Sounds.mean()

    # Pre-allocate lists
    cool = []
    mean_sounds = []
    ratios = []
    _difference = []


    for i in cutoff_99.Sers.unique():
        # add each unique sers of that dataframe 
        cool.append(i) 

        # get the mean sound of that ser
        sers_mean_sounds = (cutoff_99.loc[cutoff_99["Sers"] == i].Sounds).mean()

        # add each mean sound for each sers
        mean_sounds.append(sers_mean_sounds) 

        # get the ratio of the sers to use vs. the current sers; add all of the ratios to the list
        ratios.append(avg_sounds_of_sers_to_use / sers_mean_sounds)

        # get the percent difference and add it to a list
        _difference.append(
            float(
                round(
                    abs(avg_sounds_of_sers_to_use - sers_mean_sounds)
                    / ((avg_sounds_of_sers_to_use + sers_mean_sounds) / 2),
                    2,
                )
                * 100
            )
        )

    # return a series with these lists/values.
    return pd.Series({
        'Cools': cool,
        'Chosen_Sers': sers_to_use,
        'Average_Sounds_99_Percent': mean_sounds,
        'Mean_Ratios': ratios,
        'Percent_Differences': _difference
    }) 

I call the function as follows in pandas:
df.groupby('Instruments').apply(get_stats)
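
Not a definitive answer, but one hedged direction for the Spark conversion: since the per-group work is already a pandas function, it could be wrapped in a GROUPED_MAP pandas_udf (the same mechanism as in the first question of this digest) and reused as-is. The output schema below, with array columns for the list-valued results, is my assumption about how the returned Series would be typed:

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import (StructType, StructField, StringType,
                               DoubleType, ArrayType)

# assumed schema for the per-group result; a GROUPED_MAP udf must return a
# pandas DataFrame whose columns match the declared schema
stats_schema = StructType([
    StructField("Instruments", StringType()),
    StructField("Chosen_Sers", StringType()),
    StructField("Cools", ArrayType(StringType())),
    StructField("Average_Sounds_99_Percent", ArrayType(DoubleType())),
    StructField("Mean_Ratios", ArrayType(DoubleType())),
    StructField("Percent_Differences", ArrayType(DoubleType())),
])

@pandas_udf(stats_schema, PandasUDFType.GROUPED_MAP)
def get_stats_spark(pdf):
    # pdf is the pandas DataFrame for one instrument group
    row = get_stats(pdf)  # reuse the pandas function defined above
    return pd.DataFrame([{
        "Instruments": pdf["Instruments"].iloc[0],
        "Chosen_Sers": row["Chosen_Sers"],
        "Cools": row["Cools"],
        "Average_Sounds_99_Percent": [float(v) for v in row["Average_Sounds_99_Percent"]],
        "Mean_Ratios": [float(v) for v in row["Mean_Ratios"]],
        "Percent_Differences": row["Percent_Differences"],
    }])

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(df)  # df is the pandas frame from the example above
result = sdf.groupBy("Instruments").apply(get_stats_spark)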


Get this bounty!!!

#StackBounty: #apache-spark #pyspark #count #rdd #reduce pyspark rdd taking the max frequency with the least age

Bounty: 50

I have an rdd like the following:

[{'age': 2.18430371791803,
  'code': u'"315.320000"',
  'id': u'"00008RINR"'},
 {'age': 2.80033330216659,
  'code': u'"315.320000"',
  'id': u'"00008RINR"'},
 {'age': 2.8222365762732,
  'code': u'"315.320000"',
  'id': u'"00008RINR"'},
 {...}]

I am trying to reduce each id to just 1 record by taking the highest frequency code using code like:

rdd.map(lambda x: (x["id"], [(x["age"], x["code"])])) \
   .reduceByKey(lambda x, y: x + y) \
   .map(lambda x: [i[1] for i in x[1]]) \
   .map(lambda x: [max(zip((x.count(i) for i in set(x)), set(x)))])

There is one problem with this implementation: it doesn’t consider age, so if, for example, one id had multiple codes with a frequency of 2, it would take the last code.

To illustrate this issue, please consider this reduced id:

(u'"000PZ7S2G"',
 [(4.3218651186303, u'"388.400000"'),
  (4.34924421126357, u'"388.400000"'),
  (4.3218651186303, u'"389.900000"'),
  (4.34924421126357, u'"389.900000"'),
  (13.3667102491139, u'"794.310000"'),
  (5.99897016368982, u'"995.300000"'),
  (6.02634923989903, u'"995.300000"'),
  (4.3218651186303, u'"V72.19"'),
  (4.34924421126357, u'"V72.19"'),
  (13.3639723398581, u'"V81.2"'),
  (13.3667102491139, u'"V81.2"')])

my code would output:

[(2, u'"V81.2"')]

when I would like for it to output:

[(2, u'"388.400000"')]

because although the frequency is the same for both of these codes, code 388.400000 has a lesser age and appears first.

By adding this line after the .reduceByKey():

.map(lambda x: (x[0], [i for i in x[1] if i[0] == min(x[1])[0]]))

I’m able to filter out those with greater than the minimum age, but then I’m only considering those with the minimum age, and not all codes, to calculate their frequency. I can’t apply the same/similar logic after [max(zip((x.count(i) for i in set(x)), set(x)))], as the set(x) is the set of x[1], which doesn’t consider the age.

I should add that I don’t just want to take the first code with the highest frequency; I’d like to take the highest-frequency code with the least age, or the code that appears first, if this is possible, using only RDD actions.

The equivalent code in SQL would be something like:

SELECT id, code, age, cnt
FROM (SELECT id, code, MIN(age) AS age, COUNT(*) AS cnt,
             ROW_NUMBER() OVER (PARTITION BY id ORDER BY COUNT(*) DESC) AS seqnum
      FROM tbl
      GROUP BY id, code
     ) da
WHERE seqnum = 1

I’d really appreciate any help with this.


Get this bounty!!!

#StackBounty: #apache-spark #group-by #pyspark #k-means Pyspark: applying kmeans on different groups of a dataframe

Bounty: 50

Using PySpark, I would like to apply k-means separately to groups of a dataframe, not to the whole dataframe at once. For the moment I use a for loop which iterates over each group, applies k-means, and appends the result to another table. But having a lot of groups makes it time consuming. Could anyone help me, please?
Thanks a lot!

for customer in customer_list:
    temp_df = togroup.filter(col("customer_id") == customer)
    df = assembler.transform(temp_df)
    k = 1
    mtrc = 0
    while k < 5 and mtrc < width:
        k += 1
        kmeans = KMeans(k=k, seed=5, maxIter=20, initSteps=5)
        model = kmeans.fit(df)
        mtrc = 1 - model.computeCost(df) / ttvar
        a = model.transform(df).select(cols)
        allcustomers = allcustomers.union(a)
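
Not an answer as such, but for context: one commonly suggested alternative to the per-customer loop is a GROUPED_MAP pandas_udf that fits a local k-means on each group’s pandas frame (scikit-learn here, since Spark ML estimators can’t be fit inside a udf). This is only a sketch; the feature columns, the fixed k, and the output schema are placeholder assumptions, and the per-group model-selection loop from the question isn’t reproduced:

import pandas as pd
from sklearn.cluster import KMeans
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# assumed output schema: one cluster label per input row
result_schema = StructType([
    StructField("customer_id", StringType()),
    StructField("cluster", IntegerType()),
])

@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def cluster_per_customer(pdf):
    # pdf holds all rows for one customer; feature columns are placeholders
    features = pdf[["feat1", "feat2"]].values
    labels = KMeans(n_clusters=3, random_state=5).fit_predict(features)
    return pd.DataFrame({
        "customer_id": pdf["customer_id"],
        "cluster": labels.astype("int32"),
    })

allcustomers = togroup.groupBy("customer_id").apply(cluster_per_customer)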


Get this bounty!!!