Akka Streams - Redistribute chunks into max_permissible_chunk_size (Scala)


My code uploads a binary to S3 using Akka Streams as shown below:

source
  .via(distributeChunks(MAX_BYTES_PER_CHUNK))
  .throttle(maxRequestsPerSecond, 1.second, maximumBurst = 1, ThrottleMode.Shaping)
  .runFoldAsync(1) { (partNumber, bytes) =>
    {...}
  }

I need to write distributeChunks so that it breaks the incoming stream into chunks of exactly MAX_BYTES_PER_CHUNK bytes, except for the last chunk, which may be smaller if the remaining bytes are fewer than MAX_BYTES_PER_CHUNK.

I tried this:

private def distributeChunks(maxChunkSize: Int): Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString]
      .statefulMapConcat { () =>
        var buffer = ByteString.empty

        { bs: ByteString =>
          buffer ++= bs
          val chunks = new ArrayBuffer[ByteString]

          while (buffer.length >= maxChunkSize) {
            val (chunk, rest) = buffer.splitAt(maxChunkSize)
            chunks += chunk
            buffer = rest
          }

          chunks.toList
        }
      }
      .mapMaterializedValue(_ => NotUsed)

This ensures every chunk is exactly MAX_BYTES_PER_CHUNK, but it drops the final partial chunk when the stream ends, and I am not sure how to fix that. Can someone help me reason through this and come up with the right code to get the desired result?

Here are two test cases:

FILE SIZE: 10MB, MAX_PERMISSIBLE_CHUNK: 2MB should break into 5 chunks of 2MB.
FILE SIZE: 9MB, MAX_PERMISSIBLE_CHUNK: 2MB should break into 4 chunks of 2MB and 1 chunk of exactly 1MB.
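For reference, a throwaway check of these two cases could look roughly like this (a sketch: it assumes a working distributeChunks and Akka 2.6+, and feeds the "file" as 1 MB pieces, since the element boundaries coming from upstream are arbitrary):

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("chunk-test")

val MB = 1024 * 1024

// Feed fileSizeMb one-megabyte ByteStrings through the chunker and collect the chunk sizes.
def chunkSizes(fileSizeMb: Int, maxChunkSize: Int): Seq[Int] =
  Await.result(
    Source(List.fill(fileSizeMb)(ByteString(new Array[Byte](MB))))
      .via(distributeChunks(maxChunkSize))
      .map(_.length)
      .runWith(Sink.seq),
    10.seconds)

assert(chunkSizes(10, 2 * MB) == Seq.fill(5)(2 * MB))           // 5 chunks of 2 MB
assert(chunkSizes(9, 2 * MB)  == Seq.fill(4)(2 * MB) :+ 1 * MB) // 4 chunks of 2 MB + 1 of 1 MB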

There are 3 answers

Shivam Sahil (accepted answer)

After a lot of debugging I realised that I needed custom inlet and outlet handlers to ensure the bytes are correctly accumulated:

import akka.NotUsed
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.Flow
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString

def distributeChunks(maxChunkSize: Int): Flow[ByteString, ByteString, NotUsed] =
    Flow.fromGraph(new GraphStage[FlowShape[ByteString, ByteString]] {
      val in: Inlet[ByteString] = Inlet[ByteString]("distributeChunks.in")
      val out: Outlet[ByteString] = Outlet[ByteString]("distributeChunks.out")
      override val shape: FlowShape[ByteString, ByteString] = FlowShape.of(in, out)

      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
        new GraphStageLogic(shape) {
          var buffer: ByteString = ByteString.empty

          setHandler(in, new InHandler {
            override def onPush(): Unit = {
              // Accumulate incoming bytes; push a full chunk as soon as one is available,
              // otherwise ask upstream for more data.
              buffer ++= grab(in)
              if (buffer.length >= maxChunkSize) {
                val (chunk, rest) = buffer.splitAt(maxChunkSize)
                buffer = rest
                push(out, chunk)
              } else {
                pull(in)
              }
            }

            override def onUpstreamFinish(): Unit = {
              // Flush whatever is left as the final (possibly smaller) chunk.
              if (buffer.nonEmpty) {
                emit(out, buffer)
              }
              completeStage()
            }
          })

          setHandler(out, new OutHandler {
            override def onPull(): Unit = {
              if (!hasBeenPulled(in)) {
                pull(in)
              }
            }
          })
        }
    })

The problem with the while-loop approach is that it has no control over how the bytes arrive from the client side, which can cause issues; the element boundaries may depend on the network configuration and many other factors, so I used a custom stage to sort this out.

Also, I just noticed that Akka is now under the BSL, which is sad; hopefully the old functionality can still be used as-is. :)

earthling paul

StreamConverters can also help with file chunking, something like:

  import java.io.FileInputStream
  import akka.stream.scaladsl.{Sink, StreamConverters}

  val MAX_BYTES_PER_CHUNK = 1024 * 1000 * 2 // 2 MB
  val fileInputStream = new FileInputStream("9MB.pdf")

  StreamConverters
    .fromInputStream(() => fileInputStream, chunkSize = MAX_BYTES_PER_CHUNK)
    .map(each => println(s"Chunk size: ${each.size / (1024 * 1000)} MB"))
    .runWith(Sink.ignore)

This should yield:

Chunk size: 2 MB
Chunk size: 2 MB
Chunk size: 2 MB
Chunk size: 2 MB
Chunk size: 1 MB
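
If the input is a local file, FileIO.fromPath takes the same chunkSize parameter and gives the same fixed-size chunking without wrapping an InputStream (a minimal sketch; the file name is just a placeholder):

  import java.nio.file.Paths
  import akka.stream.scaladsl.FileIO

  // Assumes an implicit ActorSystem (for the materializer) in scope.
  FileIO
    .fromPath(Paths.get("9MB.pdf"), chunkSize = MAX_BYTES_PER_CHUNK)
    .runForeach(chunk => println(s"Chunk size: ${chunk.size / (1024 * 1000)} MB"))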

See also this answer: https://discuss.lightbend.com/t/stream-byte-array-into-chunks/8817/2

Levi Ramsey

The custom stage approach works, but it should be noted that all the custom stage adds over statefulMapConcat is detecting when the upstream has completed.

This can be accomplished within the stream DSL if there's an element which either doesn't come from the upstream or would have no actual effect inside the statefulMapConcat: concat a Source.single of that element after the upstream. In this case, an empty ByteString is such an element: because buffer is always shorter than maxChunkSize, appending an empty ByteString can never make it reach maxChunkSize, so no chunks will be emitted for it. We can therefore safely filter empty ByteStrings out of the upstream and inject a single empty ByteString on upstream completion as an end-of-stream marker.

Flow[ByteString]
  .filter(_.nonEmpty)
  .concat(Source.single(ByteString.empty))
  .statefulMapConcat { () =>
    var buffer = ByteString.empty

    { bs: ByteString =>
      if (bs.nonEmpty) {
        buffer ++= bs
        val chunks = new ArrayBuffer[ByteString]

        while (buffer.length >= maxChunkSize) {
          val (chunk, rest) = buffer.splitAt(maxChunkSize)
          chunks += chunk
          buffer = rest
        }

        chunks.toList
      } else if (buffer.nonEmpty) List(buffer) // end-of-stream: flush the remainder
      else Nil // exact multiple of maxChunkSize, nothing left to flush
    }
  }

In the general case, you can map the upstream element to Some(element) and .concat(Source.single(None)):

Flow[T]
  .map(Some(_))
  .concat(Source.single(None))
  .statefulMapConcat { () =>
    var state = ???

    { in: Option[T] =>
      in match {
        case Some(t) =>
          // incorporate t into state and possibly emit
          ???

        case None =>
          // end-of-stream, emit final elements based on state
          // "A pulse of dying power in a clenching plastic fist" -- Peart (1984)
          ???
      }
    }
  }
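
Applied to the chunking problem, the Option form might look like this (a sketch, behaviourally equivalent to the empty-ByteString version above; the name chunkerViaOption is arbitrary):

import scala.collection.mutable.ArrayBuffer

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString

def chunkerViaOption(maxChunkSize: Int): Flow[ByteString, ByteString, NotUsed] =
  Flow[ByteString]
    .map(Option(_))
    .concat(Source.single(Option.empty[ByteString])) // None marks end-of-stream
    .statefulMapConcat { () =>
      var buffer = ByteString.empty

      { in: Option[ByteString] =>
        in match {
          case Some(bs) =>
            // Accumulate and emit every full chunk currently in the buffer.
            buffer ++= bs
            val chunks = new ArrayBuffer[ByteString]
            while (buffer.length >= maxChunkSize) {
              val (chunk, rest) = buffer.splitAt(maxChunkSize)
              chunks += chunk
              buffer = rest
            }
            chunks.toList

          case None =>
            // End-of-stream: flush the remainder, if any.
            if (buffer.nonEmpty) List(buffer) else Nil
        }
      }
    }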

This general approach, of course, performs an object allocation for every element: if there's a useful T which satisfies our "never emitted by upstream, or safe to ignore if it is" requirement, the sentinel-element approach (like the empty ByteString above) is more efficient.