#StackBounty: #scala #apache-spark #rdd Finding Maximum in Key Value RDD

Bounty: 100

I have a key-value RDD of the form:

(Some(23661587),
CompactBuffer(Posting(2,23661643,Some(23661587),0,None), 
              Posting(2,23661682,Some(23661587),0,None)))

Here Some(23661587) is the key and the data inside the CompactBuffer is the value. For each key, I want to select the Posting with the maximum value of a particular attribute.

How can I do that? I have limited experience in Scala and Spark.
Thanks


Get this bounty!!!

#StackBounty: #apache-spark #pyspark #google-cloud-storage #google-cloud-dataproc #pyarrow PySpark PandasUDF on GCP – Memory Allocation

Bounty: 200

I am using a pandas UDF to train many ML models on GCP in Dataproc (Spark). The main idea is that I have a grouping variable that represents the various sets of data in my data frame, and I run something like this:

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def test_train(grp_df):
    # train model on grp_df
    # evaluate model
    # return metrics
    return metrics

result = df.groupBy('group_id').apply(test_train)

This works fine except when I use the non-sampled data, where errors are returned that appear to be related to memory issues. The messages are cryptic (to me), but if I sample down the data it runs, and if I don't, it fails. The error messages look like:

OSError: Read out of bounds (offset = 631044336, size = 69873416) in
file of size 573373864

or

Container killed by YARN for exceeding memory limits. 24.5 GB of 24
GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead or disabling
yarn.nodemanager.vmem-check-enabled because of YARN-4714.

My question is: how do I set up memory in the cluster to get this to work?

I understand that each group of data and the process being run needs to fit entirely in the memory of the executor. I currently have a 4-worker cluster with the following:

[screenshot: cluster configuration]

If the maximum amount of data in the largest group_id requires 150 GB of memory, it seems I really need each machine to operate on one group_id at a time. At least that way I get 4 times the speed compared to having a single worker or VM.

If I do the following, is this in fact creating one executor per machine that has access to all the cores minus one and 180 GB of memory? So that, if in theory the largest group of data would fit on a single VM with this much RAM, this process should work?

spark = SparkSession.builder \
    .appName('test') \
    .config('spark.executor.memory', '180g') \
    .config('spark.executor.cores', '63') \
    .config('spark.executor.instances', '1') \
    .getOrCreate()
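
For comparison, a minimal sketch (with placeholder values, not a sizing recommendation for this cluster) of how the overhead setting named in the YARN error above could be budgeted explicitly alongside the executor memory:

from pyspark.sql import SparkSession

# Placeholder values only. The point: YARN enforces executor memory plus
# memoryOverhead, and the pandas/pyarrow buffers a pandas UDF allocates live
# off-heap in that overhead portion, so it needs explicit headroom.
spark = SparkSession.builder \
    .appName('test') \
    .config('spark.executor.instances', '1') \
    .config('spark.executor.cores', '63') \
    .config('spark.executor.memory', '150g') \
    .config('spark.executor.memoryOverhead', '30g') \
    .getOrCreate()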


Get this bounty!!!

#StackBounty: #python #apache-spark #pyspark Use parallelize function over python objects

Bounty: 50

Is it possible in PySpark to use the parallelize function over Python objects? I want to run in parallel over a list of objects, modify them using a function, and then print these objects.

def init_spark(appname):
  spark = SparkSession.builder.appName(appname).getOrCreate()
  sc = spark.sparkContext
  return spark, sc

def run_on_configs_spark(object_list):
  spark, sc = init_spark(appname="analysis")
  p_configs_RDD = sc.parallelize(object_list)
  p_configs_RDD = p_configs_RDD.map(func)
  p_configs_RDD.foreach(print)

def func(obj):
  return do_something(obj)

When I run the above code, I encounter the error "AttributeError: Can’t get attribute ‘Object’ on <module ‘pyspark.daemon’ from…>". How can I solve it?

I did the following workaround. But I don’t think it is a good solution in general, and it assumes I can change the constructor of the object.

I converted the object into a dictionary, and constructed the object from that dictionary.

def init_spark(appname):
  spark = SparkSession.builder.appName(appname).getOrCreate()
  sc = spark.sparkContext
  return spark, sc

def run_on_configs_spark(object_list):
  spark, sc = init_spark(appname="analysis")
  p_configs_RDD = sc.parallelize([x.__dict__ for x in object_list])
  p_configs_RDD = p_configs_RDD.map(func)
  p_configs_RDD.foreach(print)

def func(obj_dict):
  obj = CreateObject(create_from_dict=True, dictionary=obj_dict)
  return do_something(obj)

In the constructor of the Object:

class Object:
   def __init__(self, create_from_dict=False, dictionary=None, **other_params):
      if create_from_dict:
         self.__dict__.update(dictionary)
         return

Are there any better solutions?
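
A sketch of one possible direction, under the assumption that the AttributeError comes from the class being defined in the driver script's __main__ (so the workers cannot unpickle it by reference): move the class into its own module and ship that module to the executors. File and attribute names below are illustrative.

# my_objects.py (illustrative separate module)
class Object:
    def __init__(self, value=None):
        self.value = value

# driver script
from pyspark.sql import SparkSession
from my_objects import Object   # imported from a module, not defined in __main__

spark = SparkSession.builder.appName("analysis").getOrCreate()
sc = spark.sparkContext
sc.addPyFile("my_objects.py")   # make the module importable on the workers

rdd = sc.parallelize([Object(i) for i in range(10)]).map(lambda o: o.value)
rdd.foreach(print)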


Get this bounty!!!

#StackBounty: #sql #regex #scala #join #apache-spark Spark Scala: SQL rlike vs Custom UDF

Bounty: 50

I have a scenario where 10K+ regular expressions are stored in a table, along with various other columns, and this needs to be joined against an incoming dataset. Initially I was using the Spark SQL rlike method as below, and it was able to handle the load as long as the incoming record count was below 50K.

PS: The regular expression reference data is a broadcasted dataset.

dataset.join(regexDataset.value, expr("input_column rlike regular_exp_column"))

Then I wrote a custom UDF to transform them using Scala's native regex search, as below.

  1. The val below collects the reference data as an Array of tuples:
val regexPreCalcArray: Array[(Int, Regex)] = {
    regexDataset.value
        .select("col_1", "regex_column")
        .collect
        .map(row => (row.get(0).asInstanceOf[Int], row.get(1).toString.r))
}

Implementation of the regex-matching UDF:

    def findMatchingPatterns(regexDSArray: Array[(Int,Regex)]): UserDefinedFunction = {
        udf((input_column: String) => {
            for {
                text <- Option(input_column)
                matches = regexDSArray.filter(regexDSValue => if (regexDSValue._2.findFirstIn(text).isEmpty) false else true)
                if matches.nonEmpty
            } yield matches.map(x => x._1).min
        }, IntegerType)
    }

Joins are done as below, where a unique ID from the reference data is returned by the UDF (the minimum in case of multiple regex matches) and then joined against the reference data using that unique ID to retrieve the other columns needed for the result:

dataset.withColumn("min_unique_id", findMatchingPatterns(regexPreCalcArray)($"input_column"))
.join(regexDataset.value, $"min_unique_id" === $"unique_id" , "left")

But this too gets very slow, with skew in execution (one executor task runs for a very long time) when the record count increases above 1M. Spark suggests not to use UDFs, as they can degrade performance. Are there any other best practices I should apply here, or a better API for Scala regex matching than what I've written? Any suggestions to do this efficiently would be very helpful.


Get this bounty!!!

#StackBounty: #apache-spark #pyspark #pyarrow #apache-arrow Is it possible to generate large DataFrame in a distributed way in pyspark …

Bounty: 50

The problem boils down to the following: I want to generate a DataFrame in PySpark using an existing parallelized collection of inputs and a function which, given one input, can generate a relatively large batch of rows. In the example below I want to generate a 10^12-row dataframe using e.g. 1000 executors:

def generate_data(one_integer):
  import numpy as np
  from pyspark.sql import Row
  M = 10000000 # number of values to generate per seed, e.g. 10M
  np.random.seed(one_integer)
  np_array = np.random.random_sample(M) # generates an array of M random values
  row_type = Row("seed", "n", "x")
  return [row_type(one_integer, i, float(np_array[i])) for i in range(M)]

N = 100000 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = spark.sparkContext.parallelize(list_of_integers)
row_rdd = list_of_integers_rdd.flatMap(generate_data)
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType
my_schema = StructType([
       StructField("seed", IntegerType()),
       StructField("n", IntegerType()),
       StructField("x", FloatType())])
df = spark.createDataFrame(row_rdd, schema=my_schema)

(I don't really want to study the distribution of random numbers given a seed – this is just an example I was able to come up with to illustrate the situation where a large dataframe is not loaded from a warehouse, but generated by code.)

The code above does pretty much exactly what I want. The problem is that it does it in a very inefficient way – at the expense of creating a Python Row object for each row and then converting the Python Row objects into Spark's internal columnar representation.

Is there a way I can convert a batch of rows already in columnar representation (e.g. one or a few numpy arrays, like np_array above) just by letting Spark know that these are the columns of a batch of values?

E.g. I can write code to generate an RDD where each element is a pyarrow.RecordBatch or a pandas.DataFrame, but I can't find a way to convert any of these into a Spark DataFrame without creating an RDD of pyspark Row objects in the process.

There are at least a dozen articles with examples of how I can use pyarrow + pandas to convert a local (to the driver) pandas dataframe to a Spark dataframe efficiently, but that is not an option for me because I need the data to actually be generated in a distributed way on the executors rather than generating one pandas dataframe on the driver and sending it to the executors.

UPD.
I've found one way to avoid creating Row objects – using an RDD of Python tuples. As expected it is still way too slow, but a bit faster than using Row objects. Still, this is not really what I'm looking for (which is a really efficient way of passing columnar data to Spark from Python).

I also measured the time for certain operations on one machine (a crude approach with quite a bit of variation in measured time, but still representative in my opinion).
The dataset in question is 10M rows and 3 columns (one column is a constant integer, another is an integer range from 0 to 10M-1, and the third is a floating-point value generated using np.random.random_sample):

  • Locally generate pandas dataframe (10M rows): ~440-450ms
  • Locally generate python list of spark.sql.Row objects (10M rows): ~12-15s
  • Locally generate python list of tuples representing rows (10M rows): ~3.4-3.5s

Generate Spark dataframe using just 1 executor and 1 initial seed value:

  • using spark.createDataFrame(row_rdd, schema=my_schema): ~70-80s
  • using spark.createDataFrame(tuple_rdd, schema=my_schema): ~40-45s
  • (non-distributed creation) using spark.createDataFrame(pandas_df, schema=my_schema): ~0.4-0.5s (without the pandas df generation itself, which takes roughly the same time) – with spark.sql.execution.arrow.enabled set to true.

The example with a local-to-driver pandas dataframe converted to a Spark dataframe in ~1s for 10M rows gives me reason to believe the same should be possible with dataframes generated in the executors. However, the fastest I can achieve now is ~40s for 10M rows using an RDD of Python tuples.

So the question still stands – is there a way to generate a large Spark dataframe in a distributed way efficiently in PySpark?
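
For what it's worth, a minimal sketch of one possible direction, assuming Spark 3.0+ where DataFrame.mapInPandas is available: each executor hands whole pandas DataFrames to Spark through Arrow, so no per-row Row or tuple objects are created. Column names and sizes mirror the example above.

import numpy as np
import pandas as pd

def generate_batches(pdf_iter):
    # pdf_iter yields pandas DataFrames holding a partition of seed values
    for pdf in pdf_iter:
        for seed in pdf["seed"]:
            M = 10000000  # 10M rows per seed
            np.random.seed(seed)
            yield pd.DataFrame({
                "seed": np.full(M, seed, dtype="int32"),
                "n": np.arange(M, dtype="int32"),
                "x": np.random.random_sample(M).astype("float32"),
            })

seeds = spark.range(100000).toDF("seed")   # one row per seed, distributed
df = seeds.mapInPandas(generate_batches, schema="seed int, n int, x float")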


Get this bounty!!!

#StackBounty: #pandas #apache-spark #pyspark #windowing Clean way to identify runs on a PySpark DF ArrayType column

Bounty: 100

Given a PySpark DataFrame of the form:

+----+--------+
|time|messages|
+----+--------+
| t01|    [m1]|
| t03|[m1, m2]|
| t04|    [m2]|
| t06|    [m3]|
| t07|[m3, m1]|
| t08|    [m1]|
| t11|    [m2]|
| t13|[m2, m4]|
| t15|    [m2]|
| t20|    [m4]|
| t21|      []|
| t22|[m1, m4]|
+----+--------+

I'd like to refactor it to compress runs containing the same message (the order of the output doesn't matter much, but sorted here for clarity):

+----------+--------+-------+
|start_time|end_time|message|
+----------+--------+-------+
|       t01|     t03|     m1|
|       t07|     t08|     m1|
|       t22|     t22|     m1|
|       t03|     t04|     m2|
|       t11|     t15|     m2|
|       t06|     t07|     m3|
|       t13|     t13|     m4|
|       t20|     t20|     m4|
|       t22|     t22|     m4|
+----------+--------+-------+

(i.e. treat the messages column as a sequence and identify the start and end of "runs" for each message).

Is there a clean way to make this transformation in Spark? Currently, I’m dumping this as a 6 GB TSV and processing it imperatively.

I’m open to the possibility of toPandas-ing this and accumulating on the driver if Pandas has a clean way to do this aggregation.
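
For reference, a sketch of one possible pure-Spark approach (column names as in the question, timestamps assumed sortable): number the original rows, explode the arrays, and use the gaps-and-islands trick so that consecutive row numbers for the same message collapse into one run. The single-partition global window is a known cost of this approach.

from pyspark.sql import functions as F, Window

# Number the original rows by time so "consecutive" has a concrete meaning.
w_all = Window.orderBy("time")               # note: runs in a single partition
numbered = df.withColumn("rn", F.row_number().over(w_all))

exploded = numbered.select("time", "rn", F.explode("messages").alias("message"))

# rn - row_number() per message is constant within a block of consecutive rn values.
w_msg = Window.partitionBy("message").orderBy("rn")
runs = (exploded
        .withColumn("grp", F.col("rn") - F.row_number().over(w_msg))
        .groupBy("message", "grp")
        .agg(F.min("time").alias("start_time"), F.max("time").alias("end_time"))
        .select("start_time", "end_time", "message"))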


Get this bounty!!!

#StackBounty: #java #apache-spark #apache-spark-sql Update value in struct type column in java spark

Bounty: 50

I want the capability to update a value in a nested dataset. For this I have created a nested Dataset in Spark. It has the below schema structure:

root
 |-- field_a: string (nullable = false)
 |-- field_b: struct (nullable = true)
 |    |-- field_d: struct (nullable = false)
 |    |    |-- field_not_to_update: string (nullable = true)
 |    |    |-- field_to_update: string (nullable = false)
 |-- field_c: string (nullable = false)

Now I want to update the value of field_to_update in the dataset. I have tried:

aFooData.withColumn("field_b.field_d.field_to_update", lit("updated_val"))

I also tried:

aFooData.foreach(new ClassWithForEachFunction());

where ClassWithForEachFunction implements ForEachFunction<Row> and has the method public void call(Row aRow) to update the field_to_update attribute. I tried the same with a lambda as well, but it was throwing a "Task not serializable" exception, so I had to go with the longer approach.

None of them have been fruitful so far: foreach gives me back the same Dataset, and withColumn gives me a new column literally named field_b.field_d.field_to_update. Any other suggestions?


Get this bounty!!!

#StackBounty: #apache-spark #pyspark #count #rdd #reduce pyspark rdd taking the max frequency with the least age

Bounty: 50

I have an rdd like the following:

[{'age': 2.18430371791803,
  'code': u'"315.320000"',
  'id': u'"00008RINR"'},
 {'age': 2.80033330216659,
  'code': u'"315.320000"',
  'id': u'"00008RINR"'},
 {'age': 2.8222365762732,
  'code': u'"315.320000"',
  'id': u'"00008RINR"'},
 {...}]

I am trying to reduce each id to just one record by taking the highest-frequency code, using code like:

(rdd.map(lambda x: (x["id"], [(x["age"], x["code"])]))
    .reduceByKey(lambda x, y: x + y)
    .map(lambda x: [i[1] for i in x[1]])
    .map(lambda x: [max(zip((x.count(i) for i in set(x)), set(x)))]))

There is one problem with this implementation: it doesn't consider age, so if, for example, one id had multiple codes with a frequency of 2, it would take the last code.

To illustrate this issue, please consider this reduced id:

(u'"000PZ7S2G"',
 [(4.3218651186303, u'"388.400000"'),
  (4.34924421126357, u'"388.400000"'),
  (4.3218651186303, u'"389.900000"'),
  (4.34924421126357, u'"389.900000"'),
  (13.3667102491139, u'"794.310000"'),
  (5.99897016368982, u'"995.300000"'),
  (6.02634923989903, u'"995.300000"'),
  (4.3218651186303, u'"V72.19"'),
  (4.34924421126357, u'"V72.19"'),
  (13.3639723398581, u'"V81.2"'),
  (13.3667102491139, u'"V81.2"')])

my code would output:

[(2, u'"V81.2"')]

when I would like for it to output:

[(2, u'"388.400000"')]

because although the frequency is the same for both of these codes, code 388.400000 has a lesser age and appears first.

By adding this line after the .reduceByKey():

.map(lambda x: (x[0], [i for i in x[1] if i[0] == min(x[1])[0]]))

I'm able to filter out the records with greater than the minimum age, but then I'm only considering those with the minimum age, and not all codes, when calculating their frequency. I can't apply the same/similar logic after [max(zip((x.count(i) for i in set(x)), set(x)))] as the set(x) is the set of x[1], which doesn't consider the age.

I should add that I don't want to just take the first code with the highest frequency; I'd like to take the highest-frequency code with the least age, or the code that appears first, if this is possible using only RDD operations.

Equivalent code in SQL would be something like:

SELECT id, code, age
FROM (SELECT id, code, MIN(age) AS age, COUNT(*) AS cnt,
             ROW_NUMBER() OVER (PARTITION BY id ORDER BY COUNT(*) DESC) AS seqnum
      FROM tbl
      GROUP BY id, code
     ) da
WHERE seqnum = 1

I’d really appreciate any help with this.
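
In case it helps frame the discussion, a sketch of one possible RDD-only approach (dictionary keys as in the sample data): keep per-code counts and minimum ages together after the reduceByKey, then break frequency ties by the smaller minimum age.

from collections import Counter

def best_code(pairs):
    # pairs: list of (age, code) tuples for a single id
    counts = Counter(code for _, code in pairs)
    min_age = {}
    for age, code in pairs:
        min_age[code] = min(age, min_age.get(code, age))
    # highest frequency first; among ties, the code with the smallest min age
    best = max(counts, key=lambda c: (counts[c], -min_age[c]))
    return (counts[best], best)

result = (rdd.map(lambda x: (x["id"], [(x["age"], x["code"])]))
             .reduceByKey(lambda a, b: a + b)
             .mapValues(best_code))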


Get this bounty!!!

#StackBounty: #scala #apache-spark #apache-kafka #spark-structured-streaming Spark Streaming – Join on multiple kafka stream operation …

Bounty: 50

I have 3 Kafka streams with 600k+ records each, and Spark Streaming takes more than 10 minutes to process simple joins between the streams.

Spark Cluster config:

[screenshot: Spark Master UI]

This is how I'm reading the Kafka streams into temp views in Spark (Scala):

spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "KAFKASERVER")
.option("subscribe", TOPIC1)
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest").load()
.selectExpr("CAST(value AS STRING) as json")
.select( from_json($"json", schema=SCHEMA1).as("data"))
.select($"COL1", $"COL2")
.createOrReplaceTempView("TABLE1")

I join the 3 tables using Spark SQL:

select COL1, COL2 from TABLE1   
JOIN TABLE2 ON TABLE1.PK = TABLE2.PK
JOIN TABLE3 ON TABLE2.PK = TABLE3.PK

Execution of Job:

[screenshot: Job UI]

Am I missing some Spark configuration that I should look into?


Get this bounty!!!

#StackBounty: #apache-spark #group-by #pyspark #k-means Pyspark: applying kmeans on different groups of a dataframe

Bounty: 50

Using PySpark, I would like to apply k-means separately to groups of a dataframe and not to the whole dataframe at once. For the moment I use a for loop which iterates over each group, applies k-means, and appends the result to another table. But having a lot of groups makes it time consuming. Could anyone help me please?
Thanks a lot!

for customer in customer_list:
    temp_df = togroup.filter(col("customer_id") == customer)
    df = assembler.transform(temp_df)
    k = 1
    while k < 5 and mtrc < width:   # mtrc, width, ttvar, allcustomers defined earlier
        k += 1
        kmeans = KMeans(k=k, seed=5, maxIter=20, initSteps=5)
        model = kmeans.fit(df)
        mtrc = 1 - model.computeCost(df) / ttvar
        a = model.transform(df).select(cols)
        allcustomers = allcustomers.union(a)
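
For illustration, a sketch of one possible alternative to the driver-side loop, assuming scikit-learn is available on the workers and using illustrative column names: fit one k-means per customer inside a grouped pandas UDF so all groups are trained in parallel.

import pandas as pd
from sklearn.cluster import KMeans
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Illustrative output schema: the input columns plus a per-row cluster label.
result_schema = "customer_id string, feature_1 double, feature_2 double, cluster int"

@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def cluster_customer(pdf):
    features = pdf[["feature_1", "feature_2"]].values
    k = min(4, len(pdf))                     # guard against very small groups
    pdf = pdf.copy()
    pdf["cluster"] = KMeans(n_clusters=k, random_state=5).fit_predict(features).astype("int32")
    return pdf[["customer_id", "feature_1", "feature_2", "cluster"]]

allcustomers = togroup.groupBy("customer_id").apply(cluster_customer)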


Get this bounty!!!