I am using a pandas UDF to train many ML models on GCP Dataproc (Spark). The main idea is that I have a grouping variable that identifies the various sets of data in my data frame, and I run something like this:
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def test_train(grp_df):
    # train model on grp_df
    # evaluate model
    # return metrics
    return metrics

result = df.groupBy('group_id').apply(test_train)
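For context, a fuller sketch of the pattern I'm using is below; the schema, columns, and metric names here are placeholders rather than my actual code:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import pandas as pd

# placeholder output schema for the per-group metrics
schema = StructType([
    StructField('group_id', StringType()),
    StructField('rmse', DoubleType()),
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def test_train(grp_df):
    # grp_df is a pandas DataFrame holding all rows for one group_id;
    # train and evaluate a model here (actual training code omitted),
    # then return one row of metrics for that group
    return pd.DataFrame({'group_id': [grp_df['group_id'].iloc[0]],
                         'rmse': [0.0]})

result = df.groupBy('group_id').apply(test_train)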
This works fine except on the non-sampled data, where the errors that come back appear to be memory-related. The messages are cryptic (to me), but if I sample the data down it runs; if I don't, it fails. The errors look like:
OSError: Read out of bounds (offset = 631044336, size = 69873416) in
file of size 573373864
Container killed by YARN for exceeding memory limits. 24.5 GB of 24
GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead or disabling
yarn.nodemanager.vmem-check-enabled because of YARN-4714.
My question is: how do I set the memory in the cluster to get this to work?
I understand that each group of data, plus the process being run on it, needs to fit entirely in the memory of a single executor. I currently have a 4-worker cluster with the following:
If I estimate that the data in the largest group_id requires at most 150 GB of memory, it seems I really need each machine to operate on one group_id at a time; that way I at least get 4 times the speed compared with using a single worker or VM.
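To sanity-check that estimate, I've been looking at the row counts per group as a rough proxy for size (the in-memory pandas DataFrame will be larger than the on-disk data), with something like:

# rough check of group skew; the largest group has to fit in one
# executor's memory as a single pandas DataFrame
df.groupBy('group_id').count().orderBy('count', ascending=False).show(10)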
If I do the following, does this in fact create one executor per machine that has access to all the cores minus one and 180 GB of memory? So that, if in theory the largest group of data would fit on a single VM with that much RAM, this process should work?
spark = SparkSession.builder \
    .appName('test') \
    .config('spark.executor.memory', '180g') \
    .config('spark.executor.cores', '63') \
    .config('spark.executor.instances', '1') \
    .getOrCreate()
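As a sanity check (not part of the job itself), I print back the executor-related settings the session actually picked up, since the error message also mentions spark.yarn.executor.memoryOverhead:

# verify which executor settings were actually applied
for key, value in spark.sparkContext.getConf().getAll():
    if key.startswith('spark.executor') or 'memoryOverhead' in key:
        print(key, value)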