Why is mapping a class A to a class B with monix or akka-streams so slow?

I've benchmarked the mapping of a List[ClassA] to List[ClassB] with monix and akka-streams but I don't understand why it is so slow.

I've tried different ways to do the mapping, and here are the results with JMH:

[info] Benchmark                                    Mode  Cnt    Score    Error  Units
[info] MappingBenchmark.akkaLoadBalanceMap            ss   20  742,626 ±  4,853  ms/op
[info] MappingBenchmark.akkaMapAsyncFold              ss   20  480,460 ±  8,493  ms/op
[info] MappingBenchmark.akkaMapAsyncFoldAsync         ss   20  331,398 ± 10,490  ms/op
[info] MappingBenchmark.akkaMapFold                   ss   20  713,500 ±  7,394  ms/op
[info] MappingBenchmark.akkaMapFoldAsync              ss   20  313,275 ±  8,716  ms/op
[info] MappingBenchmark.map                           ss   20    0,567 ±  0,175  ms/op
[info] MappingBenchmark.monixBatchedObservables       ss   20  259,736 ±  5,939  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeft         ss   20  456,310 ±  5,225  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeftAsync    ss   20  795,345 ±  5,443  ms/op
[info] MappingBenchmark.monixMapFoldLeft              ss   20  247,172 ±  5,342  ms/op
[info] MappingBenchmark.monixMapFoldLeftAsync         ss   20  478,840 ± 25,249  ms/op
[info] MappingBenchmark.monixTaskGather               ss   20    6,707 ±  2,176  ms/op
[info] MappingBenchmark.parMap                        ss   20    1,257 ±  0,831  ms/op

Here is the code:

package benches

import java.util.concurrent.TimeUnit

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape, UniformFanOutShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source}
import org.openjdk.jmh.annotations._

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.SingleShotTime))
@Warmup(iterations = 20)
@Measurement(iterations = 20)
@Fork(value = 1, jvmArgs = Array("-server", "-Xmx8g"))
@Threads(1)
class MappingBenchmark {
  import monix.eval._
  import monix.reactive._
  import monix.execution.Scheduler.Implicits.global

  def list: List[ClassA] = (1 to 10000).map(ClassA).toList
  //    val l = (1 to 135368).map(Offre).toList

  // ##### SCALA ##### //

  @Benchmark
  def map: List[ClassB] = list.map(o => ClassB(o, o))

  @Benchmark
  def parMap: List[ClassB] = list.par.map(o => ClassB(o, o)).toList

  // ##### MONIX ##### //

  @Benchmark
  def monixTaskGather: List[ClassB] = {
    val task: Task[List[ClassB]] = Task.gatherUnordered(list.map(o => Task(ClassB(o,o))))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixBatchedObservables: List[ClassB] = {
    val task: Task[List[ClassB]] =
      Observable.fromIterable(list)
        .bufferIntrospective(256)
        .flatMap{items =>
          val tasks = items.map(o => Task(ClassB(o,o)))
          val batches = tasks.sliding(10,10).map(b => Task.gatherUnordered(b))
          val aggregate: Task[Iterator[ClassB]] = Task.sequence(batches).map(_.flatten)
          Observable.fromTask(aggregate).flatMap(i => Observable.fromIterator(i))
        }.consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapFoldLeft: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapFoldLeftAsync: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(l :+ o)))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapAsyncFoldLeft: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapAsyncFoldLeftAsync: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(l :+ o)))
    Await.result(task.runAsync, Duration.Inf)
  }

  // ##### AKKA-STREAM ##### //

  @Benchmark
  def akkaMapFold: List[ClassB] = {
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.fold(List[ClassB]())(_ :+ _))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapFoldAsync: List[ClassB] = {
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(l :+ o)))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapAsyncFold: List[ClassB] = {
    def graph: RunnableGraph[Future[List[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.fold(List[ClassB]())(_ :+ _))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapAsyncFoldAsync: List[ClassB] = {
    def graph: RunnableGraph[Future[List[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(l :+ o)))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaLoadBalanceMap: List[ClassB] = {
    def graph: RunnableGraph[Future[List[ClassB]]] = {
      val sink: Sink[ClassB, Future[List[ClassB]]] = Sink.fold(List[ClassB]())(_ :+ _)
      RunnableGraph.fromGraph[Future[List[ClassB]]](GraphDSL.create(sink) { implicit builder =>
        sink =>
          import GraphDSL.Implicits._
          val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4))
          val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4))
          val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o,o))
          Source(list) ~> balance
          (1 to 4).foreach{ i =>
            balance ~> mapClassB.async ~> merge
          }
          merge ~> sink
          ClosedShape
      })
    }
    runAkkaGraph(graph)
  }

  private def runAkkaGraph(g:RunnableGraph[Future[List[ClassB]]]): List[ClassB] = {
    implicit val actorSystem = ActorSystem("app")
    implicit val actorMaterializer = ActorMaterializer()
    val eventualBs = g.run()
    val res = Await.result(eventualBs, Duration.Inf)
    actorSystem.terminate()
    res
  }
}

case class ClassA(a:Int)
case class ClassB(o:ClassA, o2:ClassA)

The benchmark results get even worse as the initial collection grows.

I would like to know what my mistake is.

Thanks for sharing your knowledge!

Best regards

There are 2 answers

Answer by Bounkong KHAMPHOUSONE (best answer)

I've updated the code and the benchmark results are much better than before. The difference comes from the List operator used in the folds: the first version appended (:+) instead of prepending (+:). Since List is a singly linked list, every append has to traverse all the existing elements to add the new one, so building the whole list costs O(n²). Out of laziness I wanted to use the _ :+ _ placeholder shorthand, but I should not have.
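The append-versus-prepend cost can be seen with the standard library alone. The following is a minimal sketch, not part of the benchmark; the object and method names are made up for illustration. Note that prepending on its own yields the elements in reverse order, so the sketch reverses once at the end to restore the input order.

```scala
object PrependVsAppend {
  // Appending to an immutable List copies the whole list on every step:
  // O(n) per element, O(n^2) for the complete build.
  def buildByAppend(n: Int): List[Int] =
    (1 to n).foldLeft(List.empty[Int])(_ :+ _)

  // Prepending is O(1) per element; a single O(n) reverse at the end
  // restores the original order.
  def buildByPrepend(n: Int): List[Int] =
    (1 to n).foldLeft(List.empty[Int])((acc, x) => x :: acc).reverse
}
```

Both methods return the same list; only the amount of work differs, which is why the gap widens as the collection grows.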

package benches

import java.util.concurrent.TimeUnit

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape, UniformFanOutShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source}
import org.openjdk.jmh.annotations._

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.collection.immutable.Seq

@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.SingleShotTime))
@Warmup(iterations = 20)
@Measurement(iterations = 20)
@Fork(value = 1, jvmArgs = Array("-server", "-Xmx8g"))
@Threads(1)
class MappingBenchmark {
  import monix.eval._
  import monix.reactive._
  import monix.execution.Scheduler.Implicits.global

  def list: Seq[ClassA] = (1 to 10000).map(ClassA).toList
  //    val l = (1 to 135368).map(Offre).toList

  // ##### SCALA ##### //

  def foldClassB = (l:List[ClassB], o:ClassB) => o +: l

  @Benchmark
  def map: Seq[ClassB] = list.map(o => ClassB(o, o))

  @Benchmark
  def parMap: Seq[ClassB] = list.par.map(o => ClassB(o, o)).toList

  // ##### MONIX ##### //

  @Benchmark
  def monixTaskGather: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] = Task.gatherUnordered(list.map(o => Task(ClassB(o,o))))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixBatchedObservables: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] =
      Observable.fromIterable(list)
        .bufferIntrospective(256)
        .flatMap{items =>
          val tasks = items.map(o => Task(ClassB(o,o)))
          val batches = tasks.sliding(10,10).map(b => Task.gatherUnordered(b))
          val aggregate: Task[Iterator[ClassB]] = Task.sequence(batches).map(_.flatten)
          Observable.fromTask(aggregate).flatMap(i => Observable.fromIterator(i))
        }.consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapFoldLeft: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapFoldLeftAsync: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(o +: l)))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapAsyncFoldLeft: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapAsyncFoldLeftAsync: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(o +: l)))
    Await.result(task.runAsync, Duration.Inf)
  }

  // ##### AKKA-STREAM ##### //

  @Benchmark
  def akkaMapFold: Seq[ClassB] = {
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.fold(List[ClassB]())(foldClassB))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapFoldAsync: Seq[ClassB] = {
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(o +: l)))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapSeq: Seq[ClassB] = {
    val graph = Source(list).map(o => ClassB(o,o)).toMat(Sink.seq)(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapAsyncFold: Seq[ClassB] = {
    def graph: RunnableGraph[Future[Seq[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.fold(List[ClassB]())(foldClassB))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapAsyncFoldAsync: Seq[ClassB] = {
    def graph: RunnableGraph[Future[Seq[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(o +: l)))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapAsyncSeq: Seq[ClassB] = {
    val graph = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).toMat(Sink.seq)(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaLoadBalanceMap: Seq[ClassB] = {
    def graph: RunnableGraph[Future[Seq[ClassB]]] = {
      val sink: Sink[ClassB, Future[Seq[ClassB]]] = Sink.fold(List[ClassB]())(foldClassB)
      RunnableGraph.fromGraph[Future[Seq[ClassB]]](GraphDSL.create(sink) { implicit builder =>
        sink =>
          import GraphDSL.Implicits._
          val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4))
          val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4))
          val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o,o))
          Source(list) ~> balance
          (1 to 4).foreach{ i =>
            balance ~> mapClassB.async ~> merge
          }
          merge ~> sink
          ClosedShape
      })
    }
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaLoadBalanceMapSeq: Seq[ClassB] = {
    def graph: RunnableGraph[Future[Seq[ClassB]]] = {
      val sink: Sink[ClassB, Future[Seq[ClassB]]] = Sink.seq
      RunnableGraph.fromGraph[Future[Seq[ClassB]]](GraphDSL.create(sink) { implicit builder =>
        sink =>
          import GraphDSL.Implicits._
          val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4))
          val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4))
          val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o,o))
          Source(list) ~> balance
          (1 to 4).foreach{ i =>
            balance ~> mapClassB.async ~> merge
          }
          merge ~> sink
          ClosedShape
      })
    }
    runAkkaGraph(graph)
  }

  private def runAkkaGraph(g:RunnableGraph[Future[Seq[ClassB]]]): Seq[ClassB] = {
    implicit val actorSystem = ActorSystem("app")
    implicit val actorMaterializer = ActorMaterializer()
    val eventualBs = g.run()
    val res = Await.result(eventualBs, Duration.Inf)
    actorSystem.terminate()
    res
  }
}

case class ClassA(a:Int)
case class ClassB(o:ClassA, o2:ClassA)

The results with this updated class are:

[info] Benchmark                                    Mode  Cnt   Score   Error  Units
[info] MappingBenchmark.akkaLoadBalanceMap            ss   20  19,052 ± 3,779  ms/op
[info] MappingBenchmark.akkaLoadBalanceMapSeq         ss   20  16,115 ± 3,232  ms/op
[info] MappingBenchmark.akkaMapAsyncFold              ss   20  20,862 ± 3,127  ms/op
[info] MappingBenchmark.akkaMapAsyncFoldAsync         ss   20  26,994 ± 4,010  ms/op
[info] MappingBenchmark.akkaMapAsyncSeq               ss   20  19,399 ± 7,089  ms/op
[info] MappingBenchmark.akkaMapFold                   ss   20  12,132 ± 4,111  ms/op
[info] MappingBenchmark.akkaMapFoldAsync              ss   20  22,652 ± 3,802  ms/op
[info] MappingBenchmark.akkaMapSeq                    ss   20  10,894 ± 3,114  ms/op
[info] MappingBenchmark.map                           ss   20   0,625 ± 0,193  ms/op
[info] MappingBenchmark.monixBatchedObservables       ss   20   9,175 ± 4,080  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeft         ss   20  11,724 ± 4,458  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeftAsync    ss   20  14,174 ± 6,962  ms/op
[info] MappingBenchmark.monixMapFoldLeft              ss   20   1,057 ± 0,960  ms/op
[info] MappingBenchmark.monixMapFoldLeftAsync         ss   20   9,638 ± 4,910  ms/op
[info] MappingBenchmark.monixTaskGather               ss   20   7,065 ± 2,428  ms/op
[info] MappingBenchmark.parMap                        ss   20   1,392 ± 0,923  ms/op

It seems that, when possible, it is still faster to do the mapping with plain Scala collections before running a stream.

Answer by Alexandru Nedelcu

Just a note on asynchronous processing / parallelism ... in general when processing stuff in parallel you end up with quite a lot of CPU-bound overhead for synchronizing the results.

The overhead can in fact be so significant that it can nullify the time gains that you get from multiple CPU cores working in parallel.

You should also get familiar with Amdahl's Law. Take a look at those numbers: with a parallel portion of 75%, the maximum possible speedup is only 4x, no matter how many processors you add. And with a parallel portion of 50%, the maximum speedup is only 2x.
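For reference, Amdahl's Law gives the speedup on n processors as 1 / ((1 - p) + p / n), where p is the fraction of the work that can run in parallel. The sketch below just evaluates that formula; the object and method names are chosen for illustration only.

```scala
object Amdahl {
  // Speedup with n processors when a fraction p (0 <= p < 1) of the work
  // is parallelizable: 1 / ((1 - p) + p / n).
  def speedup(p: Double, n: Int): Double =
    1.0 / ((1.0 - p) + p / n)

  // Upper bound on the speedup as n grows without limit: 1 / (1 - p).
  def maxSpeedup(p: Double): Double =
    1.0 / (1.0 - p)
}
```

For example, Amdahl.maxSpeedup(0.75) is 4.0 and Amdahl.maxSpeedup(0.5) is 2.0, while Amdahl.speedup(0.75, 4) is only about 2.29, so the ceiling is approached slowly.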

And this is only the theoretical limit, because you also have the shared-memory synchronization between processors which can get really messy; basically processors are optimized for sequential execution. Introduce concurrency concerns and you need to force ordering with memory barriers, which nullify many CPU optimizations. And thus you can reach a negative speedup, as seen in your tests actually.

So you're testing asynchronous / parallel mapping, but the test is basically doing nothing at all; you might as well test with the identity function and it would be almost the same thing. In other words the test that you're doing and its results are pretty much useless in practice.

And as a side-note, this is also why I never liked the idea of "parallel collections". The concept is flawed, because you can only use parallel collections for purely CPU-bound stuff (i.e. no I/O, no actual async stuff), which, let's say, is fine for doing some calculations, except that:

  1. for many purposes usage of parallel collections is slower than the normal operators that use a single CPU and
  2. if you actually have CPU-bound work and you need to use your hardware resources to the max, then "parallel collections" in their current incarnation are actually the wrong abstraction, because "hardware" these days includes GPUs

In other words parallel collections are not using hardware resources efficiently, since they totally ignore GPU support and are totally inadequate for mixed CPU - I/O tasks, since they lack asynchrony support.

I feel the need to mention this because too often people think that rubbing some "parallel" pixie dust on their code will make it run faster, but many times it won't.

Parallelism works great when you've got I/O-bound tasks (mixed with CPU-bound tasks of course) and in that case the CPU overhead is much less significant, because processing time is going to be dominated by I/O.
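A rough way to see this with the standard library is to compare sequential and concurrent execution of a task that mostly waits. This is only a sketch under stated assumptions: fakeIo is a hypothetical stand-in that sleeps for 50 ms instead of doing real I/O, and the fixed pool of 4 threads is an arbitrary choice so that all four calls can block at the same time.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object IoBoundDemo {
  // Hypothetical stand-in for a blocking I/O call (e.g. a network request).
  def fakeIo(x: Int): Int = { Thread.sleep(50); x * 2 }

  // Runs a block and returns its result with the elapsed milliseconds.
  def timed[A](body: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1000000)
  }

  // Returns ((sequential result, ms), (concurrent result, ms)).
  def run(): ((List[Int], Long), (List[Int], Long)) = {
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val inputs = List(1, 2, 3, 4)
      // One call after another: roughly 4 x 50 ms.
      val sequential = timed(inputs.map(fakeIo))
      // All four futures start immediately and wait concurrently.
      val concurrent = timed {
        Await.result(Future.sequence(inputs.map(x => Future(fakeIo(x)))), Duration.Inf)
      }
      (sequential, concurrent)
    } finally pool.shutdown()
  }
}
```

Here the waiting time dominates, so the scheduling overhead of the futures is negligible by comparison, unlike in the ClassA to ClassB mapping where the per-element work is tiny.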

PS: plain mapping over Scala collections should be faster because it is strict and (depending on the collection type) it uses array-backed buffers and thus doesn't thrash CPU caches. Monix's .map has the same overhead as Scala's Iterable.map, or in other words near-zero overhead, but its application is lazy and introduces some boxing overhead, which we can't get rid of because the JVM doesn't specialize generics.

It's damn fast in practice though ;-)
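The strict-versus-lazy distinction mentioned in the PS can be illustrated without Monix. The sketch below uses a standard-library Iterator as a stand-in for a lazy pipeline (an assumption made for illustration; it is not how Observable is implemented) and counts how many times the mapping function has run at each point.

```scala
object StrictVsLazy {
  // Returns the number of calls to the mapping function observed
  // (after the strict map, after defining the lazy map, after consuming it).
  def demo(): (Int, Int, Int) = {
    var calls = 0
    val f = (x: Int) => { calls += 1; x * 2 }

    // Strict: List.map applies f to every element right away.
    val strict = List(1, 2, 3).map(f)
    val afterStrict = calls

    // Lazy: Iterator.map only records the transformation.
    val lazyMapped = List(1, 2, 3).iterator.map(f)
    val afterLazyDefinition = calls

    // f runs only once the iterator is actually consumed.
    val consumed = lazyMapped.toList
    (afterStrict, afterLazyDefinition, calls)
  }
}
```

StrictVsLazy.demo() returns (3, 3, 6): defining the lazy map costs nothing until something pulls the elements through it.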