fs2: How to do something once the stream is started ("doOnSubscribe")?


I am trying to use an impure ("Java") API in the context of a cats-effect IOApp. The impure API looks somewhat like this:

import io.reactivex.Flowable
import java.util.concurrent.CompletableFuture

trait ImpureProducer[A] {
  /** the produced output - must be subscribed to before calling startProducing() */
  def output: Flowable[A]

  /** Makes the producer start publishing its output to any subscribers of `output`. */
  def startProducing(): CompletableFuture[Unit]
}

(Of course, there are more methods including stopProducing(), but those are not relevant for my question.)

My (maybe naive) approach to adapting that API looks as follows (leveraging the fact that Flowable is an org.reactivestreams.Publisher):

import cats.effect.IO
import fs2.Stream
import fs2.interop.reactivestreams.*

class AdaptedProducer[A](private val underlying: ImpureProducer[A]) {
  def output: Stream[IO, A] =
    underlying.output.toStreamBuffered(1)
  def startProducing: IO[Unit] = 
    IO.fromCompletableFuture(IO(underlying.startProducing()))
}

My question is: how can I ensure that the output-stream is subscribed to before I evaluate startProducing?

For example, how could I fix the following attempt to obtain an IO of the very first item that is produced:

import cats.Parallel
import cats.effect.IO

def firstOutput[A](producer: AdaptedProducer[A]): IO[A] = {
  val firstOut: IO[A] = producer.output.take(1).compile.onlyOrError
  // this introduces a race condition: it is not ensured that the output-stream
  // will already be subscribed to when startProducing is evaluated.
  Parallel[IO].parProductL(firstOut)(producer.startProducing)
}

Accepted answer, by Luis Miguel Mejía Suárez:

This can be done using the new Java Flow interop (fs2.interop.flow).
Note that the reactive-streams interop has been deprecated since the Flow one was added: the API was redesigned to handle more cases (like this one), and the implementation was rewritten from scratch to be more efficient.
If the API you are using doesn't have a Flow-based version, you can use FlowAdapters (from org.reactivestreams) to wrap it.
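As a rough illustration of that wrapping in the general case (a plain org.reactivestreams.Publisher with no separate start-up step), a minimal sketch could look like the following. `FlowAdaptersSketch` and `fromReactiveStreams` are hypothetical names; the sketch assumes the subscribe-based `fs2.interop.flow.fromPublisher[F, A](chunkSize)(subscribe)` overload found in recent fs2 3.x releases.

```scala
import cats.effect.IO
import fs2.Stream
import org.reactivestreams.{FlowAdapters, Publisher}

object FlowAdaptersSketch {
  // Hypothetical helper: expose any reactive-streams Publisher (e.g. an RxJava
  // Flowable) as an fs2 Stream through the java.util.concurrent.Flow interop.
  def fromReactiveStreams[A](publisher: Publisher[A], chunkSize: Int): Stream[IO, A] = {
    // Adapt the reactive-streams Publisher into a Flow.Publisher.
    val flowPublisher = FlowAdapters.toFlowPublisher(publisher)
    // fs2 supplies a Flow.Subscriber; subscription only happens when the stream runs.
    fs2.interop.flow.fromPublisher[IO, A](chunkSize) { subscriber =>
      IO(flowPublisher.subscribe(subscriber))
    }
  }
}
```

Because nothing subscribes until the returned Stream is actually run, the same stream value can be run more than once, each run creating a fresh subscription.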

For the producer in the question, the code would look like this:

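import cats.effect.IO
import fs2.Stream
import org.reactivestreams.FlowAdapters

final class AdaptedProducer[A](underlying: ImpureProducer[A], chunkSize: Int) {
  val run: Stream[IO, A] =
    fs2.interop.flow.fromPublisher[IO, A](chunkSize) { subscriber =>
      // fs2 hands us a java.util.concurrent.Flow.Subscriber, while Flowable expects
      // an org.reactivestreams.Subscriber, so convert it with FlowAdapters.toSubscriber.
      IO(underlying.output.subscribe(FlowAdapters.toSubscriber(subscriber))) >>
        // Only start producing once the subscription is in place.
        IO.fromCompletableFuture(IO(underlying.startProducing()))
    }
}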

This ensures that startProducing is only called once you actually start consuming the Stream, and only after subscribe has been called.
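Applied to the question's firstOutput, the ordering now lives inside the subscribe callback, so a plain take(1) should be enough and the parProductL race goes away. Below is a minimal sketch; it copies the question's ImpureProducer trait so it compiles on its own, `FirstOutputSketch` is a hypothetical name, and it assumes the CompletableFuture returned by startProducing() completes once production has begun rather than only when it ends.

```scala
import cats.effect.IO
import io.reactivex.Flowable
import java.util.concurrent.CompletableFuture
import org.reactivestreams.FlowAdapters

// The question's impure API, copied so this sketch compiles on its own.
trait ImpureProducer[A] {
  def output: Flowable[A]
  def startProducing(): CompletableFuture[Unit]
}

object FirstOutputSketch {
  // Hypothetical helper: yields the first produced item. subscribe runs before
  // startProducing() because both are sequenced with >> inside the callback.
  def firstOutput[A](underlying: ImpureProducer[A], chunkSize: Int = 16): IO[A] =
    fs2.interop.flow
      .fromPublisher[IO, A](chunkSize) { subscriber =>
        IO(underlying.output.subscribe(FlowAdapters.toSubscriber(subscriber))) >>
          IO.fromCompletableFuture(IO(underlying.startProducing()))
      }
      .take(1)     // stop after the first element
      .compile
      .onlyOrError // fails if the producer completes without emitting anything
}
```

Compared with the original attempt, no parallel composition is involved: consuming the stream is what triggers both the subscription and the call to startProducing(), in that order.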

Hope this helps :D