#StackBounty: #python #apache-spark #elasticsearch #pyspark Unable to connect to elasticsearch from pyspark but able make it from Hive

Bounty: 50

I’m using the snippet below to connect and load data from Hive into Elasticsearch (v6.2) without any issues:

ADD JAR file:///<>/elasticsearch-hadoop-hive-6.2.2.jar;
ADD FILE file:///<>/mycerts.jks;

CREATE EXTERNAL TABLE if not exists my_db.my_es_table
(
col1 int,
col2 string,
col3 string,
col4 timestamp,
key_id string
)
COMMENT 'data into ES'
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'index1/type1',
'es.index.auto.create'='true',
'es.nodes'='<vip_name>',
'es.port'='9200',
'es.net.http.auth.user'='<user>',
'es.net.http.auth.pass'='pwd',
'es.net.ssl.protocol'='SSL',
'es.net.ssl'='TRUE',
'es.net.ssl.truststore.location'='mycerts.jks',
'es.net.ssl.truststore.pass'='<pwd>',
'es.mapping.id'='key_id'
);

INSERT OVERWRITE TABLE my_db.my_es_table
SELECT
col1,
col2,
col3,
col4,
key_id
FROM my_db.stagging_data;

But when I try to migrate the same piece to PySpark, it throws exceptions:

   org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Expected to find keystore file at [file:///<path>/mycerts.jks] but was unable to. Make sure that it is available on the classpath, or if not, that you have specified a valid URI

Below is the code snippet I have tried for Spark:

df_delta = sqlContext.table('my_db.stagging_data')
status = df_delta.rdd.map(lambda row: (None, row.asDict())).saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        'es.resource': 'index1/type1',
        'es.index.auto.create': 'true',
        'es.nodes': '<vip_name>',
        'es.port': '9200',
        'es.net.http.auth.user': '<user>',
        'es.net.http.auth.pass': '<pwd>',
        'es.net.ssl': 'true',
        'es.net.ssl.truststore.location': 'file:///<path>/mycerts.jks',
        'es.net.ssl.truststore.pass': '<pwd>',
        'es.mapping.id': 'key_id'
    }
)

I’m launching the shell using the command below:

pyspark --jars <path>/elasticsearch-spark-20_2.11-6.2.2.jar --py-files <path>/mycerts.jks
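
As a hedged aside while debugging: spark-submit’s --py-files flag is documented for Python dependencies (.py, .zip, .egg), so the JKS may never reach the executors. The --files flag, which mirrors the Hive session’s ADD FILE, places arbitrary files in each executor’s working directory, where they can be referenced by bare name:

pyspark --jars <path>/elasticsearch-spark-20_2.11-6.2.2.jar --files <path>/mycerts.jks

with 'es.net.ssl.truststore.location' then set to just 'mycerts.jks', as in the Hive table properties. This is one thing to try, not a confirmed fix.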

Below I’m adding the entire log:

Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Expected to find keystore file at [file:///<path>/mycerts.jks] but was unable to. Make sure that it is available on the classpath, or if not, that you have specified a valid URI.
        at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.loadKeyStore(SSLSocketFactory.java:193)
        at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.loadTrustManagers(SSLSocketFactory.java:224)
        at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.createSSLContext(SSLSocketFactory.java:171)
        ... 31 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1609)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1597)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1596)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1596)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1830)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1779)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1768)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
        at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:78)
        ... 26 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:155)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:83)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalStateException: Cannot initialize SSL - Expected to find keystore file at [file:///<path>/mycerts.jks] but was unable to. Make sure that it is available on the classpath, or if not, that you have specified a valid URI.
        at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.createSSLContext(SSLSocketFactory.java:173)
        at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.getSSLContext(SSLSocketFactory.java:158)
        at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.createSocket(SSLSocketFactory.java:127)
        at org.apache.commons.httpclient.HttpConnection.open(HttpConnection.java:707)
        at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:387)
        at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
        at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
        at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
        at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.execute(CommonsHttpTransport.java:478)
        at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:112)
        at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:380)
        at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:344)
        at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:348)
        at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:158)
        at org.elasticsearch.hadoop.rest.RestClient.getHttpNodes(RestClient.java:115)
        at org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:92)
        at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:579)
        at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.init(EsOutputFormat.java:173)
        at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.write(EsOutputFormat.java:149)
        at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.write(SparkHadoopWriter.scala:356)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:130)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:127)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1413)
        at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:139)
        ... 8 more
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Expected to find keystore file at [file:///<path>/mycerts.jks] but was unable to. Make sure that it is available on the classpath, or if not, that you have specified a valid URI.
        at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.loadKeyStore(SSLSocketFactory.java:193)
        at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.loadTrustManagers(SSLSocketFactory.java:224)
        at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.createSSLContext(SSLSocketFactory.java:171)
        ... 31 more

I’m able to read and print the JKS file after connecting to PySpark, but I am unable to resolve this issue. Could someone please advise?


Get this bounty!!!

#StackBounty: #apache-spark #pyspark #out-of-memory #databricks #azure-databricks PySpark dataframe operation causes OutOfMemoryError

Bounty: 150

I’m just starting to experiment with PySpark/Spark and have run into the issue that my code is not working. I cannot find the cause, and Spark’s error output is not very helpful. I do find roughly the same questions on Stack Overflow, but none with a clear answer or solution (at least not for me).

The code I’m trying to run is:

import json
from datetime import datetime, timedelta

from pyspark.sql.session import SparkSession

from parse.data_reader import read_csv
from parse.interpolate import insert_time_range, create_time_range, linear_interpolate

spark = SparkSession.builder.getOrCreate()

df = None
with open('config/data_sources.json') as sources_file:
    sources = json.load(sources_file)
    for file in sources['files']:
        with open('config/mappings/{}.json'.format(file['mapping'])) as mapping:
            df_to_append = read_csv(
                spark=spark,
                file='{}{}'.format(sources['root_path'], file['name']),
                config=json.load(mapping)
            )

        if df is None:
            df = df_to_append
        else:
            df = df.union(df_to_append)

df.sort(["Timestamp", "Variable"]).show(n=5, truncate=False)

time_range = create_time_range(
    datetime(year=2019, month=7, day=1, hour=0),
    datetime(year=2019, month=7, day=8, hour=0),
    timedelta(seconds=3600)
)

df_with_intervals = insert_time_range(
    df=df,
    timestamp_column_name='Timestamp',
    variable_column_name='Variable',
    value_column_name='Value',
    time_range=time_range,
)
df_with_intervals.sort(["Timestamp", "Variable"]).show(n=5, truncate=False)

Which gives the following output:

C:\Users\mmun01\PycharmProjects\xxxx\venv\Scripts\python.exe C:/Users/mmun01/PycharmProjects/xxxx/application.py
19/09/04 13:31:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/09/04 13:31:36 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
[Stage 4:=======================>                                   (2 + 3) / 5]19/09/04 13:31:52 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
View job details at https://xxxxxx.azuredatabricks.net/?o=xxxxxx#/setting/clusters/xxxxxx/sparkUi
[Stage 5:===========>                                               (1 + 4) / 5]+-----------------------+------------+-----+
|Timestamp              |Variable    |Value|
+-----------------------+------------+-----+
|2019-07-01 00:00:06.664|Load % PS DG|0.0  |
|2019-07-01 00:00:06.664|Load % SB DG|0.0  |
|2019-07-01 00:00:06.664|Power PS DG |null |
|2019-07-01 00:00:06.664|Power SB DG |null |
|2019-07-01 00:00:06.664|Power Shore |null |
+-----------------------+------------+-----+
only showing top 5 rows

Traceback (most recent call last):
  File "C:/Users/mmun01/PycharmProjects/xxxx/application.py", line 42, in <module>
    df_with_intervals.sort(["Timestamp", "Variable"]).show(n=5, truncate=False)
  File "C:Usersmmun01PycharmProjectsxxxxvenvlibsite-packagespysparksqldataframe.py", line 381, in show
    print(self._jdf.showString(n, int(truncate), vertical))
  File "C:Usersmmun01PycharmProjectsxxxxvenvlibsite-packagespy4jjava_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:Usersmmun01PycharmProjectsxxxxvenvlibsite-packagespysparksqlutils.py", line 63, in deco
    return f(*a, **kw)
  File "C:Usersmmun01PycharmProjectsxxxxvenvlibsite-packagespy4jprotocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o655.showString.
: java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Unknown Source)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
    at java.lang.AbstractStringBuilder.append(Unknown Source)
    at java.lang.StringBuilder.append(Unknown Source)
    at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:210)
    at com.trueaccord.scalapb.textformat.TextGenerator.maybeNewLine(TextGenerator.scala:13)
    at com.trueaccord.scalapb.textformat.TextGenerator.addNewLine(TextGenerator.scala:33)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:38)
    at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
    at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
    at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
    at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
    at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
    at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)


Process finished with exit code 1

The two functions I’m using are:

from datetime import datetime, timedelta
from typing import Iterable

from pyspark.sql import DataFrame
from pyspark.sql.functions import array, explode, lit


def create_time_range(start_time: datetime, end_time: datetime, step_size: timedelta) -> Iterable[datetime]:
    return [start_time + step_size * n for n in range(int((end_time - start_time) / step_size))]


def insert_time_range(df: DataFrame, timestamp_column_name: str, variable_column_name: str, value_column_name: str,
                      time_range: Iterable[datetime]) -> DataFrame:
    time_range = array([lit(ts) for ts in time_range])
    # One null-valued row per variable and timestamp, unioned onto the data.
    df_exploded = df \
        .drop(value_column_name) \
        .drop(timestamp_column_name) \
        .distinct() \
        .withColumn(value_column_name, lit(None)) \
        .withColumn(timestamp_column_name, explode(time_range))
    return df.union(df_exploded.select([timestamp_column_name, variable_column_name, value_column_name]))

The data_sources.json file currently contains only one CSV file, which is a couple of MB. What causes the OutOfMemoryError, or how can I get a more detailed error report?
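
For context, a minimal config/data_sources.json consistent with the keys the loader reads (root_path, files, name, mapping) might look like the following; the values are hypothetical:

{
  "root_path": "/data/",
  "files": [
    { "name": "sensors.csv", "mapping": "sensors" }
  ]
}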

As suggested by niuer, I changed the function insert_time_range to:

def insert_time_range(df: DataFrame, timestamp_column_name: str, variable_column_name: str, value_column_name: str,
                      time_range: Iterable[datetime]) -> DataFrame:
    time_range = array([lit(ts) for ts in time_range])
    df_exploded = df \
        .drop(value_column_name) \
        .drop(timestamp_column_name) \
        .distinct() \
        .withColumn(value_column_name, lit(None)) \
        .withColumn(timestamp_column_name, lit(time_range[0]))
    return df_exploded.select([timestamp_column_name, variable_column_name, value_column_name])

And before the .show() call I added a line print(df_with_intervals.count()), which outputs the number 5 (as expected). But when I try to show() the values, I still get the same OutOfMemoryError.

UPDATE
I’ve narrowed down the issue to the union, but it is still unclear why it is not working. I’ve updated the insert_time_range method according to a suggestion in the comments:

from pyspark.sql.types import DoubleType, StructField, StructType, TimestampType


def insert_time_range(df: DataFrame, timestamp_column_name: str, variable_column_name: str, value_column_name: str,
                      time_range: Iterable[datetime]) -> DataFrame:
    schema = StructType(
        [
            StructField(timestamp_column_name, TimestampType(), True),
            StructField(value_column_name, DoubleType(), True)
        ]
    )
    df_time_range = df.sql_ctx.createDataFrame(
        [(timestamp, None) for timestamp in time_range],
        schema=schema
    )
    df_time_range = df.select([variable_column_name]).distinct().crossJoin(df_time_range).select(
        [timestamp_column_name, variable_column_name, value_column_name]
    )
    df_time_range.show(n=20, truncate=False)

    return df.union(df_time_range)

which gives the following output:

C:\Users\mmun01\PycharmProjects\xxxx\venv\Scripts\python.exe C:/Users/mmun01/PycharmProjects/xxxx/application.py
19/09/09 23:00:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/09/09 23:00:30 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
[Stage 44:==================================>                       (3 + 2) / 5]19/09/09 23:00:43 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
View job details at https://westeurope.azuredatabricks.net/?o=2202252276771286#/setting/clusters/0903-124716-art213/sparkUi
[Stage 45:===========>                                              (1 + 4) / 5]+-----------------------+------------+-----+
|Timestamp              |Variable    |Value|
+-----------------------+------------+-----+
|2019-07-01 00:00:06.664|Load % PS DG|0.0  |
|2019-07-01 00:00:06.664|Load % SB DG|0.0  |
|2019-07-01 00:00:06.664|Power PS DG |null |
|2019-07-01 00:00:06.664|Power SB DG |null |
|2019-07-01 00:00:06.664|Power Shore |null |
+-----------------------+------------+-----+
only showing top 5 rows

View job details at https://westeurope.azuredatabricks.net/?o=2202252276771286#/setting/clusters/0903-124716-art213/sparkUi
+-------------------+------------+-----+
|Timestamp          |Variable    |Value|
+-------------------+------------+-----+
|2019-06-30 22:00:00|Load % PS DG|null |
|2019-06-30 22:00:00|Power PS DG |null |
|2019-06-30 22:00:00|Power Shore |null |
|2019-06-30 22:00:00|Load % SB DG|null |
|2019-06-30 22:00:00|Power SB DG |null |
|2019-06-30 22:01:00|Load % PS DG|null |
|2019-06-30 22:01:00|Power PS DG |null |
|2019-06-30 22:01:00|Power Shore |null |
|2019-06-30 22:01:00|Load % SB DG|null |
|2019-06-30 22:01:00|Power SB DG |null |
|2019-06-30 22:02:00|Load % PS DG|null |
|2019-06-30 22:02:00|Power PS DG |null |
|2019-06-30 22:02:00|Power Shore |null |
|2019-06-30 22:02:00|Load % SB DG|null |
|2019-06-30 22:02:00|Power SB DG |null |
|2019-06-30 22:03:00|Load % PS DG|null |
|2019-06-30 22:03:00|Power PS DG |null |
|2019-06-30 22:03:00|Power Shore |null |
|2019-06-30 22:03:00|Load % SB DG|null |
|2019-06-30 22:03:00|Power SB DG |null |
+-------------------+------------+-----+
only showing top 20 rows

Traceback (most recent call last):
  File "C:/Users/mmun01/PycharmProjects/xxxx/application.py", line 46, in <module>
    df_with_intervals.sort([timestamp_column_name, variable_column_name]).show(n=5, truncate=False)
  File "C:Usersmmun01PycharmProjectsxxxxvenvlibsite-packagespysparksqldataframe.py", line 381, in show
    print(self._jdf.showString(n, int(truncate), vertical))
  File "C:Usersmmun01PycharmProjectsxxxxvenvlibsite-packagespy4jjava_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:Usersmmun01PycharmProjectsxxxxvenvlibsite-packagespysparksqlutils.py", line 63, in deco
    return f(*a, **kw)
  File "C:Usersmmun01PycharmProjectsxxxxvenvlibsite-packagespy4jprotocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o333.showString.
: java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Unknown Source)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
    at java.lang.AbstractStringBuilder.append(Unknown Source)
    at java.lang.StringBuilder.append(Unknown Source)
    at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:210)
    at com.trueaccord.scalapb.textformat.TextGenerator.maybeNewLine(TextGenerator.scala:13)
    at com.trueaccord.scalapb.textformat.TextGenerator.add(TextGenerator.scala:19)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:33)
    at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
    at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
    at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
    at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
    at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
    at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)


Process finished with exit code 1

So the issue must be in the union method, but I have no clue what it is.

UPDATE In my first attempts I had only one CSV file in config/data_sources.json, so the df = df.union(df_to_append) line was never executed. Now I’ve added multiple CSV files to config/data_sources.json, so the union method is executed, and again I get the py4j.protocol.Py4JJavaError: An error occurred while calling o2043.showString.
java.lang.OutOfMemoryError: Java heap space error; it already happens with the first union. What am I doing wrong with this method, or is there a bug in the method itself?

Get this bounty!!!

#StackBounty: #python #apache-spark #spark-streaming Read output output of a script with spark streaming

Bounty: 50

I have a script similar to this one

import json 
def line_generator():
    d = dict({1:1, 2:2, 3:3})
    while True:
        yield json.dumps(d)

it = line_generator()
for l in it:
    print(l)

which outputs values to stdout. I would like to “catch” those values with the Spark Streaming API, store them in Parquet files, and apply some inference code which is written in HiveQL. I am not a Scala person :/ so, if possible, I would prefer a solution in PySpark, but I am happy with any piece of advice.

I know it is possible to read a stream of data coming from Kafka, for example; is there a similar way to read data sent to stdout, or data that is continuously written to a file?
Thanks in advance for your help.
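
Spark has no built-in stdout source, but one hedged alternative that fits this setup is to redirect the generator’s output into files under a directory and let a file-based streaming source watch it. Below is a minimal PySpark sketch of that route with Structured Streaming, writing Parquet as asked; the paths and schema are assumptions, not part of the original script:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("stdout-to-parquet").getOrCreate()

# Assumption: the generator's stdout is redirected into new files under
# /tmp/feed, e.g. `python generator.py | split -l 1000 - /tmp/feed/part-`.
# File sources need an explicit schema; json.dumps turns the dict keys
# 1, 2, 3 into the strings "1", "2", "3".
stream = (
    spark.readStream
    .schema("`1` INT, `2` INT, `3` INT")
    .json("/tmp/feed")
)

# Write each micro-batch to Parquet; the checkpoint directory tracks
# which input files have already been processed.
query = (
    stream.writeStream
    .format("parquet")
    .option("path", "/tmp/parquet_out")
    .option("checkpointLocation", "/tmp/checkpoints")
    .start()
)
query.awaitTermination()

The resulting Parquet files could then be registered as a table for the HiveQL inference step.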


Get this bounty!!!

#StackBounty: #apache-spark #spark-streaming #spark-streaming-kafka Dispatch and initiate Spark Jobs on Kafka message

Bounty: 50

I have an external data source which sends data through Kafka.

In fact, this is not the real data, but links to the data.

"type": "job_type_1"
"urls": [
  "://some_file"
  "://some_file"
]

There is a single topic, but each message contains a type field, based on which I need to execute one of the jobs.

The data is not continuous, but more like jobs: each message contains a set of data which should be processed in a single batch. The next message is independent. All jobs of the same type should be processed synchronously.

Options:

  1. Use Spark Streaming.

    It does not look like this is an appropriate solution for my scenario, and there is no built-in ability to treat the value not as data but as a list of paths.

  2. Create an intermediate service which will dispatch requests and start a concrete job. In this case, what is the best approach to pass 20 KB+ of data to the job, given that spark-submit may not accept that much as an argument?

  3. Create a long-running Spark app which will contain a plain Kafka consumer and, on each message, will create a Spark session and execute the job (see the sketch after this list).

I am not sure whether this will work properly, how to stop it, etc.

  4. ???
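
For option 3, here is a minimal sketch of the dispatch loop. It assumes the kafka-python package; the topic name, bootstrap server, and the run_job_type_1 handler are all hypothetical placeholders:

import json

from kafka import KafkaConsumer  # assumption: kafka-python package
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-dispatcher").getOrCreate()

def run_job_type_1(urls):
    # Placeholder for the real batch processing of the linked files.
    spark.read.text(urls).count()

HANDLERS = {"job_type_1": run_job_type_1}

consumer = KafkaConsumer(
    "jobs-topic",  # hypothetical topic name
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

# Messages are handled one at a time, so jobs arriving on the topic are
# processed synchronously, as required.
for message in consumer:
    job = message.value
    HANDLERS[job["type"]](job["urls"])

Reusing one long-lived SparkSession avoids per-message session startup; stopping cleanly would still need a shutdown signal of some kind, which this sketch leaves open.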


Get this bounty!!!

#StackBounty: #apache-spark #pyspark #spark-streaming pyspark streaming how to set ConnectionPool

Bounty: 50

I have a task: I want to read data from Kafka, process it with Spark Streaming, and send the data to HBase.

In the official Spark documentation, I found:

def sendPartition(iter):
    # ConnectionPool is a static, lazily initialized pool of connections
    connection = ConnectionPool.getConnection()
    for record in iter:
        connection.send(record)
    # return to the pool for future reuse
    ConnectionPool.returnConnection(connection)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

But I couldn’t find any clue on how to set up a ConnectionPool for HBase using PySpark.
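
For what it’s worth, a common PySpark substitute for the static pool in the scaffold above is a module-level, lazily initialized connection that each executor process creates once and reuses across partitions. Here is a minimal sketch assuming the happybase package, an HBase Thrift server, and hypothetical host/table/column names:

import happybase  # assumption: happybase package + HBase Thrift server

_connection = None  # created at most once per Python worker process

def get_connection():
    # Lazily open the connection the first time a task on this worker
    # needs it; later partitions scheduled on the same worker reuse it.
    global _connection
    if _connection is None:
        _connection = happybase.Connection("hbase-thrift-host")  # hypothetical
    return _connection

def send_partition(records):
    table = get_connection().table("my_table")  # hypothetical table
    for record in records:
        table.put(record["key"], {"cf:value": record["value"]})

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition))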

I also don’t understand how streaming works. The code uses foreachPartition; I want to be clear whether those partitions run in the same Spark container or not.

Are all variables in the closure reset for each partition of each RDD?

Is there a way I can set a variable at the worker level?

Is globals() at the worker level, or at the cluster level?


Get this bounty!!!

#StackBounty: #scala #amazon-web-services #apache-spark #hadoop #amazon-s3 Can't connect from Spark to S3 – AmazonS3Exception Statu…

Bounty: 50

I am trying to connect from Spark (running on my PC) to my S3 bucket:

val spark = SparkSession
  .builder
  .appName("S3Client")
  .config("spark.master", "local")
  .getOrCreate()

val sc = spark.sparkContext
sc.hadoopConfiguration.set("fs.s3a.access.key", ACCESS_KEY)
sc.hadoopConfiguration.set("fs.s3a.secret.key", SECRET_KEY)

val txtFile = sc.textFile("s3a://bucket-name/folder/file.txt")
val contents = txtFile.collect()

But getting the following exception:

Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 07A7BDC9135BCC84, AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: 6ly2vhZ2mAJdQl5UZ/QUdilFFN1hKhRzirw6h441oosGz+PLIvLW2fXsZ9xmd8cuBrNHCdh8UPE=

I have seen this question but it didn’t help me.

Edit:

As Zack suggested, I added:

sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")

But I still get the same exception.


Get this bounty!!!

#StackBounty: #apache-spark #apache-spark-sql #bigdata #spark-streaming Best approach to check if Spark streaming jobs are hanging

Bounty: 400

I have a Spark streaming application which basically gets a trigger message from Kafka, which kick-starts batch processing that could potentially take up to 2 hours.

There have been incidents where some of the jobs hung indefinitely and didn’t complete within the usual time, and currently there is no way to figure out the job status without checking the Spark UI manually. I want a way to tell whether the currently running Spark jobs are hanging or not. So basically, if a job hangs for more than 30 minutes, I want to notify the users so they can take action. What options do I have?

I see I can use metrics from the driver and executors. If I were to choose the most important one, it would be the last received batch’s records. When StreamingMetrics.streaming.lastReceivedBatch_records == 0, it probably means that the Spark streaming job has been stopped or has failed.

But in my scenario I will receive only one streaming trigger event, which then kick-starts processing that may take up to 2 hours, so I won’t be able to rely on the records received.

Is there a better way? TIA
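
One hedged sketch of a driver-side watchdog, independent of streaming metrics: record a heartbeat whenever work completes, and alert from a background thread when the gap exceeds a threshold. The notify callback and the places heartbeat() is called from are application-specific assumptions:

import threading
import time

class HangWatchdog:
    def __init__(self, threshold_seconds=30 * 60):
        self.threshold = threshold_seconds
        self.last_progress = time.time()

    def heartbeat(self):
        # Call from batch/stage completion hooks or between processing steps.
        self.last_progress = time.time()

    def start(self, notify):
        def loop():
            while True:
                time.sleep(60)
                stalled = time.time() - self.last_progress
                if stalled > self.threshold:
                    notify("No progress for %d seconds" % stalled)
        threading.Thread(target=loop, daemon=True).start()

Because the 2-hour job emits no batch records, the heartbeat would have to come from checkpoints inside the processing itself rather than from lastReceivedBatch_records.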


Get this bounty!!!

#StackBounty: #scala #apache-spark #user-defined-functions How to add null columns to complex array struct in Spark with a udf

Bounty: 50

I am trying to add null columns to an embedded array[struct] column; this way I will be able to transform a similarly complex column:

  case class Additional(id: String, item_value: String)
  case class Element(income:String,currency:String,additional: Additional)
  case class Additional2(id: String, item_value: String, extra2: String)
  case class Element2(income:String,currency:String,additional: Additional2)

  val my_uDF = fx.udf((data: Seq[Element]) => {
    data.map(x => new Element2(x.income, x.currency,
      new Additional2(x.additional.id, x.additional.item_value, null))).seq
  })
  sparkSession.sqlContext.udf.register("transformElements", my_uDF)
  val result = sparkSession.sqlContext.sql(
    "select transformElements(myElements),line_number,country,idate from entity where line_number='1'")

The goal is to add an extra field called extra2 to Element.Additional; for this reason I map this field with a UDF, but it fails because:

org.apache.spark.SparkException: Failed to execute user defined function(anonfun$1: (array<struct<income:string,currency:string,additional:struct<id:string,item_value:string>>>) => array<struct<income:string,currency:string,additional:struct<id:string,item_value:string,extra2:string>>>)

If I print the schema for the ‘myElements’ field, it shows:

 |-- myElements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)

And I am trying to convert it into this schema:

 |-- myElements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)
 |    |    |    |-- extra2: string (nullable = true)


Get this bounty!!!

#StackBounty: #apache-spark #yarn Does reducing the number of executor-cores consume less executor-memory?

Bounty: 50

My Spark job failed with the YARN error: Container killed by YARN for exceeding memory limits. 10.0 GB of 10 GB physical memory used.

Intuitively, I decreased the number of cores from 5 to 1 and the job ran successfully.

I did not increase the executor-memory because 10g was the max for my YARN cluster.

I just wanted to confirm my intuition: does reducing the number of executor-cores consume less executor-memory? If so, why?


Get this bounty!!!

#StackBounty: #amazon-web-services #apache-spark #pyspark #aws-glue AWS Glue pushdown predicate not working properly

Bounty: 500

I’m trying to optimize my Glue/PySpark job by using push-down predicates.

start = date(2019, 2, 13) 
end = date(2019, 2, 27) 
print(">>> Generate data frame for ", start, " to ", end, "... ")
relaventDatesDf = spark.createDataFrame([
    Row(start=start, stop=end)
])
relaventDatesDf.createOrReplaceTempView("relaventDates")

relaventDatesDf = spark.sql("SELECT explode(generate_date_series(start, stop)) AS querydatetime FROM relaventDates")
relaventDatesDf.createOrReplaceTempView("relaventDates")
print("===LOG:Dates===")
relaventDatesDf.show()

flightsGDF = glueContext.create_dynamic_frame.from_catalog(database = "xxx", table_name = "flights", transformation_ctx="flights", push_down_predicate="""
    querydatetime BETWEEN '%s' AND '%s'
    AND querydestinationplace IN (%s)
""" % (start.strftime("%Y-%m-%d"), today.strftime("%Y-%m-%d"), ",".join(map(lambda s: str(s), arr))))

However, it appears that Glue still attempts to read data outside the specified date range:

INFO S3NativeFileSystem: Opening 's3://.../flights/querydestinationplace=12191/querydatetime=2019-03-01/part-00045-6cdebbb1-562c-43fa-915d-93b125aeee61.c000.snappy.parquet' for reading
INFO FileScanRDD: Reading File path: s3://.../flights/querydestinationplace=12191/querydatetime=2019-03-10/part-00021-34a13146-8fb2-43de-9df2-d8925cbe472d.c000.snappy.parquet, range: 0-11797922, partition values: [12191,17965]
WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
INFO S3NativeFileSystem: Opening 's3://.../flights/querydestinationplace=12191/querydatetime=2019-03-10/part-00021-34a13146-8fb2-43de-9df2-d8925cbe472d.c000.snappy.parquet' for reading
WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.

Notice the querydatetime=2019-03-01 and querydatetime=2019-03-10; they are outside the specified range of 2019-02-13 to 2019-02-27. Is that why the next line says “aborting HTTP connection”, though? It goes on to say “This is likely an error and may result in sub-optimal behavior”; is something wrong?

I wonder if the problem is that it does not support BETWEEN or IN inside the predicate? (A rewrite without either is sketched below.)
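
For what it’s worth, here is a hedged rewrite of the predicate that avoids BETWEEN and IN entirely, using only simple comparisons and OR-ed equalities; start, today, and arr are the same variables as in the job above:

predicate = "querydatetime >= '%s' AND querydatetime <= '%s' AND (%s)" % (
    start.strftime("%Y-%m-%d"),
    today.strftime("%Y-%m-%d"),
    " OR ".join("querydestinationplace = %s" % p for p in arr),
)

flightsGDF = glueContext.create_dynamic_frame.from_catalog(
    database="xxx",
    table_name="flights",
    transformation_ctx="flights",
    push_down_predicate=predicate,
)

If this still reads the March partitions, the operators were likely not the problem.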


Get this bounty!!!