I have tried multiple approaches, but it just does not work. This is my code:
package assign
import java.nio.file.{Path, Paths}
import java.nio.file.StandardOpenOption.*
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{FlowShape, Graph, IOResult, OverflowStrategy}
import akka.stream.scaladsl.{Balance, Broadcast, FileIO, Flow, GraphDSL, Merge, RunnableGraph, Source}
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.util.ByteString
import scala.concurrent.{ExecutionContextExecutor, Future}
import concurrent.duration.DurationInt
import scala.util.{Failure, Success}
import akka.Done
import akka.stream.scaladsl.Sink
case class DependenciesInfo(groupId: String, artifactId: String, version: String)
case class Record(library: DependenciesInfo, dependency: DependenciesInfo, dependencyType: String)
case class DependencyCount(library: DependenciesInfo, compileCount: Int, runtimeCount: Int)
object assignment1 extends App:
  implicit val actorSystem: ActorSystem = ActorSystem("assignment1")
  implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher

  // Path to the CSV file, exposed as a Source of ByteString
  val resourcesFolder: String = "src/main/resources"
  val pathCSVFile: Path = Paths.get(s"$resourcesFolder/maven_dependencies.csv")
  val source: Source[ByteString, Future[IOResult]] = FileIO.fromPath(pathCSVFile)

  // CSV parsing -> parsed CSV lines
  val csvParsing: Flow[ByteString, List[ByteString], NotUsed] = CsvParsing.lineScanner()

  // CSV mapping -> maps keyed by the header columns
  val mappingHeader: Flow[List[ByteString], Map[String, ByteString], NotUsed] = CsvToMap.toMap()

  // Convert a "groupId:artifactId:version" string into DependenciesInfo
  def createDependenciesInfo(str: String): DependenciesInfo = {
    val Array(groupId, artifactId, version) = str.split(":")
    DependenciesInfo(groupId, artifactId, version)
  }

  // Flow to replace the string values of the keys "library" and "dependency" with DependenciesInfo objects
  val replaceWithDependenciesInfo: Flow[Map[String, ByteString], Map[String, Any], NotUsed] =
    Flow[Map[String, ByteString]].map { inputMap =>
      val updatedMap = inputMap.map { case (key, value) =>
        key -> (key match {
          case "library"    => createDependenciesInfo(value.utf8String)
          case "dependency" => createDependenciesInfo(value.utf8String)
          case _            => value.utf8String
        })
      }
      updatedMap
    }

  val groupedByLibrary: Flow[Map[String, Any], Record, NotUsed] =
    Flow[Map[String, Any]]
      .groupBy(185, record => record("library")) // 185 substreams for unique libraries
      .throttle(10, per = 1.second)              // Throttle each substream to 10 elements per second
      .buffer(5, OverflowStrategy.backpressure)  // Buffer with capacity 5 and backpressure
      .map { record =>
        val library = record("library").asInstanceOf[DependenciesInfo]
        val dependency = record("dependency").asInstanceOf[DependenciesInfo]
        val dependencyType = record("type").toString // already a String after replaceWithDependenciesInfo
        Record(library, dependency, dependencyType)
      }
      .mergeSubstreams // Merge the substreams back into a single stream
  ///////////////////////////////////////////////////////////
  //                                                       //
  //                        ONE TRY                        //
  ///////////////////////////////////////////////////////////
  // Keep only the Compile records and count them with fold
  val countCompile: Flow[Record, Int, NotUsed] =
    Flow[Record].filter(_.dependencyType == "Compile").fold(0)((acc, _) => acc + 1)

  // Keep only the Runtime records and count them with fold
  val countRuntime: Flow[Record, Int, NotUsed] =
    Flow[Record].filter(_.dependencyType == "Runtime").fold(0)((acc, _) => acc + 1)

  // Wrap an incoming count into a DependencyCount (library left empty, same count in both fields)
  val instantiateDependencyCount: Flow[Int, DependencyCount, NotUsed] =
    Flow[Int].fold(DependencyCount(DependenciesInfo("", "", ""), 0, 0)) { (acc, count) =>
      DependencyCount(acc.library, count, count)
    }

  val compileCounter: Flow[Record, DependencyCount, NotUsed] =
    Flow[Record].via(countCompile).via(instantiateDependencyCount)

  val runtimeCounter: Flow[Record, DependencyCount, NotUsed] =
    Flow[Record].via(countRuntime).via(instantiateDependencyCount)

  // Balance records across the two counters and merge their DependencyCount outputs
  val countDependencyTypes: Flow[Record, DependencyCount, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val balance = builder.add(Balance[Record](2))
      val merge = builder.add(Merge[DependencyCount](2))
      balance.out(0) ~> compileCounter.async ~> merge.in(0)
      balance.out(1) ~> runtimeCounter.async ~> merge.in(1)
      FlowShape(balance.in, merge.out)
    })

  // Wire the full pipeline together and run it
  source
    .via(csvParsing)
    .via(mappingHeader)
    .via(replaceWithDependenciesInfo)
    .via(groupedByLibrary)
    .via(countDependencyTypes)
    .to(Sink.foreach[DependencyCount](println)).run()
I am trying to implement this architecture:

It is a Pipe-and-Filter architecture for a system that processes library dependencies from the Maven repository, i.e., given a dataset of Maven software library dependencies, we want to know how many dependencies each library has.
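To make the intent concrete, this is the per-library result I want, expressed over a plain in-memory collection (illustration of the semantics only, not code from my project; the real computation has to stay inside the stream):

// Illustration only: the count I want for one library, over a plain List[Record]
def countForLibrary(records: List[Record], library: DependenciesInfo): DependencyCount =
  DependencyCount(
    library,
    compileCount = records.count(r => r.library == library && r.dependencyType == "Compile"),
    runtimeCount = records.count(r => r.library == library && r.dependencyType == "Runtime")
  )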
The end goal is to print output like this (for now I just want the output, without any formatting):
Name: activemq:activemq-core:3.2 --> Compile: 55 Runtime: 96
and so on ...
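Once the counting works, I plan to format each DependencyCount with something like this (a sketch only; the field names are the ones from my case classes above):

// Sketch of the formatting step I have in mind for the final output
def formatCount(dc: DependencyCount): String =
  s"Name: ${dc.library.groupId}:${dc.library.artifactId}:${dc.library.version} " +
    s"--> Compile: ${dc.compileCount} Runtime: ${dc.runtimeCount}"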
Right now, it runs and finishes without printing anything. However, when I run it with the countDependencyTypes stage commented out:
source
.via(csvParsing)
.via(mappingHeader)
.via(replaceWithDependenciesInfo)
.via(groupedByLibrary)
//.via(countDependencyTypes)
.to(Sink.foreach[Record](println)).run()
it prints the output of groupedByLibrary without any problem. The output looks like this:
Record(DependenciesInfo(activemq,activemq-core,3.2),DependenciesInfo(cglib,cglib-full,2.0),Compile)
Record(DependenciesInfo(activemq,activemq-core,3.2),DependenciesInfo(xmlbeans,xmlpublic,2.0.0-beta1),Compile)
Record(DependenciesInfo(activemq,activemq-core,3.2.3),DependenciesInfo(stax,stax-api,1.0),Compile)
Record(DependenciesInfo(activemq,activemq-core,3.2.3),DependenciesInfo(xmlbeans,xbean_xpath,2.0.0-beta1),Compile)
Record(DependenciesInfo(activemq,activemq-core,3.2.3),DependenciesInfo(xmlbeans,xmlpublic,2.0.0-beta1),Compile)
... and so on.