#StackBounty: #scala #apache-spark #apache-kafka #spark-structured-streaming Spark Streaming – Join on multiple kafka stream operation

I have 3 Kafka streams with 600k+ records each, and Spark Streaming takes more than 10 minutes to process simple joins between them.

Spark Cluster config:

Spark Master UI

This is how I’m reading the Kafka streams into temp views in Spark (Scala):

.option("kafka.bootstrap.servers", "KAFKASERVER")
.option("subscribe", TOPIC1)
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest").load()
.selectExpr("CAST(value AS STRING) as json")
.select( from_json($"json", schema=SCHEMA1).as("data"))
.select($"COL1", $"COL2")

I join the 3 tables using Spark SQL:

select COL1, COL2 from TABLE1   

Execution of Job:

Job UI

Am I missing some Spark configuration that I should look into?

#StackBounty: #scala A simple data class based on an existing collection (Scala)

I’m fairly new to Scala and I would like to learn it properly. My current task is to create a type to represent a heap, as seen in interpreters: a mapping from addresses in memory to values stored there.

A heap of that kind is very much like a Map, but I would like to hide implementation details and only expose an interface consisting of a few methods, which in pseudocode would be:

* update :: address value -> Heap
* free :: address -> Heap
* alloc :: value -> (Heap, Address)
* addresses :: () -> Set[Address]
* lookup :: address -> [Value]

Here is what I came up with:

// Address is assumed to be an alias such as `type Address = Int` (its definition is not shown).
trait Heap[T] {

  def update(address: Address, value: T): Heap[T]

  def free(address: Address): Heap[T]

  def alloc(value: T): (Heap[T], Address)

  def addresses(): Set[Address]

  def lookup(address: Address): Option[T]
}

private case class HeapImpl[T](map: Map[Address, T]) extends Heap[T] {
  override def update(address: Address, value: T): Heap[T] = HeapImpl[T](map.updated(address, value))

  override def free(address: Address): Heap[T] = HeapImpl[T](map.removed(address))

  override def alloc(value: T): (Heap[T], Address) = {
    val nextFreeAddress = addresses().maxOption.getOrElse(0) + 1
    (HeapImpl(map.updated(nextFreeAddress, value)), nextFreeAddress)
  }

  override def addresses(): Set[Address] = map.keys.toSet

  override def lookup(address: Address): Option[T] = map.get(address)
}

object Heap {
  def apply[T](): Heap[T] = HeapImpl(Map())
}
I would like to know whether this is proper, idiomatic Scala or whether I should approach it differently.
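A short usage sketch may help when judging the interface. It is one possible variation rather than the code from the question: the implementation is nested privately in the companion object, `addresses` is parameterless, and `type Address = Int` is an assumption, since the alias is not shown. It needs Scala 2.13 or later because of `removed` and `maxOption`.

```scala
object HeapSketch {
  type Address = Int // assumption: the question does not show this alias

  trait Heap[T] {
    def update(address: Address, value: T): Heap[T]
    def free(address: Address): Heap[T]
    def alloc(value: T): (Heap[T], Address)
    def addresses: Set[Address]
    def lookup(address: Address): Option[T]
  }

  object Heap {
    // The only way to obtain a Heap from outside; the Map stays hidden.
    def apply[T](): Heap[T] = HeapImpl(Map.empty[Address, T])

    private final case class HeapImpl[T](map: Map[Address, T]) extends Heap[T] {
      def update(address: Address, value: T): Heap[T] = copy(map = map.updated(address, value))
      def free(address: Address): Heap[T] = copy(map = map.removed(address))
      def alloc(value: T): (Heap[T], Address) = {
        // Next address is one past the highest address in use (1 for an empty heap).
        val next = map.keySet.maxOption.getOrElse(0) + 1
        (copy(map = map.updated(next, value)), next)
      }
      def addresses: Set[Address] = map.keySet
      def lookup(address: Address): Option[T] = map.get(address)
    }
  }

  // Every operation returns a new heap; earlier heaps are left untouched.
  val (h1, a1) = Heap[String]().alloc("x") // a1 == 1
  val (h2, a2) = h1.alloc("y")             // a2 == 2
  val h3 = h2.update(a1, "z").free(a2)     // only address 1 remains
}
```

Note that with this allocation rule an address can be handed out again after the highest one has been freed, which may or may not be acceptable for an interpreter heap.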

#StackBounty: #java #scala #jmeter #jsr223 #scriptengine Scala JSR223 script using JMeter/Java context

Scala has supported JSR223 scripting since 2.11:

e.eval("""s"a is $a, s is $s"""")

I added the Scala 2.13 jars and tried to execute a script; it can display constants in the response.

But I can’t use JMeter’s bound variables such as log. I tried with:


I also can’t print values to the log. I also tried:

var a:Int =  10

JMeter bindings code:

 Bindings bindings = engine.createBindings();
 final Logger logger = LoggerFactory.getLogger(JSR223_INIT_FILE);
 bindings.put("log", logger); // $NON-NLS-1$ (this name is fixed)       
 engine.eval(reader, bindings);

I also tried using bindings, but it isn’t in context:



ERROR o.a.j.p.j.s.JSR223Sampler: Problem in JSR223 script JSR223 Sampler, message: javax.script.ScriptException: not found: value bindings

How can I submit Scala JSR223 script using JMeter/Java bindings variables?

#StackBounty: #scala #amazon-web-services #apache-spark #hadoop #amazon-s3 Can't connect from Spark to S3 – AmazonS3Exception Status Code: 400

I am trying to connect from Spark (running on my PC) to my S3 bucket:

val spark = SparkSession
  .builder()
  .config("spark.master", "local")
  .getOrCreate()

val sc = spark.sparkContext
sc.hadoopConfiguration.set("fs.s3a.access.key", ACCESS_KEY)
sc.hadoopConfiguration.set("fs.s3a.secret.key", SECRET_KEY)
val txtFile = sc.textFile("s3a://bucket-name/folder/file.txt")
val contents = txtFile.collect()

But I get the following exception:

Exception in thread "main"
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400,
AWS Service: Amazon S3, AWS Request ID: 07A7BDC9135BCC84, AWS Error
Code: null, AWS Error Message: Bad Request, S3 Extended Request ID:

I have seen this question but it didn’t help me.


As Zack suggested, I added:

sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")

But I still get the same exception.

#StackBounty: #scala #apache-spark #user-defined-functions How to add null columns to complex array struct in Spark with a udf

I am trying to add null columns to an embedded array[struct] column, so that I can transform it into a similar complex column:

  case class Additional(id: String, item_value: String)
  case class Element(income:String,currency:String,additional: Additional)
  case class Additional2(id: String, item_value: String, extra2: String)
  case class Element2(income:String,currency:String,additional: Additional2)

  val  my_uDF = fx.udf((data: Seq[Element]) => {
    data.map(x=>new Element2(x.income,x.currency,new Additional2(x.additional.id,x.additional.item_value,null))).seq
  })
  val result=sparkSession.sqlContext.sql("select transformElements(myElements),line_number,country,idate from entity where line_number='1'")

The goal is to add an extra field called extra2 to Element.Additional. For this reason I map the field with a UDF, but it fails with:

org.apache.spark.SparkException: Failed to execute user defined function(anonfun$1: (array<struct<income:string,currency:string,additional:struct<id:string,item_value:string>>>) => array<struct<income:string,currency:string,additional:struct<id:string,item_value:string,extra2:string>>>)

If I print the schema of the ‘Elements’ field, it shows:

 |-- myElements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)

And I am trying to convert into this schema:

 |-- myElements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)
 |    |    |    |-- extra2: string (nullable = true)

#StackBounty: #scala #function-composition #finagle Is it possible to have a generic logging filter in finagle that can be "inserted anywhere" in a chain?

In our code we create many “finagle pipelines” like so:

val f1 = new Filter[A,B,C,D](...)
val f2 = new SimpleFilter[C,D](...)
val f3 = new Filter[C,D,E,F](...)
val s = new Service[E,F](...)

val pipeline: Service[A,B] = f1 andThen f2 andThen f3 andThen s

I would now like the ability to “insert” loggers anywhere in such a chain. The logger would only log the fact that a request came in and a response was received. Something like this:

class LoggerFilter[Req, Resp](customLog: String) extends SimpleFilter[Req, Resp] with LazyLogging{
  override def apply(request: Req, service: Service[Req, Resp]): Future[Resp] = {
    logger.info(s"$customLog => Request: ${request.getClass.getName} -> ${service.toString}")
    service(request).map{resp =>
      logger.info(s"$customLog => Response: ${resp.getClass.getName} -> ${request.getClass.getName}")
      resp
    }
  }
}

With this approach we have to keep declaring multiple loggers so that the types can align correctly and then we insert at the “right location”.

val logger1 = new LoggerFilter[A,B]("A->B Logger")
val logger2 = new LoggerFilter[C,D]("C->D Logger")
val logger3 = new LoggerFilter[E,F]("E->F Logger")

val pipeline = logger1 andThen f1 andThen f2 andThen logger2 andThen f3 andThen logger3 andThen s

Is there a way this can be avoided? Is it possible to just have a single logger that can infer the Req/Resp types automagically and be “insertable anywhere” in the chain?


val logger = getTypeAgnosticLogger // What's the implementation?

val pipeline = logger andThen f1 andThen f2 andThen logger andThen f3 andThen logger andThen s

// Is this possible - params for logger to print?
val pipeline = logger("f1") andThen f1 andThen f2 andThen logger("f3") andThen f3 andThen logger("s") andThen s
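One way to approach this is a filter whose `Req`/`Resp` types are chosen at the point of insertion rather than at construction. Finagle ships `Filter.TypeAgnostic` for this kind of filter. The sketch below does not use Finagle itself: `Service`, `Filter`, and `TypeAgnostic` are small stand-in traits (simplified assumptions built on `scala.concurrent.Future`) so that the example runs on its own, and `f1` and `s` are hypothetical stages.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AgnosticLoggerSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Simplified stand-ins for Finagle's Service and Filter (not the real classes).
  trait Service[Req, Rep] { def apply(req: Req): Future[Rep] }

  trait Filter[ReqIn, RepOut, ReqOut, RepIn] { self =>
    def apply(req: ReqIn, service: Service[ReqOut, RepIn]): Future[RepOut]

    // Compose with another filter.
    def andThen[Req2, Rep2](next: Filter[ReqOut, RepIn, Req2, Rep2]): Filter[ReqIn, RepOut, Req2, Rep2] =
      new Filter[ReqIn, RepOut, Req2, Rep2] {
        def apply(req: ReqIn, service: Service[Req2, Rep2]): Future[RepOut] =
          self.apply(req, next.andThen(service))
      }

    // Terminate the chain with a service.
    def andThen(service: Service[ReqOut, RepIn]): Service[ReqIn, RepOut] =
      new Service[ReqIn, RepOut] {
        def apply(req: ReqIn): Future[RepOut] = self.apply(req, service)
      }

    // Insert a type-agnostic filter; it is specialised to the types at this position.
    def agnosticAndThen(next: TypeAgnostic): Filter[ReqIn, RepOut, ReqOut, RepIn] =
      andThen(next.toFilter[ReqOut, RepIn])
  }

  // A filter usable at any position: Req/Rep are picked per insertion point.
  trait TypeAgnostic {
    def toFilter[Req, Rep]: Filter[Req, Rep, Req, Rep]

    // Lets a type-agnostic filter start a chain.
    def andThen[ReqIn, RepOut, ReqOut, RepIn](
        next: Filter[ReqIn, RepOut, ReqOut, RepIn]): Filter[ReqIn, RepOut, ReqOut, RepIn] =
      toFilter[ReqIn, RepOut].andThen(next)
  }

  def logger(name: String): TypeAgnostic = new TypeAgnostic {
    def toFilter[Req, Rep]: Filter[Req, Rep, Req, Rep] = new Filter[Req, Rep, Req, Rep] {
      def apply(req: Req, service: Service[Req, Rep]): Future[Rep] = {
        println(s"$name => Request: $req")
        service(req).map { rep =>
          println(s"$name => Response: $rep")
          rep
        }
      }
    }
  }

  // Hypothetical stages: f1 maps Int requests to String and String replies to Int.
  val f1: Filter[Int, Int, String, String] = new Filter[Int, Int, String, String] {
    def apply(req: Int, service: Service[String, String]): Future[Int] =
      service(req.toString).map(_.length)
  }
  val s: Service[String, String] = new Service[String, String] {
    def apply(req: String): Future[String] = Future.successful(req + req)
  }

  // The same logger factory is inserted at two differently typed positions.
  val pipeline: Service[Int, Int] =
    logger("f1").andThen(f1).agnosticAndThen(logger("s")).andThen(s)

  def run(): Int = Await.result(pipeline(21), 5.seconds)
}
```

The key design point is that `toFilter` is itself polymorphic, so a single value can produce a correctly typed filter for each position in the chain. Whether the same composition syntax is available in a given Finagle version should be checked against its `Filter` API.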

#StackBounty: #reinventing-the-wheel #scala #generics #dsl Typeclass-oriented example project with implicit classes

The following code is only an example of a project that, because of (nested) higher-kinded types, relies heavily on typeclasses for its DSL.

When reviewing the code keep in mind that this is an example, so I am not relying on libraries like cats even if they might already provide solutions for what I am doing.

With that aside I have some explicit questions about aspects of my code apart from an overall review:

  • What’s the correct package structuring e.g. /algebra, /syntax, /ops, /dsl, /implicits, … and where in that structure do simple case classes (like M or N) go, where typeclasses (like Invertable), where typeclass-instances (like mnInvertable), where implicit classes (like InvertableOps)?
  • Is my use of an implicit class good here, or is there a better way to define .invert on types like M[N[A]]?
  • Should the implicit parameter in the implicit class be moved to def invert instead?
  • What’s the naming convention for typeclasses (I used ...able), typeclass instances (I used type + typeclass) and implicit classes (I used typeclass + Ops)?
  • Bonus: Is it possible to define Invertable as context bounds instead of an implicit parameter? (I think not, but who knows!)

// src/main/scala/myproject/algebra/M.scala
case class M[A](value: A)

// src/main/scala/myproject/algebra/N.scala
case class N[A](value: A)

// src/main/scala/myproject/syntax/Invertable.scala
trait Invertable[F[_], G[_]] {
  def invert[A](fga: F[G[A]]): G[F[A]]
}

// src/main/scala/myproject/implicits/package.scala
implicit val mnInvertable: Invertable[M, N] = new Invertable[M, N] {
  def invert[A](fga: M[N[A]]): N[M[A]] = N(M(fga.value.value))
}

implicit class InvertableOps[F[_], G[_], A](fga: F[G[A]])(implicit i: Invertable[F, G]) {
  def invert = i.invert(fga)
}

// Somewhere in the project
M(N(1)).invert // It works!
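For the question about where the implicit parameter should live, the alternative can be sketched as follows. This is only an illustration of the variant, wrapped in a hypothetical `InvertSketch` object so that it is self-contained. Here the instance is requested by `invert` itself rather than by the wrapper class.

```scala
object InvertSketch {
  final case class M[A](value: A)
  final case class N[A](value: A)

  trait Invertable[F[_], G[_]] {
    def invert[A](fga: F[G[A]]): G[F[A]]
  }

  implicit val mnInvertable: Invertable[M, N] = new Invertable[M, N] {
    def invert[A](fga: M[N[A]]): N[M[A]] = N(M(fga.value.value))
  }

  // Variant: the wrapper applies to any F[G[A]]; the typeclass instance
  // is only demanded when invert is actually called.
  implicit class InvertableOps[F[_], G[_], A](private val fga: F[G[A]]) {
    def invert(implicit i: Invertable[F, G]): G[F[A]] = i.invert(fga)
  }

  val inverted: N[M[Int]] = M(N(1)).invert
}
```

With this placement, `.invert` is offered on every nested type and a missing instance is reported as a missing implicit at the call site. With the parameter on the class, the method is simply not found when no instance exists. On the bonus question: a context bound such as `F[_]: C` expands to an implicit `C[F]`, so a two-parameter typeclass like `Invertable[F, G]` does not fit that shape without an extra type alias.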

#StackBounty: #scala #join #activerecord #group-by Write join query with groupby in Scala ActiveRecord

I am trying to write a specific query in Scala ActiveRecord, but it always returns nothing. I have read the wiki on the GitHub page, but it does not contain much information about this. The query I am trying to write is:

SELECT e.name, e.id, COUNT(pt.pass_id) as pass_count, e.start_date, e.total_passes_to_offer
FROM events e inner join passes p on e.id = p.event_id inner join pass_tickets pt on p.id = pt.pass_id where e.partner_id = 198 group by e.name, e.id

What I have tried is

Event.joins[Pass, PassTicket](
                (event, pass, passTicket) => (event.id === pass.eventId, pass.id === passTicket.passId)
                (event, _, _) => event.partnerId === partnerId
                (event, pass, _) => (event.name, event.id, PassTicket.where(_.passId === pass.id).count, event.startDate, event.totalPassesToOffer)
            ).groupBy( data => data._2)

First, the return type becomes a map, not a list. Second, when executed, it doesn’t return anything, even though the data exists and the SQL executes fine.
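On the first point: if `groupBy` here ends up being the standard Scala collection method applied to rows that have already been fetched (an assumption, since the question does not show how the call resolves), then a `Map` is the expected result type. The in-memory sketch below uses a hypothetical `Row` case class and made-up data to show that behaviour and the extra step needed to get one row per group with a count.

```scala
object GroupBySketch {
  // Hypothetical, flattened join rows: one row per pass ticket.
  final case class Row(eventName: String, eventId: Long, passId: Long)

  val rows: List[Row] = List(
    Row("Expo", 1L, 10L),
    Row("Expo", 1L, 11L),
    Row("Gala", 2L, 20L)
  )

  // Collection groupBy returns a Map from the grouping key to the grouped rows.
  val grouped: Map[(String, Long), List[Row]] =
    rows.groupBy(r => (r.eventName, r.eventId))

  // Turning each group into a single (name, id, count) row is a separate step.
  val counts: List[(String, Long, Int)] =
    grouped.toList
      .map { case ((name, id), rs) => (name, id, rs.size) }
      .sortBy(_._2)
}
```

This grouping happens in application memory after the rows are loaded, which is different from a `GROUP BY` executed by the database.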

#StackBounty: #scala #apache-spark #rdd #k-means Spark iterative Kmeans not get expected results?

In my homework I am asked to write a naive implementation of Kmeans in Spark:

import breeze.linalg.{ Vector, DenseVector, squaredDistance }
import scala.math

def parse(line: String): Vector[Double] = {
    DenseVector(line.split(' ').map(_.toDouble))
}

def closest_assign(p: Vector[Double], centres: Array[Vector[Double]]): Int = {
    var bestIndex = 1
    var closest = Double.PositiveInfinity

    for (i <- 0 until centres.length) {
      val tempDist = squaredDistance(p, centres(i))

      if (tempDist < closest) {
        closest = tempDist
        bestIndex = i
      }
    }

    bestIndex
}

val fileroot:String="/FileStore/tables/"
val file=sc.textFile(fileroot+"data.txt")
           .map(parse _)
// Assumption: the centres are collected to the driver, since closest_assign expects an Array.
val c1=sc.textFile(fileroot+"c1.txt")
         .map(parse _)
         .collect()

val c2=sc.textFile(fileroot+"c2.txt")
         .map(parse _)
         .collect()
val K=10
val MAX_ITER=20
var kPoints=c2

for(i<-0 until MAX_ITER){
    val closest = file.map(p => (closest_assign(p, kPoints), (p, 1)))

    val pointStats = closest.reduceByKey { case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2) }

    // Assumption: the new centres are collected as a Map, since they are looked up by index below.
    val newPoints = pointStats.map { pair =>
        (pair._1, pair._2._1 * (1.0 / pair._2._2))
    }.collectAsMap()

    for (newP <- newPoints) {
        kPoints(newP._1) = newP._2
    }

    val tempDist = closest
      .map { x => squaredDistance(x._2._1, newPoints(x._1)) }
      .fold(0) { _ + _ }

    println(i+" time finished iteration (cost = " + tempDist + ")")
}

In theory tempDist should become smaller and smaller as the program runs, but in reality it goes the other way around. I also found that the values of c1 and c2 change after the for(i<-0 until MAX_ITER) loop, even though c1 and c2 are declared as val! Is the way I load c1 and c2 wrong? c1 and c2 are two different sets of initial cluster centres for the data.
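On the `val` question, a small sketch with plain arrays (no Spark; the data and the `AliasSketch` object are hypothetical) shows what `val` does and does not guarantee. It assumes the centres are held in an `Array`, as the indexed assignment `kPoints(...) = ...` suggests.

```scala
object AliasSketch {
  // val fixes which array the name refers to, not the array's contents.
  val c2: Array[Double] = Array(1.0, 2.0)
  var kPoints: Array[Double] = c2 // same array object, not a copy
  kPoints(0) = 99.0               // also visible through c2

  val c1: Array[Double] = Array(1.0, 2.0)
  var copied: Array[Double] = c1.clone() // independent (shallow) copy
  copied(0) = 99.0                       // c1 is unaffected
}
```

So an assignment like `var kPoints = c2` followed by in-place updates changes `c2` as well. Taking a copy first, for example with `clone()`, keeps the initial centres intact. The copy is shallow, which is sufficient as long as elements are replaced rather than mutated.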

