I've benchmarked the mapping of a List[ClassA] to List[ClassB] with monix and akka-streams but I don't understand why it is so slow.
I've tried different way to map and here is the result with JMH:
[info] Benchmark Mode Cnt Score Error Units
[info] MappingBenchmark.akkaLoadBalanceMap ss 20 742,626 â–’ 4,853 ms/op
[info] MappingBenchmark.akkaMapAsyncFold ss 20 480,460 â–’ 8,493 ms/op
[info] MappingBenchmark.akkaMapAsyncFoldAsync ss 20 331,398 â–’ 10,490 ms/op
[info] MappingBenchmark.akkaMapFold ss 20 713,500 â–’ 7,394 ms/op
[info] MappingBenchmark.akkaMapFoldAsync ss 20 313,275 â–’ 8,716 ms/op
[info] MappingBenchmark.map ss 20 0,567 â–’ 0,175 ms/op
[info] MappingBenchmark.monixBatchedObservables ss 20 259,736 â–’ 5,939 ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeft ss 20 456,310 â–’ 5,225 ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeftAsync ss 20 795,345 â–’ 5,443 ms/op
[info] MappingBenchmark.monixMapFoldLeft ss 20 247,172 â–’ 5,342 ms/op
[info] MappingBenchmark.monixMapFoldLeftAsync ss 20 478,840 â–’ 25,249 ms/op
[info] MappingBenchmark.monixTaskGather ss 20 6,707 â–’ 2,176 ms/op
[info] MappingBenchmark.parMap ss 20 1,257 â–’ 0,831 ms/op
Here is the code:
package benches
import java.util.concurrent.TimeUnit
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape, UniformFanOutShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source}
import org.openjdk.jmh.annotations._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.SingleShotTime))
@Warmup(iterations = 20)
@Measurement(iterations = 20)
@Fork(value = 1, jvmArgs = Array("-server", "-Xmx8g"))
@Threads(1)
class MappingBenchmark {
import monix.eval._
import monix.reactive._
import monix.execution.Scheduler.Implicits.global
def list: List[ClassA] = (1 to 10000).map(ClassA).toList
// val l = (1 to 135368).map(Offre).toList
// ##### SCALA ##### //
@Benchmark
def map: List[ClassB] = list.map(o => ClassB(o, o))
@Benchmark
def parMap: List[ClassB] = list.par.map(o => ClassB(o, o)).toList
// ##### MONIX ##### //
@Benchmark
def monixTaskGather: List[ClassB] = {
val task: Task[List[ClassB]] = Task.gatherUnordered(list.map(o => Task(ClassB(o,o))))
Await.result(task.runAsync, Duration.Inf)
}
@Benchmark
def monixBatchedObservables: List[ClassB] = {
val task: Task[List[ClassB]] =
Observable.fromIterable(list)
.bufferIntrospective(256)
.flatMap{items =>
val tasks = items.map(o => Task(ClassB(o,o)))
val batches = tasks.sliding(10,10).map(b => Task.gatherUnordered(b))
val aggregate: Task[Iterator[ClassB]] = Task.sequence(batches).map(_.flatten)
Observable.fromTask(aggregate).flatMap(i => Observable.fromIterator(i))
}.consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
Await.result(task.runAsync, Duration.Inf)
}
@Benchmark
def monixMapFoldLeft: List[ClassB] = {
val task: Task[List[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
Await.result(task.runAsync, Duration.Inf)
}
@Benchmark
def monixMapFoldLeftAsync: List[ClassB] = {
val task: Task[List[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(l :+ o)))
Await.result(task.runAsync, Duration.Inf)
}
@Benchmark
def monixMapAsyncFoldLeft: List[ClassB] = {
val task: Task[List[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
Await.result(task.runAsync, Duration.Inf)
}
@Benchmark
def monixMapAsyncFoldLeftAsync: List[ClassB] = {
val task: Task[List[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(l :+ o)))
Await.result(task.runAsync, Duration.Inf)
}
// ##### AKKA-STREAM ##### //
@Benchmark
def akkaMapFold: List[ClassB] = {
val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.fold(List[ClassB]())(_ :+ _))(Keep.right)
runAkkaGraph(graph)
}
@Benchmark
def akkaMapFoldAsync: List[ClassB] = {
val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(l :+ o)))(Keep.right)
runAkkaGraph(graph)
}
@Benchmark
def akkaMapAsyncFold: List[ClassB] = {
def graph: RunnableGraph[Future[List[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.fold(List[ClassB]())(_ :+ _))(Keep.right)
runAkkaGraph(graph)
}
@Benchmark
def akkaMapAsyncFoldAsync: List[ClassB] = {
def graph: RunnableGraph[Future[List[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(l :+ o)))(Keep.right)
runAkkaGraph(graph)
}
@Benchmark
def akkaLoadBalanceMap: List[ClassB] = {
def graph: RunnableGraph[Future[List[ClassB]]] = {
val sink: Sink[ClassB, Future[List[ClassB]]] = Sink.fold(List[ClassB]())(_ :+ _)
RunnableGraph.fromGraph[Future[List[ClassB]]](GraphDSL.create(sink) { implicit builder =>
sink =>
import GraphDSL.Implicits._
val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4))
val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4))
val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o,o))
Source(list) ~> balance
(1 to 4).foreach{ i =>
balance ~> mapClassB.async ~> merge
}
merge ~> sink
ClosedShape
})
}
runAkkaGraph(graph)
}
private def runAkkaGraph(g:RunnableGraph[Future[List[ClassB]]]): List[ClassB] = {
implicit val actorSystem = ActorSystem("app")
implicit val actorMaterializer = ActorMaterializer()
val eventualBs = g.run()
val res = Await.result(eventualBs, Duration.Inf)
actorSystem.terminate()
res
}
}
case class ClassA(a:Int)
case class ClassB(o:ClassA, o2:ClassA)
The bench result is getting even worse when the initial collection is bigger.
I would like to know what my mistake is.
Thanks for sharing your knowledge!
Best regards
I've updated the code and the bench is really better than before. The difference is related to the List operator. In fact, the first version was using append instead of preprend. Since List is a linked list, it had to iterate over the elements in order to add a new one. By being lazy, I wanted to use _ operator but I should have not.
The result with this updated class is :
it seems that it is still faster to map with scala if we can before running a stream.