#StackBounty: #apache-spark #google-cloud-platform #google-bigquery Getting java.io.IOException: Error getting access token from metada…

Bounty: 100

I am able to fetch data from BigQuery using the gcs-connector and spark-bigquery connector in a Spark application. But I get the error below when trying to load data into BigQuery in GCP from the same Spark application.

Exception in thread "main" java.io.IOException: Error getting access token from metadata server at: http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token

Code:

df_bigquery.write.format("bigquery").option("credentialsFile", "D://input/absolute-vertex-321015-a78e81ae77a0.json").option("parentProject", "absolute-vertex-321015").option("temporaryGcsBucket","emp_demo_1").save("absolute-vertex-321015.org.employee_loaded")
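
A minimal PySpark-style sketch of one thing to check (not a confirmed fix; the Hadoop property names are the GCS connector's service-account settings, and the key-file path, project and bucket are the ones above): the temporaryGcsBucket write goes through the GCS connector, and when that connector has no explicit credentials it falls back to the 169.254.169.254 metadata server, which only exists on GCE/Dataproc VMs.

# Sketch only: hand the same key file to the GCS connector so it does not
# fall back to the metadata-server lookup.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("google.cloud.auth.service.account.enable", "true")
hadoop_conf.set("google.cloud.auth.service.account.json.keyfile",
                "D://input/absolute-vertex-321015-a78e81ae77a0.json")

# Same write as above, unchanged.
(df_bigquery.write.format("bigquery")
    .option("credentialsFile", "D://input/absolute-vertex-321015-a78e81ae77a0.json")
    .option("parentProject", "absolute-vertex-321015")
    .option("temporaryGcsBucket", "emp_demo_1")
    .save("absolute-vertex-321015.org.employee_loaded"))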

Any help is deeply appreciated.

Thanks in Advance.


Get this bounty!!!

#StackBounty: #apache-spark #pyspark #apache-spark-sql Join computed twice on pyspark, maybe I don't understand laziness?

Bounty: 50

It's been too long since I last used Spark; I'm on it again with Spark 3.1, and here is my issue:
I have 20M rows left-joined to 400M rows. The original code is:

times = [50000, 20000, 10000, 1000]
for time in times:
    join = df_a.join(
        df_b,
        [
            df_a["a"] == df_b["a"],
            (unix_timestamp(df_a["date"]) - unix_timestamp(df_b["date"])) / 3600 > 5,
            df_a["task"] == df_b["task"] - time,
        ],
        "left",
    )

Knowing that each iteration (the time variable) contains the next, I thought of making the DataFrame lighter before comparing against each value, so I coded this:

times = [50000, 20000, 10000, 1000]

join = df_a.join(
    df_b,
    [
        df_a["a"] == df_b["a"],
        (unix_timestamp(df_a["date"]) - unix_timestamp(df_b["date"])) / 3600 > 5,
        df_a["task"] == df_b["task"] - 50000,
    ],
    "left",
)

join.checkpoint() # Save current state, the cleaned DataFrame

for time in times:
    step_join = join.where(df_a["task"] == df_b["task"] - time)
    # Make calculations and store the result for this iteration...
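
For clarity, this is roughly the materialize-once version I have in mind (same DataFrames as above; the checkpoint directory is just a placeholder). As far as I understand, checkpoint() does not modify the DataFrame it is called on but returns a new, already-materialized one, so that returned object is what gets reused in the loop:

from pyspark.sql.functions import col, unix_timestamp

spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")  # placeholder location

# Rename df_b's columns so the filter below is unambiguous after the join.
b = df_b.withColumnRenamed("task", "task_b").withColumnRenamed("date", "date_b")

base_join = df_a.join(
    b,
    [
        df_a["a"] == b["a"],
        (unix_timestamp(df_a["date"]) - unix_timestamp(b["date_b"])) / 3600 > 5,
        df_a["task"] == b["task_b"] - 50000,
    ],
    "left",
).checkpoint()  # eager by default: the join is computed and stored once, here

for time in times:
    step_join = base_join.where(col("task") == col("task_b") - time)
    # ... per-iteration calculations on step_join ...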

When looking at the visual SQL diagram on the Spark history server, it seems that my improved solution (?) with the second join is not used: the whole left join is recomputed on each iteration instead of using the cleaner, lighter DataFrame.

My final idea was to use the new DataFrame for the next iteration so each filter would be lighter. Are my thoughts correct? Am I missing something?

Here is an image of how it looks (this is still-running code): the SortMergeJoin in the middle is the decoupled join that feeds the filter; the second "Filter" only filters a little more, but on the left and right you can see that the SortMergeJoin is computed again instead of reusing the previously calculated one.
[screenshot: SQL diagram]

And this is how the processing looks: the same calculations each time, plus the filter.
[screenshot]

Last time I had to remove the checkpoint because, with 55B rows in a join, it was hard to store the data (>100 TB).

My cluster configuration, for 30 instances of 64 vCores / 488 GB RAM each, plus the driver:

        "spark.executor.instances", "249").config("spark.executor.memoryOverhead", "10240").config(
        "spark.executor.memory", "87g").config("spark.executor.cores", "12").config("spark.driver.cores", "12").config(
        "spark.default.parallelism", "5976").config("spark.sql.adaptive.enabled", "true").config(
        "spark.sql.adaptive.skewJoin.enabled", "true").config("spark.sql.shuffle.partitions", "3100").config(
        "spark.yarn.driver.memoryOverhead", "10240").config("spark.sql.autoBroadcastJoinThreshold", "2100").config(
        "spark.sql.legacy.timeParserPolicy", "LEGACY").getOrCreate()

I'm using the Excel calculator on this site for tuning everything except spark.sql.shuffle.partitions: https://www.c2fo.io/c2fo/spark/aws/emr/2016/07/06/apache-spark-config-cheatsheet/ (now using 10 executors per node).

I tried using .cache() on the join; it's still slower than the 4 parallel joins, as the first join is much slower.
Note that .cache() is good for a subset, but for a 100 TB join result it would be slower because it will cache to disk.
Thanks!


Get this bounty!!!

#StackBounty: #apache-spark #apache-spark-sql Parallelize window function or speed up window function in spark without changing the win…

Bounty: 50

I have a set of window functions in a Spark query which, among other things, partition on user_num. One of these user_nums has far more rows than the others. That partition gets computed in a single task, which has a much higher shuffle read and remote shuffle read, and ultimately takes a huge amount of time.

SELECT LAG(e) OVER (PARTITION BY user_num, a, date ORDER BY time) AS aa,
       LAST_VALUE(e) OVER (PARTITION BY a, date ORDER BY time
                           ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS bbb
FROM table

Is there any setting or any way to have this run on different tasks or otherwise shrink this amount of time in a way that would require no or minimal changes to the logic of the window functions?

I.e., could I cache at a certain point, increase the number of partitions, increase executor memory, etc.?


Get this bounty!!!

#StackBounty: #scala #apache-spark #machine-learning #xgboost #google-cloud-dataproc Implement XGBoost in Scala Spark, dataproc zeppeli…

Bounty: 100

I am trying to implement an XGBoost model in Scala, using Zeppelin on Dataproc (Google Cloud). This is the code I'm implementing:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
import scala.collection.mutable
import org.apache.spark.sql.{DataFrame, _}
import spark.implicits._
import org.apache.spark.ml.{Pipeline, PipelineStage}

Adding the dependency (I also added the jar in the Zeppelin notebook dependencies):

<dependency>
    <groupId>ml.dmlc</groupId>
    <artifactId>xgboost4j-spark</artifactId>
    <version>0.72</version>
</dependency>

Dummy data:

import org.apache.spark.sql.types._

val someData = Seq(
  Row(8, 15, 1),
  Row(64, 25, 1),
  Row(27, 22, 0)
)

val someSchema = List(
  StructField("var1", IntegerType, true),
  StructField("var2", IntegerType, true),
  StructField("Classification", IntegerType, true)
)

val data = spark.createDataFrame(
  spark.sparkContext.parallelize(someData),
  StructType(someSchema)
)

Model implementation:

import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler
val stringIndexer = new StringIndexer().
  setInputCol("Classification").
  setOutputCol("label").
  fit(data)
val labelTransformed = stringIndexer.transform(data).drop("Classification")
val vectorAssembler = new VectorAssembler().
  setInputCols(Array("var1", "var2")).
  setOutputCol("features")
val xgbInput = vectorAssembler.transform(labelTransformed).select("features", "label")

import ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator
val paramMap = Map[String, Any]("objective" -> "binary:logistic", "nworkers" -> 2)
val est = new XGBoostEstimator(paramMap)
val model = est.fit(xgbInput)

Everything works except for the very last line, where I get the following error:

Tracker started, with env={DMLC_NUM_SERVER=0, DMLC_TRACKER_URI=10.156.0.33, DMLC_TRACKER_PORT=9091, DMLC_NUM_WORKER=2}
ml.dmlc.xgboost4j.java.XGBoostError: XGBoostModel training failed
  at ml.dmlc.xgboost4j.scala.spark.XGBoost$.ml$dmlc$xgboost4j$scala$spark$XGBoost$$postTrackerReturnProcessing(XGBoost.scala:406)
  at ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$4.apply(XGBoost.scala:356)
  at ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$4.apply(XGBoost.scala:337)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:285)
  at ml.dmlc.xgboost4j.scala.spark.XGBoost$.trainDistributed(XGBoost.scala:336)
  at ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator.train(XGBoostEstimator.scala:139)
  at ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator.train(XGBoostEstimator.scala:36)
  at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
  ... 69 elided

Once again, I am using Scala on Zeppelin through Dataproc; the Spark version is 2.4.5.

Can anyone help me?

EDIT: Full error logs:

Tracker started, with env={DMLC_NUM_SERVER=0, DMLC_TRACKER_URI=10.156.0.9, DMLC_TRACKER_PORT=9091, DMLC_NUM_WORKER=2}
ml.dmlc.xgboost4j.java.XGBoostError: XGBoostModel training failed
    at ml.dmlc.xgboost4j.scala.spark.XGBoost$.ml$dmlc$xgboost4j$scala$spark$XGBoost$$postTrackerReturnProcessing(XGBoost.scala:406)
    at ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$4.apply(XGBoost.scala:356)
    at ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$4.apply(XGBoost.scala:337)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at ml.dmlc.xgboost4j.scala.spark.XGBoost$.trainDistributed(XGBoost.scala:336)
    at ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator.train(XGBoostEstimator.scala:139)
    at ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator.train(XGBoostEstimator.scala:36)
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
    at $line15134072657.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:63)
    at $line15134072657.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:61)
    at $line15134072657.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:74)
    at $line15134072657.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:76)
    at $line15134072657.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:78)
    at $line15134072657.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:80)
    at $line15134072657.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:82)
    at $line15134072657.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:84)
    at $line15134072657.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:86)
    at $line15134072657.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:88)
    at $line15134072657.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:90)
    at $line15134072657.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:92)
    at $line15134072657.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:94)
    at $line15134072657.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:96)
    at $line15134072657.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:98)
    at $line15134072657.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:100)
    at $line15134072657.$read$$iw$$iw$$iw$$iw.<init>(<console>:102)
    at $line15134072657.$read$$iw$$iw$$iw.<init>(<console>:104)
    at $line15134072657.$read$$iw$$iw.<init>(<console>:106)
    at $line15134072657.$read$$iw.<init>(<console>:108)
    at $line15134072657.$read.<init>(<console>:110)
    at $line15134072657.$read$.<init>(<console>:114)
    at $line15134072657.$read$.<clinit>(<console>)
    at $line15134072657.$eval$.$print$lzycompute(<console>:7)
    at $line15134072657.$eval$.$print(<console>:6)
    at $line15134072657.$eval.$print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
    at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
    at org.apache.zeppelin.spark.SparkScala211Interpreter.scalaInterpret(SparkScala211Interpreter.scala:108)
    at org.apache.zeppelin.spark.BaseSparkScalaInterpreter$$anonfun$_interpret$1$1.apply(BaseSparkScalaInterpreter.scala:100)
    at org.apache.zeppelin.spark.BaseSparkScalaInterpreter$$anonfun$_interpret$1$1.apply(BaseSparkScalaInterpreter.scala:94)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at scala.Console$.withOut(Console.scala:65)
    at org.apache.zeppelin.spark.BaseSparkScalaInterpreter._interpret$1(BaseSparkScalaInterpreter.scala:94)
    at org.apache.zeppelin.spark.BaseSparkScalaInterpreter.interpret(BaseSparkScalaInterpreter.scala:125)
    at org.apache.zeppelin.spark.NewSparkInterpreter.interpret(NewSparkInterpreter.java:147)
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:73)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:103)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:632)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:188)
    at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:140)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
ml.dmlc.xgboost4j.java.XGBoostError: XGBoostModel training failed


Get this bounty!!!

#StackBounty: #apache-spark #pyspark #google-cloud-dataproc Why does spark application crash with exception java.net.SocketException: C…

Bounty: 100

I am trying to load a table from a SQL Server database to BigQuery; it is 27 GB in size, with 218 million rows and 28 columns.

The source table doesn't have any column containing unique values that Spark could use to partition the incoming data evenly, so I applied row_number() to the data I am reading, as below:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Read_from_Source').getOrCreate()
dataframe = (spark.read.format('jdbc')
             .option('url', URL)
             .option('driver', 'com.microsoft.sqlserver.jdbc.SQLServerDriver')
             .option('user', user)
             .option('password', password)
             .option('dbtable', f'(select *, row_number() over(ORDER BY (SELECT 1)) as row_num from {tablename}) as temp_load')
             .option('partitionColumn', 'row_num')
             .option('numPartitions', 400)
             .option('lowerBound', 1)
             .option('upperBound', countOfNumOfRows)
             .load())
dataframe.write.format('bigquery').option('table', 'tablename').mode('overwrite').save()

I gave the below configuration to the job:

spark.submit.deployMode=cluster
spark.executor.instances=4
spark.executor.memory=2g
spark.executor.cores=4
spark.driver.memory=2g
spark.network.timeout=240

When I submit the job, it starts well, but after some time it fails with a connection reset exception, which can be seen below.

Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:210)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at com.microsoft.sqlserver.jdbc.TDSChannel.read(IOBuffer.java:1981)

The executors and the stage can be seen below.

[screenshot: executors]

Stages of my job (There is only one stage since it is just read and load):

[screenshot: stages]

I thought the connection was breaking/interrupting while the data was being read, hence the connection reset exception. So I tried increasing spark.network.timeout to 100000 and increased numPartitions from 200 to 400. The error is still the same.

I also increased the executor instances to 6 (I have 2 worker nodes), executor memory to 6 GB, and driver memory to 4 GB. I still see the same issue.

I am applying partitioning here while reading the data in order to avoid data skew. The memory given is also very high.

Edit 1: There is no data going to the driver here, since the given lines are the only code I am running.

Is there anything wrong with the way I framed the code? Can anyone suggest a fix for this exception?


Get this bounty!!!

#StackBounty: #python #apache-spark #performance #pyspark #scalability DataFrame.write.parquet() uses only one executor, does not scale

Bounty: 50

I’ve read:


I’m trying to generate a test dataset with 10 billion rows by crossJoining a seed dataframe with some other dataframes. My problem is that the last step of the process final_df.write.parquet() only uses one worker/executor no matter how many there are.

This obviously doesn't scale to generating billions of rows, which is the problem.

E.g., in a 3-node cluster with 4 cores each, final_df has 64 partitions, but only one executor writes one parquet file with all the records from that dataframe. I've also tried with 12 nodes, which produces a dataframe with 1936 partitions, but the problem is the same.

A few observations:

  • I think the 64 partitions come from the 2 crossJoin() calls. One node is the master, leaving 2 executors, each with 4 cores, so it comes out to 2*2*4*4 = 64.
  • If I uncomment the coalesce() lines, the number of partitions drops to 8, but still only one executor writes one parquet file with all the records from that dataframe.
  • If I repartition() (no coalesce()), then I get 8 files, all executors are used, and the writing is distributed perfectly. BUT now the problem moves to the repartitioning step, which is done by just one executor. Same problem in the end. (A small sketch of the coalesce-vs-repartition difference follows this list.)
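
As a side note, here is a tiny, self-contained sketch (toy data, nothing to do with the dataset above) of the difference between the two calls: coalesce() only merges existing partitions and avoids a shuffle, while repartition() performs a full shuffle that redistributes rows across executors.

from pyspark.sql import SparkSession

# Standalone toy example, run locally; the real job uses its own session.
spark = SparkSession.builder.master("local[4]").appName("coalesce-vs-repartition").getOrCreate()

df = spark.range(1_000_000, numPartitions=64)
print(df.coalesce(8).rdd.getNumPartitions())     # 8 -- partitions merged, no shuffle
print(df.repartition(8).rdd.getNumPartitions())  # 8 -- full shuffle, rows spread evenly

The full generation script:
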
import os, math
import pyspark.sql.functions as F
from datetime import datetime as dt

def log(*args):
  print(dt.now().isoformat() + ' ' + ' '.join([str(s) for s in args]))

log('spark.version', str(spark.version))

log("reading seed file")
spark.conf.set("fs.azure.account.key.myaccount.dfs.core.windows.net", "my key")
seed_df = spark.read.csv("abfss://fs1@myaccount.dfs.core.windows.net/seed.csv", header=True)

# NUM_RECORDS_TO_GENERATE = 10_000_000_000
NUM_RECORDS_TO_GENERATE = 2_000_000
NUM_RECORDS_TO_GENERATE = NUM_RECORDS_TO_GENERATE + (NUM_RECORDS_TO_GENERATE % seed_df.count())
array_len = int(math.sqrt(NUM_RECORDS_TO_GENERATE / seed_df.count()))

log("array_len: %s, NUM_RECORDS_TO_GENERATE: %s, seed_df.count(): %s" % (array_len, NUM_RECORDS_TO_GENERATE, seed_df.count()))

df1 = spark.createDataFrame(data=[[ [1] * array_len ]])
df2 = df1.withColumn('exploded', F.explode(df1['_1'])).drop('_1')
df3 = df2.crossJoin(df2) # contains array_len ^ 2 = NUM_RECORDS_TO_GENERATE / seed_df.count() records
newdf = df3.crossJoin(seed_df) # contains NUM_RECORDS_TO_GENERATE
final_df = newdf.withColumn('uniq_row_id', F.monotonically_increasing_id()).drop('exploded') # add unique id column

# log("repartitioning")
# final_df = final_df.repartition(int(final_df.rdd.getNumPartitions() / 2))
# log("coalesceing")
# final_df = final_df.coalesce(int(final_df.rdd.getNumPartitions() / 2))

log("final_df.rdd.getNumPartitions(): ", final_df.rdd.getNumPartitions())
log('writing parquet')
final_df.write.parquet("abfss://fs1@myaccount.dfs.core.windows.net/%s/parquet-%s" % (dt.now().isoformat(), NUM_RECORDS_TO_GENERATE))
log('wrote parquet.')
log('final_df.rdd.count():', final_df.rdd.count())

output

2020-12-05T00:27:51.933995 spark.version 3.0.1
2020-12-05T00:27:51.934079 reading seed file
2020-12-05T00:27:52.713461 array_len: 377, NUM_RECORDS_TO_GENERATE: 2000002, seed_df.count(): 14
2020-12-05T00:27:52.852547 final_df.rdd.getNumPartitions():  64
2020-12-05T00:27:52.852749 writing parquet
2020-12-05T00:28:00.823663 wrote parquet.
2020-12-05T00:28:08.757957 final_df.rdd.count(): 1989806

[screenshot: aggregated metrics and files produced]

coalesce

... same as above ...
2020-12-05T00:12:22.620791 coalesceing
2020-12-05T00:12:22.860093 final_df.rdd.getNumPartitions():  32
2020-12-05T00:12:22.860249 writing parquet
2020-12-05T00:12:31.280416 wrote parquet.
2020-12-05T00:12:39.204093 final_df.rdd.count(): 1989806

[screenshot: aggregated metrics and files produced]

repartition

... same as above ...
2020-12-05T00:23:40.155481 repartitioning
2020-12-05T00:23:44.702251 final_df.rdd.getNumPartitions():  8
2020-12-05T00:23:44.702421 writing parquet
2020-12-05T00:23:50.478841 wrote parquet.
2020-12-05T00:23:52.174997 final_df.rdd.count(): 1989806

[screenshot: aggregated metrics and files produced]


DAG visualization of the stage that takes a long time:
[screenshot: DAG chart]

PS: Ignore the slight mismatch between the NUM_RECORDS_TO_GENERATE value and the actual number of records generated. It's probably a math issue with the sqrt, and I don't care if it's off by a few million.


I posted this before and there was no solution (the "answer" is by me and it's not a solution). I just discovered this data science community, so I'm posting it here.


Get this bounty!!!

#StackBounty: #python-3.x #docker #apache-spark Connection refused to remote Spark Master running on a Docker Container

Bounty: 50

I want to connect to a remote Spark Master running on Docker, via Python on my local machine:

spark = SparkSession \
    .builder \
    .master('spark://ip:7077') \
    .appName('spark-yarn') \
    .getOrCreate()

I get a Connection Refused error when running the code.

Running telnet ip 7077 in my terminal gives the error:

telnet: Unable to connect to remote host: Connection refused

This is confusing, because the port on the server itself is open and the server is accepting connections on port 7077.

Running docker container ls on the server shows:

CONTAINER ID        IMAGE                                                    COMMAND                  CREATED             STATUS                  PORTS                                            NAMES
9086cf2f26dc        bde2020/spark-master:3.0.1-hadoop3.2                     "/bin/bash /master.sh"   2 weeks ago         Up 2 weeks              6066/tcp, 8080/tcp, 0.0.0.0:7077->7077/tcp       spark_spark-master.1.qyie2bq52hbrfg2ttioz6ljwq
5133adc223ef        bde2020/spark-worker:3.0.1-hadoop3.2                     "/bin/bash /worker.sh"   2 weeks ago         Up 2 weeks              8081/tcp                                         spark_spark-worker.ylnj52bj78as9hxr6zdo1lgo3.kwzys14lm3uid0qclyv0jn95o
da2841b1d757        bde2020/hadoop-nodemanager:2.0.0-hadoop3.2.1-java8       "/entrypoint.sh /run…"   2 months ago        Up 2 months (healthy)   8042/tcp                                         hadoop_nodemanager.ylnj52bj78as9hxr6zdo1lgo3.o9gznaa9u57wuyf21fl9ya4hi
49a3cbb8073a        bde2020/hadoop-resourcemanager:2.0.0-hadoop3.2.1-java8   "/entrypoint.sh /run…"   2 months ago        Up 2 months             8088/tcp                                         hadoop_resourcemanager.1.7kwgmhxz74brj6xs218k81ptk
10b22205a879        bde2020/hadoop-historyserver:2.0.0-hadoop3.2.1-java8     "/entrypoint.sh /run…"   2 months ago        Up 2 months (healthy)   8188/tcp                                         hadoop_historyserver.1.p3c3ouxmayxt4rvhrjlq7ti4t
775209433ea8        bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8          "/entrypoint.sh /run…"   2 months ago        Up 2 months (healthy)   0.0.0.0:9000->9000/tcp, 0.0.0.0:9870->9870/tcp   hadoop_namenode.1.bbt0n4ne76ddwqtmejlsf590m
5d14d16020e5        bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8          "/entrypoint.sh /run…"   2 months ago        Up 2 months (healthy)   9864/tcp                                         hadoop_datanode.ylnj52bj78as9hxr6zdo1lgo3.e9drmfbdqicux6ltkk9gv2uh5
83f7b3290995        traefik:v2.2                                             "/entrypoint.sh --ap…"   2 months ago        Up 2 months             80/tcp                                           traefik_traefik.1.ha8o6dc3ewtmppkn4pauugkj

The docker-compose.yml file for spark is:

version: '3.6'
services:
  spark-master:
    image: bde2020/spark-master:3.0.1-hadoop3.2
    networks:
      - workbench
    ports:
      - target: 7077
        published: 7077
        mode: host
    deploy:
      restart_policy:
        condition: on-failure
      placement:
        constraints:
          - node.hostname == johnsnow
      labels:
        - "traefik.enable=true"
        - "traefik.docker.network=workbench"
        - "traefik.http.services.spark-master.loadbalancer.server.port=8080"
    env_file:
      - ./hadoop.env
    environment:
    - INIT_DAEMON_STEP=setup_spark
    - "constraint:node==spark-master"

  spark-worker:
    image: bde2020/spark-worker:3.0.1-hadoop3.2
    networks:
      - workbench
    environment:
      - SPARK_MASTER_URL=spark://spark_spark-master:7077
    deploy:
      mode: global
      restart_policy:
        condition: on-failure
      labels:
        - "traefik.enable=true"
        - "traefik.docker.network=workbench"
        - "traefik.http.services.spark-worker.loadbalancer.server.port=8081"
    env_file:
      - ./hadoop.env
    environment:
    - INIT_DAEMON_STEP=setup_spark
    - "constraint:node==spark-worker"
networks:
  workbench:
    external: true

Why is this error occurring?


Get this bounty!!!

#StackBounty: #scala #apache-spark Group events close in time into sessions and assign unique session IDs

Bounty: 50

The following is a trimmed-down example of my actual code, but it suffices to show the algorithmic problem I’m trying to solve.

Given is a DataFrame with events, each with a user ID and a timestamp.

val events = Seq(
  ("1001", 1),
  ("1001", 2),
  ("1001", 3),
  ("1001", 5),
  ("1001", 6),
  ("1002", 1),
  ("1002", 3),
).toDF("user_id", "timestamp")

events.orderBy($"user_id", $"timestamp").show
+-------+---------+
|user_id|timestamp|
+-------+---------+
|   1001|        1|
|   1001|        2|
|   1001|        3|
|   1001|        5|
|   1001|        6|
|   1002|        1|
|   1002|        3|
+-------+---------+

My goal is to group events from the same user together into one session if the timestamp gap is not above some threshold (1 in this example).

In my solution, I determine the given sessions by looking for timestamp gaps and then applying monotonically_increasing_id.

val sessions = {
  events
    .withColumn(
      "timestamp_gap_before",
      $"timestamp" - lag($"timestamp", 1).over(Window.partitionBy($"user_id").orderBy($"timestamp"))
    )
    .withColumn(
      "timestamp_gap_after",
      lead($"timestamp", 1).over(Window.partitionBy($"user_id").orderBy($"timestamp")) - $"timestamp"
    )
    .withColumn(
      "is_session_start",
      $"timestamp_gap_before".isNull || $"timestamp_gap_before" > lit(1)
    )
    .withColumn(
      "is_session_end",
      $"timestamp_gap_after".isNull || $"timestamp_gap_after" > lit(1)
    )
    .filter($"is_session_start" || $"is_session_end")
    .withColumn(
      "min_timestamp",
      least($"timestamp", lead($"timestamp", 1).over(Window.partitionBy($"user_id").orderBy($"timestamp")))
    )
    .withColumn(
      "max_timestamp",
      greatest($"timestamp", lead($"timestamp", 1).over(Window.partitionBy($"user_id").orderBy($"timestamp")))
    )
    .filter($"is_session_start")
    .select(
      $"user_id" as "session_user_id",
      $"min_timestamp" as "session_start_timestamp",
      // Special handling for sessions with only one event.
      when($"is_session_end", $"min_timestamp").otherwise($"max_timestamp") as "session_end_timestamp",
    )
    .orderBy($"user_id", $"session_start_timestamp")
    .withColumn("session_id", monotonically_increasing_id)
}

sessions.orderBy($"session_user_id", $"timestamp").show
+---------------+-----------------------+---------------------+-----------+
|session_user_id|session_start_timestamp|session_end_timestamp| session_id|
+---------------+-----------------------+---------------------+-----------+
|           1001|                      1|                    3|          0|
|           1001|                      5|                    6| 8589934592|
|           1002|                      1|                    1|17179869184|
|           1002|                      3|                    3|25769803776|
+---------------+-----------------------+---------------------+-----------+

Then, I just need to join these session IDs onto the events:

val assignedEvents = {
  events
    .join(sessions,
      $"session_user_id" === $"user_id" &&
        $"timestamp" >= $"session_start_timestamp" &&
        $"timestamp" <= $"session_end_timestamp")
    .drop("session_user_id", "session_start_timestamp", "session_end_timestamp")
}


assignedEvents.orderBy($"user_id", $"timestamp").show
+-------+---------+-----------+
|user_id|timestamp| session_id|
+-------+---------+-----------+
|   1001|        1|          0|
|   1001|        2|          0|
|   1001|        3|          0|
|   1001|        5| 8589934592|
|   1001|        6| 8589934592|
|   1002|        1|17179869184|
|   1002|        3|25769803776|
+-------+---------+-----------+

It works, but it feels somewhat clumsy. Especially the definition of sessions seems needlessly complex to me.

Does anybody know a simpler solution? 🙂


Get this bounty!!!

#StackBounty: #apache-spark #pyspark PySpark data skewness with Window Functions

Bounty: 50

I have a huge PySpark dataframe and I’m doing a series of Window functions over partitions defined by my key.

The issue with the key is that my partitions get skewed by it, resulting in an Event Timeline that looks something like this:

[screenshot: Event Timeline]

I know that I can use the salting technique to solve this issue when I'm doing a join. But how can I solve it when I'm using Window functions?
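
For reference, this is the kind of join salting I mean (made-up DataFrames and column names): the hot key is spread over several partitions by adding a random salt on one side and exploding all salt values on the other.

from pyspark.sql import functions as F

NUM_SALTS = 8  # made up; tuned to the degree of skew

# Skewed side: each row gets a random salt value.
left_salted = df_left.withColumn("salt", (F.rand() * NUM_SALTS).cast("int"))

# Other side: replicate each row once per salt value.
right_salted = df_right.withColumn(
    "salt", F.explode(F.array([F.lit(i) for i in range(NUM_SALTS)]))
)

# Join on the original key plus the salt, then drop the helper column.
joined = left_salted.join(right_salted, on=["key", "salt"]).drop("salt")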

I'm using functions like lag, lead, etc. in the Window functions. I can't do the process with a salted key, because I'll get wrong results.

How to solve skewness in this case?

I’m looking for a dynamic way of repartitioning my dataframe without skewness.


Get this bounty!!!

#StackBounty: #apache-spark #hive #apache-spark-sql #kerberos How to use Apache Spark to query Hive table with Kerberos?

Bounty: 50

I am attempting to use Scala with Apache Spark locally to query a Hive table which is secured with Kerberos. I have no issues connecting and querying the data programmatically without Spark. However, the problem comes when I try to connect and query in Spark.

My code when run locally without Spark:

Class.forName("org.apache.hive.jdbc.HiveDriver")

    System.setProperty("kerberos.keytab", keytab)
    System.setProperty("kerberos.principal", keytab)
    System.setProperty("java.security.krb5.conf", krb5.conf)
    System.setProperty("java.security.auth.login.config", jaas.conf)

    val conf = new Configuration
    conf.set("hadoop.security.authentication", "Kerberos")

    UserGroupInformation.setConfiguration(conf)
    UserGroupInformation.createProxyUser("user", UserGroupInformation.getLoginUser)
    UserGroupInformation.loginUserFromKeytab(user, keytab)
    UserGroupInformation.getLoginUser.checkTGTAndReloginFromKeytab()

    if (UserGroupInformation.isLoginKeytabBased) {
      UserGroupInformation.getLoginUser.reloginFromKeytab()
    }
    else if (UserGroupInformation.isLoginTicketBased) UserGroupInformation.getLoginUser.reloginFromTicketCache()

    val con = DriverManager.getConnection("jdbc:hive://hdpe-hive.company.com:10000", user, password)
    val ps = con.prepareStatement("select * from table limit 5").executeQuery();

Does anyone know how I could include the keytab, krb5.conf and jaas.conf into my Spark initialization function so that I am able to authenticate with Kerberos to get the TGT?

My Spark initialization function:

conf = new SparkConf().setAppName("mediumData")
      .setMaster(numCores)
      .set("spark.driver.host", "localhost")
      .set("spark.ui.enabled","true") //enable spark UI
      .set("spark.sql.shuffle.partitions",defaultPartitions)
    sparkSession = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()

I do not have files such as hive-site.xml, core-site.xml.

Thank you!


Get this bounty!!!