#StackBounty: #scala #apache-spark #apache-kafka #spark-structured-streaming Spark Streaming – Join on multiple kafka stream operation …

Bounty: 50

I have 3 Kafka streams with 600k+ records each, and Spark Streaming takes more than 10 minutes to process simple joins between the streams.

Spark Cluster config:

Spark Master UI

This is how I’m reading the Kafka streams into temp views in Spark (Scala):

spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "KAFKASERVER")
  .option("subscribe", TOPIC1)
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING) as json")
  .select(from_json($"json", schema = SCHEMA1).as("data"))
  .select($"COL1", $"COL2")
  .createOrReplaceTempView("TABLE1")
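
As an aside, the snippet above assumes the usual imports are in scope for from_json and the $"col" syntax; a minimal sketch of what is presumably imported:

import org.apache.spark.sql.functions.from_json
import spark.implicits._ // enables the $"col" column interpolator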

I join the 3 tables using Spark SQL:

SELECT COL1, COL2 FROM TABLE1
JOIN TABLE2 ON TABLE1.PK = TABLE2.PK
JOIN TABLE3 ON TABLE2.PK = TABLE3.PK
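
For context, this query is run against the temp views registered as above, along the lines of the following sketch (TABLE2 and TABLE3 are assumed to be registered the same way as TABLE1):

val joined = spark.sql("""
  SELECT COL1, COL2 FROM TABLE1
  JOIN TABLE2 ON TABLE1.PK = TABLE2.PK
  JOIN TABLE3 ON TABLE2.PK = TABLE3.PK
""")
joined.show() // the action that actually triggers the join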

Execution of Job:

Job UI

Am I missing some configuration on Spark that I have to look into?



#StackBounty: #scala A simple data class based on an existing collection (Scala)

Bounty: 50

I’m fairly new to Scala and I would like to learn it properly. My current task is to create a type to represent a heap, as seen in interpreters: a mapping from addresses in memory to values stored there.

A heap of that kind is very much like a Map, but I would like to hide the implementation details and only expose an interface consisting of a few methods, which in pseudocode would be:

* update :: address value -> Heap
* free :: address -> Heap
* alloc :: value -> (Heap, Address)
* addresses :: () -> Set[Address]
* lookup :: address -> [Value]

Here is what I came up with:

type Address = Int // assumed here so the snippet is self-contained; alloc implies addresses are integers

trait Heap[T] {

  def update(address: Address, value: T): Heap[T]

  def free(address: Address): Heap[T]

  def alloc(value: T): (Heap[T], Address)

  def addresses(): Set[Address]

  def lookup(address: Address): Option[T]
}

private case class HeapImpl[T](map: Map[Address, T]) extends Heap[T] {
  override def update(address: Address, value: T): Heap[T] = HeapImpl[T](map.updated(address, value))

  override def free(address: Address): Heap[T] = HeapImpl[T](map.removed(address))

  override def alloc(value: T): (Heap[T], Address) = {
    val nextFreeAddress = addresses().maxOption.getOrElse(0) + 1
    (HeapImpl(map.updated(nextFreeAddress, value)), nextFreeAddress)
  }

  override def addresses(): Set[Address] = map.keys.toSet

  override def lookup(address: Address): Option[T] = map.get(address)
}

object Heap {
  def apply[T](): Heap[T] = HeapImpl(Map())
}

I would like to know whether this is proper, idiomatic Scala or whether I should approach it differently.
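
For what it’s worth, here is a minimal usage sketch of the API above (assuming Address is an Int, as the alloc implementation implies):

val empty = Heap[String]()
val (heap1, addr) = empty.alloc("hello") // allocates at the next free address
val heap2 = heap1.update(addr, "world")

heap2.lookup(addr)            // Some("world")
heap2.free(addr).lookup(addr) // None
heap2.addresses()             // Set(addr)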



#StackBounty: #java #scala #jmeter #jsr223 #scriptengine Scala JSR223 script using JMeter/Java context

Bounty: 50

Scala has had JSR223 script support since 2.11:

e.eval("""s"a is $a, s is $s"""")

I added the Scala 2.13 jars and tried to execute a script; it can display constants in the response.

But I can’t use JMeter’s bound variables, such as log. I tried:

log.info(a);
$log.info(a);

I also can’t print values to the log; I tried:

var a:Int =  10
println(a)

JMeter bindings code:

Bindings bindings = engine.createBindings();
final Logger logger = LoggerFactory.getLogger(JSR223_INIT_FILE);
bindings.put("log", logger); // $NON-NLS-1$ (this name is fixed)
engine.eval(reader, bindings);
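
For reference, a minimal, self-contained sketch of the javax.script flow being used here (the engine name "scala" and the logger name are assumptions; JMeter wires this up internally):

import javax.script.{Bindings, ScriptEngine, ScriptEngineManager}
import org.slf4j.{Logger, LoggerFactory}

object Jsr223Sketch extends App {
  // Look up the Scala JSR223 engine (requires the Scala compiler jars on the classpath).
  val engine: ScriptEngine = new ScriptEngineManager().getEngineByName("scala")

  // Bind a logger under the name "log", mirroring what JMeter does.
  val bindings: Bindings = engine.createBindings()
  val logger: Logger = LoggerFactory.getLogger("Jsr223Sketch")
  bindings.put("log", logger)

  // Evaluate a script string with those bindings.
  engine.eval("""println("hello from Scala JSR223")""", bindings)
}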

I also tried using the bindings directly, but they aren’t in scope:

bindings.get("log").info("aa");

Exception

ERROR o.a.j.p.j.s.JSR223Sampler: Problem in JSR223 script JSR223 Sampler, message: javax.script.ScriptException: not found: value bindings

How can I use JMeter/Java bindings variables in a Scala JSR223 script?



#StackBounty: #scala #amazon-web-services #apache-spark #hadoop #amazon-s3 Can't connect from Spark to S3 – AmazonS3Exception Statu…

Bounty: 50

I am trying to connect from Spark (running on my PC) to my S3 bucket:

val spark = SparkSession
  .builder
  .appName("S3Client")
  .config("spark.master", "local")
  .getOrCreate()

val sc = spark.sparkContext
sc.hadoopConfiguration.set("fs.s3a.access.key", ACCESS_KEY)
sc.hadoopConfiguration.set("fs.s3a.secret.key", SECRET_KEY)

val txtFile = sc.textFile("s3a://bucket-name/folder/file.txt")
val contents = txtFile.collect()

But I am getting the following exception:

Exception in thread “main”
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400,
AWS Service: Amazon S3, AWS Request ID: 07A7BDC9135BCC84, AWS Error
Code: null, AWS Error Message: Bad Request, S3 Extended Request ID:
6ly2vhZ2mAJdQl5UZ/QUdilFFN1hKhRzirw6h441oosGz+PLIvLW2fXsZ9xmd8cuBrNHCdh8UPE=

I have seen this question but it didn’t help me.

Edit:

As Zack suggested, I added:

sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")

But I still get the same exception.
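
For reference, the pieces described above combine into something like this (a sketch only; ACCESS_KEY and SECRET_KEY are placeholders, and hadoop-aws with a matching AWS SDK is assumed to be on the classpath):

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("S3Client")
  .config("spark.master", "local")
  .getOrCreate()

val sc = spark.sparkContext
// credentials plus the region-specific endpoint suggested above
sc.hadoopConfiguration.set("fs.s3a.access.key", ACCESS_KEY)
sc.hadoopConfiguration.set("fs.s3a.secret.key", SECRET_KEY)
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")

val contents = sc.textFile("s3a://bucket-name/folder/file.txt").collect()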



#StackBounty: #scala #apache-spark #user-defined-functions How to add null columns to complex array struct in Spark with a udf

Bounty: 50

I am trying to add null columns to an embedded array[struct] column; this way I will be able to transform a similarly complex column:

  case class Additional(id: String, item_value: String)
  case class Element(income:String,currency:String,additional: Additional)
  case class Additional2(id: String, item_value: String, extra2: String)
  case class Element2(income:String,currency:String,additional: Additional2)

  // fx is presumably an alias for org.apache.spark.sql.functions
  val my_uDF = fx.udf((data: Seq[Element]) => {
    data.map(x=>new Element2(x.income,x.currency,new Additional2(x.additional.id,x.additional.item_value,null))).seq
  })
  sparkSession.sqlContext.udf.register("transformElements",my_uDF)
  val result=sparkSession.sqlContext.sql("select transformElements(myElements),line_number,country,idate from entity where line_number='1'")

The goal is to add an extra field called extra2 to Element.Additional, so I map this field with a UDF, but it fails with:

org.apache.spark.SparkException: Failed to execute user defined function(anonfun$1: (array<struct<income:string,currency:string,additional:struct<id:string,item_value:string>>>) => array<struct<income:string,currency:string,additional:struct<id:string,item_value:string,extra2:string>>>)

If I print the schema, the ‘myElements’ field shows:

 |-- myElements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)

And I am trying to convert it into this schema:

 |-- myElements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- income: string (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- additional: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- item_value: string (nullable = true)
 |    |    |    |-- extra2: string (nullable = true)
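
For what it’s worth, that target element struct is exactly what the Element2/Additional2 case classes defined earlier encode to, which can be checked with a small sketch like this:

import org.apache.spark.sql.Encoders

// prints income, currency and additional{id, item_value, extra2}, i.e. the element struct above
println(Encoders.product[Element2].schema.treeString)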



#StackBounty: #scala #function-composition #finagle Is it possible to have a generic logging filter in finagle that can be "insert…

Bounty: 50

In our code we create many “Finagle pipelines” like so:

val f1 = new Filter[A,B,C,D](...)
val f2 = new SimpleFilter[C,D](...)
val f3 = new Filter[C,D,E,F](...)
val s = new Service[E,F](...)

val pipeline: Service[A,B] = f1 andThen f2 andThen f3 andThen s

I would now like the ability to “insert” loggers anywhere in such a chain. The logger would only log the fact that a request came in and a response was received. Something like this:

class LoggerFilter[Req, Resp](customLog: String) extends SimpleFilter[Req, Resp] with LazyLogging{
  override def apply(request: Req, service: Service[Req, Resp]): Future[Resp] = {
    logger.info(s"$customLog => Request: ${request.getClass.getName} -> ${service.toString}")
    service(request).map{resp =>
      logger.info(s"$customLog => Response: ${resp.getClass.getName} -> ${request.getClass.getName}")
      resp
    }
  }
}

With this approach we have to keep declaring multiple loggers so that the types align correctly, and then insert them at the “right location”:

val logger1 = new LoggerFilter[A,B]("A->B Logger")
val logger2 = new LoggerFilter[C,D]("C->D Logger")
val logger3 = new LoggerFilter[E,F]("E->F Logger")

val pipeline = logger1 andThen f1 andThen f2 andThen logger2 andThen f3 andThen logger3 andThen s
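
As a point of comparison, a small factory at least centralizes construction (a sketch only, reusing the definitions above; the type parameters still have to be resolvable at each insertion point, which is exactly the inference question being asked):

object LoggerFilter {
  // hypothetical helper: builds a logging filter for whatever Req/Resp the call site demands
  def apply[Req, Resp](tag: String): SimpleFilter[Req, Resp] = new LoggerFilter[Req, Resp](tag)
}

val pipeline2: Service[A, B] =
  LoggerFilter[A, B]("A->B Logger") andThen f1 andThen f2 andThen
    LoggerFilter[C, D]("C->D Logger") andThen f3 andThen s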

Is there a way this can be avoided? Is it possible to just have a single logger that can infer the Req/Resp types automagically and be “insertable anywhere” in the chain?

E.g.:

val logger = getTypeAgnosticLogger // What's the implementation?

val pipeline = logger andThen f1 andThen f2 andThen logger andThen f3 andThen logger andThen s

// Is this possible - params for logger to print?
val pipeline = logger("f1") andThen f1 andThen f2 andThen logger("f3") andThen f3 andThen logger("s") andThen s



#StackBounty: #reinventing-the-wheel #scala #generics #dsl Typeclass-oriented example project with implicit classes

Bounty: 50

The following code is only an example of a project that, due to (nested) higher kinds, relies heavily on typeclasses for its DSL.

When reviewing the code, keep in mind that this is an example, so I am not relying on libraries like cats, even if they might already provide solutions for what I am doing.

With that aside, I have some specific questions about aspects of my code, apart from an overall review:

  • What’s the correct package structure, e.g. /algebra, /syntax, /ops, /dsl, /implicits, …? Where in that structure do simple case classes (like M or N) go, where do typeclasses (like Invertable), typeclass instances (like mnInvertable), and implicit classes (like InvertableOps)?
  • Is my use of an implicit class good here, or is there a better way to define .invert on types like M[N[A]]?
  • Should the implicit parameter in the implicit class be moved to def invert instead?
  • What’s the naming convention for typeclasses (I used ...able), typeclass instances (I used type + typeclass) and implicit classes (I used typeclass + Ops)?
  • Bonus: Is it possible to define Invertable as a context bound instead of an implicit parameter? (I think not, but who knows!)
// src/main/scala/myproject/algebra/M.scala
case class M[A](value: A)

// src/main/scala/myproject/algebra/N.scala
case class N[A](value: A)

// src/main/scala/myproject/syntax/Invertable.scala
trait Invertable[F[_], G[_]] {
  def invert[A](fga: F[G[A]]): G[F[A]]
}

// src/main/scala/myproject/implicits/package.scala
implicit val mnInvertable: Invertable[M, N] = new Invertable[M, N] {
  def invert[A](fga: M[N[A]]): N[M[A]] = N(M(fga.value.value))
}

implicit class InvertableOps[F[_], G[_], A](fga: F[G[A]])(implicit i: Invertable[F, G]) {
  def invert = i.invert(fga)
}

// Somewhere in the project
M(N(1)).invert // It works!
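
For illustration, adding an instance for another pair of types follows the same shape as mnInvertable (a sketch, not part of the original code):

implicit val nmInvertable: Invertable[N, M] = new Invertable[N, M] {
  def invert[A](fga: N[M[A]]): M[N[A]] = M(N(fga.value.value))
}

N(M("x")).invert // M(N("x"))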



#StackBounty: #scala #join #activerecord #group-by Write join query with groupby in Scala ActiveRecord

Bounty: 150

I am trying to write a specific query with Scala ActiveRecord, but it always returns nothing. I have read the wiki on the GitHub page, but it does not contain a lot of info on this. The query I am trying to write is:

SELECT e.name, e.id, COUNT(pt.pass_id) AS pass_count, e.start_date, e.total_passes_to_offer
FROM events e
INNER JOIN passes p ON e.id = p.event_id
INNER JOIN pass_tickets pt ON p.id = pt.pass_id
WHERE e.partner_id = 198
GROUP BY e.name, e.id

What I have tried is:

Event.joins[Pass, PassTicket](
  (event, pass, passTicket) => (event.id === pass.eventId, pass.id === passTicket.passId)
).where(
  (event, _, _) => event.partnerId === partnerId
).select(
  (event, pass, _) => (event.name, event.id, PassTicket.where(_.passId === pass.id).count, event.startDate, event.totalPassesToOffer)
).groupBy(data => data._2)

But first, the return type becomes a map, not a list. And second, when executed, it doesn’t return anything, even though the data exists and the SQL executes fine.



#StackBounty: #scala #apache-spark #rdd #k-means Spark iterative Kmeans not get expected results?

Bounty: 50

In my homework I am asked to write a naive implementation of K-means in Spark:

import breeze.linalg.{Vector, DenseVector, squaredDistance}
import scala.math

def parse(line: String): Vector[Double] = {
  DenseVector(line.split(' ').map(_.toDouble))
}
def closest_assign(p: Vector[Double], centres: Array[Vector[Double]]): Int = {
    var bestIndex = 1
    var closest = Double.PositiveInfinity

    for (i <- 0 until centres.length) {
      val tempDist = squaredDistance(p, centres(i))

      if (tempDist < closest) {
        closest = tempDist
        bestIndex = i
      }
    }

    bestIndex
 }

val fileroot: String = "/FileStore/tables/"

val file = sc.textFile(fileroot + "data.txt")
  .map(parse _)
  .cache()

val c1 = sc.textFile(fileroot + "c1.txt")
  .map(parse _)
  .collect()

val c2 = sc.textFile(fileroot + "c2.txt")
  .map(parse _)
  .collect()

val K = 10
val MAX_ITER = 20
var kPoints = c2

for (i <- 0 until MAX_ITER) {
  val closest = file.map(p => (closest_assign(p, kPoints), (p, 1)))

  val pointStats = closest.reduceByKey { case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2) }

  val newPoints = pointStats.map { pair =>
    (pair._1, pair._2._1 * (1.0 / pair._2._2))
  }.collectAsMap()

  for (newP <- newPoints) {
    kPoints(newP._1) = newP._2
  }

  val tempDist = closest
    .map { x => squaredDistance(x._2._1, newPoints(x._1)) }
    .fold(0) { _ + _ }

  println(i + " time finished iteration (cost = " + tempDist + ")")
}

In theory tempDist should become smaller and smaller as the program runs, but in reality it goes the other way around. I also found that c1 and c2 change value after the for (i <- 0 until MAX_ITER) loop, but c1 and c2 are vals! Is the way I load c1 and c2 wrong? c1 and c2 are two different sets of initial clusters for the data.

