#StackBounty: #scala #apache-spark #rdd Finding Maximum in Key Value RDD

Bounty: 100

I have a key-value RDD of the form:

(Some(23661587),
CompactBuffer(Posting(2,23661643,Some(23661587),0,None), 
              Posting(2,23661682,Some(23661587),0,None)))

Here Some(23661587) is the key and the data inside the CompactBuffer is the value. For each key, I want to select the Posting with the maximum value of a particular attribute.

How can I do that? I have limited experience in Scala and Spark.
Thanks
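
A minimal sketch of one way to do this with mapValues and maxBy, assuming the CompactBuffer comes from a groupByKey and that the attribute to maximize is a numeric Posting field (called score here; the field names below are assumptions, since the Posting definition is not shown):

import org.apache.spark.rdd.RDD

// Hypothetical shape matching the data above; `score` stands for whichever
// attribute should be maximized.
case class Posting(postingType: Int, id: Int, parentId: Option[Int], score: Int, tags: Option[String])

// `grouped` stands for the RDD[(Option[Int], Iterable[Posting])] shown above.
def maxPerKey(grouped: RDD[(Option[Int], Iterable[Posting])]): RDD[(Option[Int], Posting)] =
  grouped.mapValues(postings => postings.maxBy(_.score))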


Get this bounty!!!

#StackBounty: #image #scala #jvm How to create a black and white image of arbitrary size

Bounty: 50

I want to create an arbitrarily large image consisting solely of black and white pixels. I am currently using BufferedImage. This, however, hits a hard limit at 65500 pixels:

Exception in thread "main" javax.imageio.IIOException: Maximum supported image dimension is 65500 pixels
    at java.desktop/com.sun.imageio.plugins.jpeg.JPEGImageWriter.writeImage(Native Method)
    at java.desktop/com.sun.imageio.plugins.jpeg.JPEGImageWriter.writeOnThread(JPEGImageWriter.java:1007)
    at java.desktop/com.sun.imageio.plugins.jpeg.JPEGImageWriter.write(JPEGImageWriter.java:371)
    at java.desktop/javax.imageio.ImageWriter.write(ImageWriter.java:613)
    at java.desktop/javax.imageio.ImageIO.doWrite(ImageIO.java:1628)
    at java.desktop/javax.imageio.ImageIO.write(ImageIO.java:1554)

How can I create an image of arbitrary size?

Bonus points: since the image is only black and white, an efficient data format would be great.
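
For what it's worth, the 65500-pixel cap in the stack trace comes from the JPEG writer plugin. A sketch of one alternative, assuming PNG output is acceptable: a 1-bit TYPE_BYTE_BINARY image (one bit per pixel in memory) written as PNG, which is not subject to that particular JPEG limit and compresses black-and-white content well. The dimensions below are illustrative and heap usage still needs to be considered:

import java.awt.image.BufferedImage
import java.io.File
import javax.imageio.ImageIO

val width  = 70000   // illustrative; beyond the JPEG writer's 65500-pixel limit
val height = 70000
// TYPE_BYTE_BINARY packs 1 bit per pixel, so the raster is roughly width * height / 8 bytes.
val img = new BufferedImage(width, height, BufferedImage.TYPE_BYTE_BINARY)

// Set pixels: 0xFFFFFF for white, 0x000000 for black.
img.setRGB(0, 0, 0xFFFFFF)

// Write as PNG (lossless) instead of JPEG.
ImageIO.write(img, "png", new File("out.png"))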


Get this bounty!!!

#StackBounty: #scala Mobile number validation issue with google i18 package

Bounty: 50

I am using Google's i18n phone number package (libphonenumber):

package com.google.i18n.phonenumbers;

private val mobileRule = genValidation("mobile", _.notificationChannels.find(_.channel == "mobile").forall { channel =>
  channel.channelId.exists(number => !numberUtil.isValidNumber(numberUtil.parse(number, "GB")))
}, "the mobile number entered is not valid or has not been entered")

private val landlineRule = genValidation("landline", _.notificationChannels.find(_.channel == "landline").exists { channel =>
  channel.channelId.exists(number => !numberUtil.isValidNumber(numberUtil.parse(number, "GB")))
}, "the landline number entered is not valid")

With this i18n package, if I pass a 17-digit mobile number it gets past the validation and gives me a 500 error. Can somebody help me so that passing a number longer than 10 digits produces an error such as "the mobile number is incorrect"?

With the above code, if I pass fewer than 10 digits I do get the "mobile number is incorrect" error, which is fine, but if I pass more than 10 digits I get a 500 error instead.

Please help if anyone has an idea.
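
For reference, PhoneNumberUtil.parse throws a NumberParseException (for example with error type TOO_LONG) when a number cannot be parsed at all, and an uncaught exception would surface as a 500. A sketch of one way to turn that into a plain validation failure, assuming numberUtil is a PhoneNumberUtil instance as in the code above:

import scala.util.Try

// Returns false both when parsing fails (e.g. TOO_LONG) and when the parsed
// number is not a valid GB number, so the rule can report "mobile number is
// incorrect" instead of letting the exception bubble up as a 500.
def isValidGbNumber(number: String): Boolean =
  Try(numberUtil.parse(number, "GB")).toOption.exists(numberUtil.isValidNumber)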


Get this bounty!!!

#StackBounty: #sql #regex #scala #join #apache-spark Spark Scala: SQL rlike vs Custom UDF

Bounty: 50

I have a scenario where 10K+ regular expressions are stored in a table along with various other columns, and this table needs to be joined against an incoming dataset. Initially I was using the Spark SQL rlike method as below, and it was able to hold the load as long as the incoming record count stayed below 50K.

PS: The regular expression reference data is a broadcasted dataset.

dataset.join(regexDataset.value, expr("input_column rlike regular_exp_column"))

Then I wrote a custom UDF to transform them using Scala native regex search as below,

The val below collects the reference data as an Array of tuples:
val regexPreCalcArray: Array[(Int, Regex)] = {
  regexDataset.value
    .select("col_1", "regex_column")
    .collect
    .map(row => (row.get(0).asInstanceOf[Int], row.get(1).toString.r))
}

Implementation of the regex-matching UDF:

def findMatchingPatterns(regexDSArray: Array[(Int, Regex)]): UserDefinedFunction = {
  udf((input_column: String) => {
    for {
      text <- Option(input_column)
      matches = regexDSArray.filter(_._2.findFirstIn(text).isDefined)
      if matches.nonEmpty
    } yield matches.map(_._1).min
  }, IntegerType)
}

The join is then done as below: the UDF returns a unique ID from the reference data (the minimum one when multiple regexes match), and this is joined back against the reference data on the unique ID to retrieve the other columns needed for the result:

dataset.withColumn("min_unique_id", findMatchingPatterns(regexPreCalcArray)($"input_column"))
.join(regexDataset.value, $"min_unique_id" === $"unique_id" , "left")

But this too gets very slow, with skew in execution (one executor task runs for a very long time), once the record count goes above 1M. The Spark documentation advises against UDFs because they can degrade performance. Are there other best practices I should apply here, or a better API for Scala regex matching than what I have written? Any suggestions for doing this efficiently would be very helpful.
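
Not a full answer, but one micro-optimization worth sketching: since only the minimum matching ID is kept, the patterns can be pre-sorted by ID and the per-row scan stopped at the first match instead of testing all 10K+ regexes for every row. Names mirror the question's code:

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.IntegerType
import scala.util.matching.Regex

// Pre-sort once on the driver so the first match found is also the minimum ID.
val sortedPatterns: Array[(Int, Regex)] = regexPreCalcArray.sortBy(_._1)

def findFirstMatchingPattern(patterns: Array[(Int, Regex)]): UserDefinedFunction =
  udf((input_column: String) => {
    // collectFirst stops at the first (lowest-ID) pattern that matches.
    Option(input_column).flatMap(text =>
      patterns.collectFirst { case (id, regex) if regex.findFirstIn(text).isDefined => id })
  }, IntegerType)

If the skew persists, it may also come from the final join when many input rows resolve to the same min_unique_id; that is worth checking in the Spark UI separately from the UDF cost.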


Get this bounty!!!

#StackBounty: #scala #apache-spark #apache-kafka #spark-structured-streaming Spark Streaming – Join on multiple kafka stream operation …

Bounty: 50

I have 3 Kafka streams with 600k+ records each, and Spark Streaming takes more than 10 minutes to process simple joins between the streams.

Spark Cluster config:

Spark Master UI

This is how I'm reading the Kafka streams into temp views in Spark (Scala):

spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "KAFKASERVER")
.option("subscribe", TOPIC1)
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest").load()
.selectExpr("CAST(value AS STRING) as json")
.select( from_json($"json", schema=SCHEMA1).as("data"))
.select($"COL1", $"COL2")
.createOrReplaceTempView("TABLE1")

I join the 3 tables using Spark SQL:

select COL1, COL2 from TABLE1   
JOIN TABLE2 ON TABLE1.PK = TABLE2.PK
JOIN TABLE3 ON TABLE2.PK = TABLE3.PK

Execution of Job:

Job UI

Am I missing some Spark configuration that I need to look into?
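
Two settings are commonly worth checking for a slow three-way join, sketched below with the table names from the question; the values are placeholders to tune, not a confirmed fix:

// Match shuffle parallelism to the cluster's total cores rather than the default of 200.
spark.conf.set("spark.sql.shuffle.partitions", "64")

// If one table is small enough to fit in executor memory, hint a broadcast join
// so it is shipped to executors instead of shuffling all three tables.
val joined = spark.sql("""
  SELECT /*+ BROADCAST(TABLE3) */ COL1, COL2
  FROM TABLE1
  JOIN TABLE2 ON TABLE1.PK = TABLE2.PK
  JOIN TABLE3 ON TABLE2.PK = TABLE3.PK
""")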


Get this bounty!!!

#StackBounty: #scala A simple data class based on an existing collection (Scala)

Bounty: 50

I’m fairly new to Scala and I would like to learn it properly. My current task is to create a type to represent a heap, as seen in interpreters: a mapping from addresses in memory to values stored there.

A heap of that kind is very much like a Map, but I would like to hide the implementation details and only expose an interface consisting of a few methods, which in pseudocode would be:

* update :: address value -> Heap
* free :: address -> Heap
* alloc :: value -> (Heap, Address)
* addresses :: () -> Set[Address]
* lookup :: address -> [Value]

Here is what I came up with:

trait Heap[T] {

  def update(address: Address, value: T): Heap[T]

  def free(address: Address): Heap[T]

  def alloc(value: T): (Heap[T], Address)

  def addresses(): Set[Address]

  def lookup(address: Address): Option[T]
}

private case class HeapImpl[T](map: Map[Address, T]) extends Heap[T] {
  override def update(address: Address, value: T): Heap[T] = HeapImpl[T](map.updated(address, value))

  override def free(address: Address): Heap[T] = HeapImpl[T](map.removed(address))

  override def alloc(value: T): (Heap[T], Address) = {
    val nextFreeAddress = addresses().maxOption.getOrElse(0) + 1
    (HeapImpl(map.updated(nextFreeAddress, value)), nextFreeAddress)
  }

  override def addresses(): Set[Address] = map.keys.toSet

  override def lookup(address: Address): Option[T] = map.get(address)
}

object Heap {
  def apply[T](): Heap[T] = HeapImpl(Map())
}
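
For reference, here is a quick usage sketch of the interface above (assuming Address is an alias for Int, which is not shown in the code):

val empty = Heap[String]()
val (h1, addr) = empty.alloc("hello")   // allocates at the next free address
val h2 = h1.update(addr, "world")       // returns a new Heap, the original is untouched
h2.lookup(addr)                         // Some("world")
h2.free(addr).lookup(addr)              // None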

I would like to know whether this is proper, idiomatic Scala or whether I should approach it differently.


Get this bounty!!!

#StackBounty: #java #scala #jmeter #jsr223 #scriptengine Scala JSR223 script using JMeter/Java context

Bounty: 50

Scala has supported JSR223 scripting since 2.11:

e.eval("""s"a is $a, s is $s"""")

I added the Scala 2.13 jars and tried to execute a script; it can display constants in the response.

But I can't use JMeter's bound variables such as log. I tried:

log.info(a);
$log.info(a);

I also can't print values to the log. I tried:

var a:Int =  10
println(a)

JMeter bindings code:

Bindings bindings = engine.createBindings();
final Logger logger = LoggerFactory.getLogger(JSR223_INIT_FILE);
bindings.put("log", logger); // $NON-NLS-1$ (this name is fixed)
engine.eval(reader, bindings);

I also tried using bindings directly, but it isn't in the context:

bindings.get("log").info("aa");

Exception

ERROR o.a.j.p.j.s.JSR223Sampler: Problem in JSR223 script JSR223 Sampler, message: javax.script.ScriptException: not found: value bindings

How can I use JMeter/Java binding variables from a Scala JSR223 script?
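
One way to narrow this down, sketched at the plain javax.script level outside JMeter (the binding name greeting below is a stand-in for JMeter's log): if the Scala engine injects bindings as script identifiers, this prints the value; if it fails with "not found: value greeting", the limitation is in the engine itself rather than in the JMeter setup.

import javax.script.{ScriptEngineManager, SimpleBindings}

// Look up the Scala JSR223 engine by its registered name.
val engine = new ScriptEngineManager().getEngineByName("scala")

val bindings = new SimpleBindings()
bindings.put("greeting", "hello")   // stand-in for JMeter's "log" binding

// Evaluates a script that references the bound name directly.
engine.eval("""println(greeting)""", bindings)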


Get this bounty!!!

#StackBounty: #scala #amazon-web-services #apache-spark #hadoop #amazon-s3 Can't connect from Spark to S3 – AmazonS3Exception Statu…

Bounty: 50

I am trying to connect from Spark (running on my PC) to my S3 bucket:

val spark = SparkSession
  .builder
  .appName("S3Client")
  .config("spark.master", "local")
  .getOrCreate()

val sc = spark.sparkContext
sc.hadoopConfiguration.set("fs.s3a.access.key", ACCESS_KEY)
sc.hadoopConfiguration.set("fs.s3a.secret.key", SECRET_KEY)
val txtFile = sc.textFile("s3a://bucket-name/folder/file.txt")
val contents = txtFile.collect()

But I get the following exception:

Exception in thread “main”
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400,
AWS Service: Amazon S3, AWS Request ID: 07A7BDC9135BCC84, AWS Error
Code: null, AWS Error Message: Bad Request, S3 Extended Request ID:
6ly2vhZ2mAJdQl5UZ/QUdilFFN1hKhRzirw6h441oosGz+PLIvLW2fXsZ9xmd8cuBrNHCdh8UPE=

I have seen this question but it didn’t help me.

Edit:

As Zack suggested, I added:

sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")

But I still get the same exception.
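
A sketch of one other commonly suggested thing to try, since eu-central-1 only accepts Signature Version 4 and a 400 Bad Request is a typical symptom of signing with V2; whether it applies depends on the Hadoop/AWS SDK versions on the classpath:

// Force the AWS SDK (v1) to sign with SigV4; must be set before the S3 client is created.
System.setProperty("com.amazonaws.services.s3.enableV4", "true")

sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")
sc.hadoopConfiguration.set("fs.s3a.access.key", ACCESS_KEY)
sc.hadoopConfiguration.set("fs.s3a.secret.key", SECRET_KEY)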


Get this bounty!!!

#StackBounty: #scala #apache-spark #user-defined-functions How to add null columns to complex array struct in Spark with a udf

Bounty: 50

I am trying to add null columns to an embedded array[struct] column; this way I will be able to transform a similarly complex column:

case class Additional(id: String, item_value: String)
case class Element(income: String, currency: String, additional: Additional)
case class Additional2(id: String, item_value: String, extra2: String)
case class Element2(income: String, currency: String, additional: Additional2)

val my_uDF = fx.udf((data: Seq[Element]) => {
  data.map(x => new Element2(x.income, x.currency, new Additional2(x.additional.id, x.additional.item_value, null))).seq
})
sparkSession.sqlContext.udf.register("transformElements", my_uDF)
val result = sparkSession.sqlContext.sql("select transformElements(myElements),line_number,country,idate from entity where line_number='1'")

The goal is to add an extra field called extra2 to Element.Additional; for this reason I map the field with a UDF, but it fails with:

org.apache.spark.SparkException: Failed to execute user defined function(anonfun$1: (array<struct<income:string,currency:string,additional:struct<id:string,item_value:string>>>) => array<struct<income:string,currency:string,additional:struct<id:string,item_value:string,extra2:string>>>)

Printing the schema for the 'Elements' field shows:

 |-- myElements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)

And I am trying to convert it into this schema:

 |-- myElements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)
 |    |    |    |-- extra2: string (nullable = true)
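
A sketch of one likely cause and workaround, hedged since the full stack trace is not shown: in Spark 2.x, a UDF registered for SQL receives struct arguments as Row objects rather than Element instances, so mapping them as case classes fails at runtime. Working on Rows and declaring the output schema explicitly avoids that; field names follow the schemas printed above:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._

val additionalType = StructType(Seq(
  StructField("id", StringType),
  StructField("item_value", StringType),
  StructField("extra2", StringType)))

val elementType = StructType(Seq(
  StructField("income", StringType),
  StructField("currency", StringType),
  StructField("additional", additionalType)))

// Rebuild each element as a Row with the extra2 slot set to null.
val transformElements = udf((data: Seq[Row]) =>
  data.map { e =>
    val add = e.getAs[Row]("additional")
    Row(e.getAs[String]("income"),
        e.getAs[String]("currency"),
        Row(add.getAs[String]("id"), add.getAs[String]("item_value"), null))
  },
  ArrayType(elementType))

sparkSession.sqlContext.udf.register("transformElements", transformElements)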


Get this bounty!!!