Can't make this Pipe & Filter Scala Code to print the output

104 views Asked by At

I tried multiple approaches but it just not work. this is my code:

package assign

import java.nio.file.{Path, Paths}
import java.nio.file.StandardOpenOption.*
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{FlowShape, Graph, IOResult, OverflowStrategy}
import akka.stream.scaladsl.{Balance, Broadcast, FileIO, Flow, GraphDSL, Merge, RunnableGraph, Source}
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.util.ByteString


import scala.concurrent.{ExecutionContextExecutor, Future}
import concurrent.duration.DurationInt
import scala.util.{Failure, Success}
import akka.Done
import akka.stream.scaladsl.Sink

case class DependenciesInfo(groupId: String, artifactId: String, version: String)
case class Record(library: DependenciesInfo, dependency: DependenciesInfo, dependencyType: String) 
case class DependencyCount(library: DependenciesInfo, compileCount: Int, runtimeCount: Int)

object assignment1 extends App:

  implicit val actorSystem: ActorSystem = ActorSystem("assignment1")
  implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher

  // Specify Path to CSV File -> Pass as Source -> Source
  val resourcesFolder: String = "src/main/resources"
  val pathCSVFile: Path = Paths.get(s"$resourcesFolder/maven_dependencies.csv")

  val source: Source[ByteString, Future[IOResult]] = FileIO.fromPath(pathCSVFile)

  // CSV Parsing -> Parsed CSV
  val csvParsing: Flow[ByteString, List[ByteString], NotUsed] = CsvParsing.lineScanner()
  
  // CSV Mapping -> Mapped Information
  val mappingHeader: Flow[List[ByteString], Map[String, ByteString], NotUsed] = CsvToMap.toMap()

  // Function to convert a String into DependenciesInfo
  def createDependenciesInfo(str: String): DependenciesInfo = {
    val Array(groupId, artifactId, version) = str.split(":")
    DependenciesInfo(groupId, artifactId, version)
  }

  // Flow to replace string values of keys "library" and "dependency" with DependenciesInfo objects
  val replaceWithDependenciesInfo: Flow[Map[String, ByteString], Map[String, Any], NotUsed] =
    Flow[Map[String, ByteString]].map { inputMap =>
      val updatedMap = inputMap.map { case (key, value) =>
        key -> (key match {
        case "library" => createDependenciesInfo(value.utf8String)
        case "dependency" => createDependenciesInfo(value.utf8String)
        case _ => value.utf8String
      })
      }
      updatedMap
    } 

  val groupedByLibrary: Flow[Map[String, Any], Record, NotUsed] =
    Flow[Map[String, Any]]
      .groupBy(185, record => record("library")) // 185 substreams for unique libraries
      .throttle(10, per = 1.second) // Throttle to 10 groups per second
      .buffer(5, OverflowStrategy.backpressure) // Buffer with capacity 5 and backpressure
      .map { record =>
        val library = record("library").asInstanceOf[DependenciesInfo]
        val dependency = record("dependency").asInstanceOf[DependenciesInfo]
        val dependencyType = record("type").toString // Assuming type is a String

        Record(library, dependency, dependencyType)
      }
      .mergeSubstreams // Merge the substreams back into a single stream
  ///////////////////////////////////////////////////////////
  //                                                       //
  //                     ONE TRY                           //
  ///////////////////////////////////////////////////////////
  val countCompile: Flow[Record, Int, NotUsed] =
  Flow[Record].filter(_.dependencyType == "Compile").fold(0)((acc, _) => acc + 1)

  val countRuntime: Flow[Record, Int, NotUsed] =
    Flow[Record].filter(_.dependencyType == "Runtime").fold(0)((acc, _) => acc + 1)

  val instantiateDependencyCount: Flow[Int, DependencyCount, NotUsed] =
    Flow[Int].fold(DependencyCount(DependenciesInfo("", "", ""), 0, 0)) { (acc, count) =>
      DependencyCount(acc.library, count, count)
    }

  val compileCounter: Flow[Record, DependencyCount, NotUsed] =
    Flow[Record].via(countCompile).via(instantiateDependencyCount)

  val runtimeCounter: Flow[Record, DependencyCount, NotUsed] =
    Flow[Record].via(countRuntime).via(instantiateDependencyCount)

  val countDependencyTypes: Flow[Record, DependencyCount, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val balance = builder.add(Balance[Record](2))
      val merge = builder.add(Merge[DependencyCount](2))

      balance.out(0) ~> compileCounter.async ~> merge.in(0)
      balance.out(1) ~> runtimeCounter.async ~> merge.in(1)
      FlowShape(balance.in, merge.out)
    })

  
  source
    .via(csvParsing)
    .via(mappingHeader)
    .via(replaceWithDependenciesInfo)
    .via(groupedByLibrary)
    .via(countDependencyTypes)
    .to(Sink.foreach[DependencyCount](println)).run()

I am trying to implement this architecture:

Application diagram

It is a Pipe and Filter architecture for a system that processes library dependencies of the Maven repository, i.e., given a dataset of Maven software library dependencies, we want to know how many dependencies each library has.

the end goal is to print an output like this (now I just want the output without formatting):

Name: activemq:activemq-core:3.2 --> Compile: 55 Runtime: 96
 and so on ...

but now it runs and finish without printing anything. But when I just run it with:

source
    .via(csvParsing)
    .via(mappingHeader)
    .via(replaceWithDependenciesInfo)
    .via(groupedByLibrary)
    //.via(countDependencyTypes)
    .to(Sink.foreach[Record](println)).run()

it print the output of the groupedByLibrary without problem. The output is like this:

Record(DependenciesInfo(activemq,activemq-core,3.2),DependenciesInfo(cglib,cglib-full,2.0),Compile)
Record(DependenciesInfo(activemq,activemq-core,3.2),DependenciesInfo(xmlbeans,xmlpublic,2.0.0-beta1),Compile)
Record(DependenciesInfo(activemq,activemq-core,3.2.3),DependenciesInfo(stax,stax-api,1.0),Compile)
Record(DependenciesInfo(activemq,activemq-core,3.2.3),DependenciesInfo(xmlbeans,xbean_xpath,2.0.0-beta1),Compile)
Record(DependenciesInfo(activemq,activemq-core,3.2.3),DependenciesInfo(xmlbeans,xmlpublic,2.0.0-beta1),Compile)
 ... and so on.
0

There are 0 answers