#StackBounty: #python #apache-spark #spark-streaming Read output of a script with spark streaming

Bounty: 50

I have a script similar to this one

import json 
def line_generator():
    d = dict({1:1, 2:2, 3:3})
    while True:
        yield json.dumps(d)

it = line_generator()
for l in it:
    print(l)

which outputs values to stdout. I would like to “catch” those values with the Spark Streaming API to store them in Parquet files, and apply some inference code which is written in HiveQL. I am not a Scala person :/ so, if possible, I would prefer a solution in PySpark, but I am happy with any piece of advice.

I know it is possible to read a stream of data coming from a Kafka stream, for example; is there a similar way to read data sent to stdout, or data that is continuously written to a file?
Thanks in advance for your help.
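For reference, here is a minimal sketch of one way this could be wired up, assuming the script's lines are piped over a local TCP socket (e.g. python script.py | nc -lk 9999) and consumed with Structured Streaming's socket source; the host, port, output paths and JSON schema are assumptions, not part of the question.

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import MapType, StringType, IntegerType

spark = SparkSession.builder.appName("stdout-to-parquet").getOrCreate()

# Each socket line is a JSON string from line_generator(), e.g. '{"1": 1, "2": 2, "3": 3}'
raw = (spark.readStream
       .format("socket")             # handy for testing; Kafka or a file source is more robust
       .option("host", "localhost")
       .option("port", 9999)
       .load())

parsed = raw.select(from_json(col("value"), MapType(StringType(), IntegerType())).alias("data"))

query = (parsed.writeStream
         .format("parquet")
         .option("path", "/tmp/output_parquet")            # assumed output path
         .option("checkpointLocation", "/tmp/checkpoint")  # assumed checkpoint path
         .start())
query.awaitTermination()

The Parquet output directory can then be exposed as an external table and queried with the HiveQL inference code.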



#StackBounty: #apache-spark #spark-streaming #spark-streaming-kafka Dispatch and initiate Spark Jobs on Kafka message

Bounty: 50

I have an external data source which sends data through Kafka.

In fact, the messages do not contain the actual data, but links to the data:

"type": "job_type_1"
"urls": [
  "://some_file"
  "://some_file"
]

There is a single topic, but each message contains a type field, based on which I need to execute one of several jobs.

The data is not continuous, but more like jobs: each message contains a set of data which should be processed in a single batch. The next message is independent. All messages of the same type should be processed synchronously.

Options:

  1. Use Spark Streaming.

    It does not look like this is an appropriate solution for my scenario, and there is no built-in ability to treat the value not as data, but as a list of paths.

  2. Create an intermediate service which will dispatch requests and start a concrete job. In this case, what is the best approach to pass 20 KB+ of data to the job, as spark-submit may not accept that much as an argument?

  3. Create a long-running Spark app which will contain a plain Kafka consumer, and on each message it will create a Spark session and execute the job (a minimal sketch of this option follows after the list).

I am not sure whether this will work properly, how to stop it, etc.

  4. ???
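For reference, a minimal sketch of option 3, assuming kafka-python for the consumer; the topic name, broker address, job function and output path are hypothetical, and a single long-lived SparkSession is reused rather than created per message.

import json
from kafka import KafkaConsumer
from pyspark.sql import SparkSession

# One long-lived session; each Kafka message triggers a batch job on it.
spark = SparkSession.builder.appName("kafka-job-dispatcher").getOrCreate()

def job_type_1(urls):
    # Hypothetical batch job: read the referenced files and process them.
    df = spark.read.text(urls)
    df.write.mode("append").parquet("/tmp/job_type_1_output")  # assumed output path

JOBS = {"job_type_1": job_type_1}

consumer = KafkaConsumer(
    "jobs-topic",                     # assumed topic name
    bootstrap_servers="broker:9092",  # assumed broker address
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    enable_auto_commit=False,
)

for msg in consumer:
    payload = msg.value
    JOBS[payload["type"]](payload["urls"])
    consumer.commit()                 # commit only after the batch has succeeded

Stopping cleanly is then mostly a matter of closing the consumer (e.g. on a signal), letting the current batch finish, and calling spark.stop().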



#StackBounty: #apache-spark #pyspark #spark-streaming pyspark streaming how to set ConnectionPool

Bounty: 50

I have a task: I want to read data from Kafka, process it with Spark Streaming, and send the data to HBase.

In the official Spark documentation, I found:

def sendPartition(iter):
    # ConnectionPool is a static, lazily initialized pool of connections
    connection = ConnectionPool.getConnection()
    for record in iter:
        connection.send(record)
    # return to the pool for future reuse
    ConnectionPool.returnConnection(connection)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

But I couldn’t find any clue on how to set up a ConnectionPool for HBase using PySpark.

I also don’t understand how the streaming works here.
In the code there is foreachPartition; I want to be clear whether those partitions are on the same Spark container or not.

Are all variables in the closure reset for each partition of each RDD?

Is there a way I can set a variable at the worker level?

Is globals() at the worker level, or at the cluster level?
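For reference, a minimal sketch of one common PySpark pattern: a module-level, lazily initialized connection that each Python worker process creates once and then reuses across partitions and batches. It uses happybase as the HBase client; the host, table name and column layout are assumptions, and dstream is the DStream from the snippet above.

import happybase  # assumed HBase client library

_connection = None  # one connection per Python worker process, reused across batches

def get_connection():
    global _connection
    if _connection is None:
        _connection = happybase.Connection("hbase-host")  # assumed HBase host
    return _connection

def send_partition(records):
    table = get_connection().table("my_table")  # assumed table name
    for key, value in records:
        table.put(key, {b"cf:value": value})    # assumed row key / column layout

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition))

Each executor runs one or more Python worker processes, so this gives roughly one connection per worker rather than a single cluster-wide pool; module-level state like _connection (or globals()) lives at that worker-process level, not at the cluster level.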



#StackBounty: #scala #amazon-web-services #apache-spark #hadoop #amazon-s3 Can't connect from Spark to S3 – AmazonS3Exception Statu…

Bounty: 50

I am trying to connect from Spark (running on my PC) to my S3 bucket:

val spark = SparkSession
  .builder
  .appName("S3Client")
  .config("spark.master", "local")
  .getOrCreate()

val sc = spark.sparkContext
sc.hadoopConfiguration.set("fs.s3a.access.key", ACCESS_KEY)
sc.hadoopConfiguration.set("fs.s3a.secret.key", SECRET_KEY)

val txtFile = sc.textFile("s3a://bucket-name/folder/file.txt")
val contents = txtFile.collect()

But I am getting the following exception:

Exception in thread “main”
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400,
AWS Service: Amazon S3, AWS Request ID: 07A7BDC9135BCC84, AWS Error
Code: null, AWS Error Message: Bad Request, S3 Extended Request ID:
6ly2vhZ2mAJdQl5UZ/QUdilFFN1hKhRzirw6h441oosGz+PLIvLW2fXsZ9xmd8cuBrNHCdh8UPE=

I have seen this question but it didn’t help me.

Edit:

As Zack suggested, I added:

sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")

But I still get the same exception.
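For reference, a PySpark sketch of the equivalent S3A configuration (the question's code is Scala, but the Hadoop configuration keys are the same). eu-central-1 only accepts Signature Version 4, so with older hadoop-aws/aws-sdk versions the regional endpoint typically needs to be paired with the enableV4 JVM flag; the bucket path and credentials below are placeholders.

from pyspark.sql import SparkSession

ACCESS_KEY = "..."  # placeholder, as in the Scala snippet
SECRET_KEY = "..."  # placeholder, as in the Scala snippet

spark = (SparkSession.builder
         .appName("S3Client")
         .config("spark.master", "local")
         # Older AWS SDKs need this flag for Signature V4; in practice it is usually
         # passed via spark-submit --conf, since it must be set before the JVM starts.
         .config("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
         .config("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
         .getOrCreate())

hconf = spark.sparkContext._jsc.hadoopConfiguration()
hconf.set("fs.s3a.access.key", ACCESS_KEY)
hconf.set("fs.s3a.secret.key", SECRET_KEY)
hconf.set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")

contents = spark.sparkContext.textFile("s3a://bucket-name/folder/file.txt").collect()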



#StackBounty: #apache-spark #apache-spark-sql #bigdata #spark-streaming Best approach to check if Spark streaming jobs are hanging

Bounty: 400

I have a Spark Streaming application which basically gets a trigger message from Kafka, which kick-starts a batch processing job that could potentially take up to 2 hours.

There were incidents where some of the jobs were hanging indefinitely and didn’t get completed within the usual time, and currently there is no way we can figure out the job status without checking the Spark UI manually. I want a way to tell whether the currently running Spark jobs are hanging or not. So basically, if a job has been hanging for more than 30 minutes, I want to notify the users so they can take action. What options do I have?

I see I can use metrics from the driver and executors. If I were to choose the most important one, it would be the last received batch’s record count. When StreamingMetrics.streaming.lastReceivedBatch_records == 0, it probably means the Spark Streaming job has been stopped or has failed.

But in my scenario I will receive only one streaming trigger event, and then it will kick-start the processing, which may take up to 2 hours, so I won’t be able to rely on the records received.

Is there a better way? TIA
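For reference, a minimal sketch of one external-watchdog approach, assuming the driver's monitoring REST API is reachable and "hanging" is approximated as "a job has stayed active for more than 30 minutes"; the driver address and the alerting step are placeholders.

import time
import requests

SPARK_UI = "http://driver-host:4040"  # assumed driver UI / REST API address
THRESHOLD_SECONDS = 30 * 60
first_seen = {}                       # jobId -> time we first saw it running

def check_app(app_id):
    jobs = requests.get(f"{SPARK_UI}/api/v1/applications/{app_id}/jobs",
                        params={"status": "running"}).json()
    now = time.time()
    for job in jobs:
        started = first_seen.setdefault(job["jobId"], now)
        if now - started > THRESHOLD_SECONDS:
            print(f"Job {job['jobId']} has been running for over 30 minutes, notify users")  # placeholder alert

while True:
    for app in requests.get(f"{SPARK_UI}/api/v1/applications").json():
        check_app(app["id"])
    time.sleep(60)

The same idea can be implemented with a SparkListener registered inside the application (tracking onJobStart/onJobEnd timestamps), which avoids polling but dies with the driver if the whole application hangs.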



#StackBounty: #scala #apache-spark #user-defined-functions How to add null columns to complex array struct in Spark with a udf

Bounty: 50

I am trying to add null columns to an embedded array[struct] column; this way I will be able to transform a similarly complex column:

  case class Additional(id: String, item_value: String)
  case class Element(income:String,currency:String,additional: Additional)
  case class Additional2(id: String, item_value: String, extra2: String)
  case class Element2(income:String,currency:String,additional: Additional2)

  val  my_uDF = fx.udf((data: Seq[Element]) => {
    data.map(x=>new Element2(x.income,x.currency,new Additional2(x.additional.id,x.additional.item_value,null))).seq
  })
  sparkSession.sqlContext.udf.register("transformElements",my_uDF)
  val result=sparkSession.sqlContext.sql("select transformElements(myElements),line_number,country,idate from entity where line_number='1'")

The goal is to add an extra field called extra2 to Element.Additional; for this reason I map this field with a UDF, but it fails with:

org.apache.spark.SparkException: Failed to execute user defined function(anonfun$1: (array<struct<income:string,currency:string,additional:struct<id:string,item_value:string>>>) => array<struct<income:string,currency:string,additional:struct<id:string,item_value:string,extra2:string>>>)

If I print the schema, the ‘myElements’ field shows:

 |-- myElements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)

And I am trying to convert into this schema:

 |-- myElements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)
 |    |    |    |-- extra2: string (nullable = true)
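For reference, a sketch of a UDF-free alternative, assuming Spark 2.4+ where the transform higher-order function is available; it rebuilds each array element with an extra2 field cast to a null string. It is written against the PySpark API here, but the same SQL expression can be used from Scala; entity stands for the DataFrame behind the entity table in the query above.

from pyspark.sql.functions import expr

result = entity.where("line_number = '1'").select(
    expr("""
      transform(myElements, e -> named_struct(
        'income',   e.income,
        'currency', e.currency,
        'additional', named_struct(
          'id',         e.additional.id,
          'item_value', e.additional.item_value,
          'extra2',     cast(null as string))))
    """).alias("myElements"),
    "line_number", "country", "idate")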



#StackBounty: #apache-spark #yarn Does reducing the number of executor-cores consume less executor-memory?

Bounty: 50

My Spark job failed with the YARN error: “Container killed by YARN for exceeding memory limits. 10.0 GB of 10 GB physical memory used”.

Intuitively, I decreased the number of cores from 5 to 1 and the job ran successfully.

I did not increase the executor-memory because 10g was the max for my YARN cluster.

I just wanted to confirm my intuition: does reducing the number of executor-cores consume less executor-memory? If so, why?



#StackBounty: #amazon-web-services #apache-spark #pyspark #aws-glue AWS Glue pushdown predicate not working properly

Bounty: 500

I’m trying to optimize my Glue/PySpark job by using push down predicates.

from datetime import date          # imports needed by this snippet
from pyspark.sql import Row

start = date(2019, 2, 13)
end = date(2019, 2, 27)
print(">>> Generate data frame for ", start, " to ", end, "... ")
relaventDatesDf = spark.createDataFrame([
    Row(start=start, stop=end)
])
relaventDatesDf.createOrReplaceTempView("relaventDates")

relaventDatesDf = spark.sql("SELECT explode(generate_date_series(start, stop)) AS querydatetime FROM relaventDates")
relaventDatesDf.createOrReplaceTempView("relaventDates")
print("===LOG:Dates===")
relaventDatesDf.show()

# today, arr and generate_date_series are defined elsewhere in the job
flightsGDF = glueContext.create_dynamic_frame.from_catalog(database = "xxx", table_name = "flights", transformation_ctx="flights", push_down_predicate="""
    querydatetime BETWEEN '%s' AND '%s'
    AND querydestinationplace IN (%s)
""" % (start.strftime("%Y-%m-%d"), today.strftime("%Y-%m-%d"), ",".join(map(lambda s: str(s), arr))))

However, it appears that Glue still attempts to read data outside the specified date range:

INFO S3NativeFileSystem: Opening 's3://.../flights/querydestinationplace=12191/querydatetime=2019-03-01/part-00045-6cdebbb1-562c-43fa-915d-93b125aeee61.c000.snappy.parquet' for reading
INFO FileScanRDD: Reading File path: s3://.../flights/querydestinationplace=12191/querydatetime=2019-03-10/part-00021-34a13146-8fb2-43de-9df2-d8925cbe472d.c000.snappy.parquet, range: 0-11797922, partition values: [12191,17965]
WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
INFO S3NativeFileSystem: Opening 's3://.../flights/querydestinationplace=12191/querydatetime=2019-03-10/part-00021-34a13146-8fb2-43de-9df2-d8925cbe472d.c000.snappy.parquet' for reading
WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.

Notice that querydatetime=2019-03-01 and querydatetime=2019-03-10 are outside the specified range of 2019-02-13 to 2019-02-27. Is that why the next line says “aborting HTTP connection”? It goes on to say “This is likely an error and may result in sub-optimal behavior”; is something wrong?

I wonder if the problem is that BETWEEN or IN is not supported inside the predicate?
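For reference, a sketch of one variation worth testing (an assumption, not a confirmed Glue limitation): spell the predicate out with >= / <= instead of BETWEEN, and take the upper bound from end rather than today (in the snippet above the upper bound is today.strftime(...), which would extend the range past 2019-02-27).

predicate = (
    "querydatetime >= '%s' AND querydatetime <= '%s' "
    "AND querydestinationplace IN (%s)"
) % (start.strftime("%Y-%m-%d"),
     end.strftime("%Y-%m-%d"),
     ",".join(str(s) for s in arr))

flightsGDF = glueContext.create_dynamic_frame.from_catalog(
    database="xxx",
    table_name="flights",
    transformation_ctx="flights",
    push_down_predicate=predicate)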



#StackBounty: #java #apache-spark #databricks How to detect Databricks environment programmatically

Bounty: 50

I’m writing a spark job that needs to be runnable locally as well as on Databricks.

The code has to be slightly different in each environment (file paths), so I’m trying to find a way to detect whether the job is running on Databricks. The best way I have found so far is to look for a “dbfs” directory in the root dir and, if it’s there, assume it’s running on Databricks. This doesn’t feel like the right solution. Does anyone have a better idea?
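For reference, a small sketch of the heuristic described above, plus an environment-variable check that the Databricks Runtime is generally understood to expose (treat the variable name as an assumption and verify it on your cluster); the question is tagged Java, but the idea translates directly.

import os

def running_on_databricks() -> bool:
    # Heuristic from the question: a /dbfs mount exists at the root on Databricks.
    if os.path.isdir("/dbfs"):
        return True
    # Assumption: Databricks Runtime sets this environment variable; verify on your cluster.
    return "DATABRICKS_RUNTIME_VERSION" in os.environ

base_path = "dbfs:/data" if running_on_databricks() else "file:/local/data"  # hypothetical paths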



#StackBounty: #python #apache-spark #pickle #dill Using python lime as a udf on spark

Bounty: 50

I’m looking to use lime’s explainer within a UDF on PySpark. I’ve previously trained the tabular explainer and stored it as a dill model, as suggested in the link:

loaded_explainer = dill.load(open('location_to_explainer','rb'))

def lime_explainer(*cols):
    selected_cols = np.array([value for value in cols])
    exp = loaded_explainer.explain_instance(selected_cols, loaded_model.predict_proba, num_features = 10)
    mapping = exp.as_map()[1]

    return str(mapping)

This however takes a lot of time, as it appears a lot of the computation happens on the driver. I’ve then been trying to use spark broadcast to broadcast the explainer to the executors.

broadcasted_explainer= sc.broadcast(loaded_explainer)

def lime_explainer(*cols):
    selected_cols = np.array([value for value in cols])
    exp = broadcasted_explainer.value.explain_instance(selected_cols, loaded_model.predict_proba, num_features = 10)
    mapping = exp.as_map()[1]

    return str(mapping)        

However, I run into a pickling error on broadcast:

PicklingError: Can’t pickle at 0x7f69fd5680d0>:
attribute lookup on lime.discretize failed

Can anybody help with this? Is there something like dill that we can use instead of the cloudpickle used in Spark?
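For reference, a minimal sketch of one workaround to try (an assumption, not a verified fix): broadcast the raw dill bytes instead of the live explainer object, and deserialize them lazily once per Python worker, so Spark's cloudpickle never has to serialize the lime objects itself. sc and loaded_model are taken from the question's code, and loaded_model would need to be picklable or handled the same way.

import dill
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

with open('location_to_explainer', 'rb') as f:
    explainer_bytes = f.read()               # keep it serialized; plain bytes broadcast cleanly

broadcasted_bytes = sc.broadcast(explainer_bytes)
_explainer = None                             # one deserialized copy per Python worker

def get_explainer():
    global _explainer
    if _explainer is None:
        _explainer = dill.loads(broadcasted_bytes.value)
    return _explainer

@udf(returnType=StringType())
def lime_explainer(*cols):
    selected_cols = np.array([value for value in cols])
    exp = get_explainer().explain_instance(selected_cols,
                                           loaded_model.predict_proba,
                                           num_features=10)
    return str(exp.as_map()[1])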

