#StackBounty: #apache-spark #pyspark #google-cloud-storage #google-cloud-dataproc #pyarrow PySpark PandasUDF on GCP – Memory Allocation

Bounty: 200

I am using a pandas UDF to train many ML models on GCP Dataproc (Spark). The main idea is that I have a grouping variable that represents the various sets of data in my data frame, and I run something like this:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def test_train(grp_df):
    # train model on grp_df
    # evaluate model
    # return metrics
    return metrics

result = df.groupBy('group_id').apply(test_train)

This works fine except on the full, non-sampled data, where I get errors that appear to be memory-related. The messages are cryptic (to me), but if I sample the data down it runs; if I don't, it fails. The errors look like:

OSError: Read out of bounds (offset = 631044336, size = 69873416) in
file of size 573373864

or

Container killed by YARN for exceeding memory limits. 24.5 GB of 24
GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead or disabling
yarn.nodemanager.vmem-check-enabled because of YARN-4714.

My question is: how do I set memory in the cluster to get this to work?
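
One knob the YARN message itself points at is spark.yarn.executor.memoryOverhead, which (unlike spark.executor.memory, the JVM heap) covers memory used outside the heap, where the Python/pandas work of a pandas UDF actually runs. A minimal sketch of raising it at session creation is below; the sizes are illustrative placeholders, not tested recommendations, and newer Spark releases name the setting spark.executor.memoryOverhead:

# Sketch only: the values below are placeholders, not tuned numbers.
# spark.yarn.executor.memoryOverhead is given in MB on older Spark
# releases; newer releases use spark.executor.memoryOverhead instead.
spark = SparkSession.builder \
    .appName('test') \
    .config('spark.executor.memory', '20g') \
    .config('spark.yarn.executor.memoryOverhead', '8192') \
    .getOrCreate()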

I understand that each group of data, and the process being run on it, needs to fit entirely in the memory of the executor. I currently have a 4-worker cluster with the following:

[screenshot of the cluster's worker configuration]

If the data in the largest group_id requires something like 150 GB of memory, it seems I really need each machine to operate on one group_id at a time. That way I at least get 4 times the speed compared to having a single worker or VM.
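
Before sizing the cluster around that estimate, the groups can be measured directly. A small sketch using the same df and group_id from above; it gives row counts only, but the largest groups are what drive peak memory inside the UDF, since each group is materialized as a single pandas DataFrame:

# Row counts per group, largest first, to gauge skew across group_id.
group_sizes = (
    df.groupBy('group_id')
      .count()
      .orderBy('count', ascending=False)
)
group_sizes.show(10)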

If I do the following, does this in fact create one executor per machine that has access to all the cores minus one and 180 GB of memory? So that, if in theory the largest group of data fits on a single VM with that much RAM, this process should work?

spark = SparkSession.builder \
    .appName('test') \
    .config('spark.executor.memory', '180g') \
    .config('spark.executor.cores', '63') \
    .config('spark.executor.instances', '1') \
    .getOrCreate()
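
Worth noting as general Spark behaviour (not something stated above): each group_id is processed by a single task, and an executor with spark.executor.cores set to 63 can run up to 63 tasks at once, so several large groups could still be in memory on the same machine simultaneously. If the goal is truly one group per machine at a time, per-task concurrency (for example via spark.task.cpus) matters as much as the total executor memory.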

