Reactive pipeline with CompletableFuture, timeout, backpressure


I have an app that publishes messages. The data comes from some source (which doesn't matter here), and each publish call returns a Java CompletableFuture. There are also special messages that mark the beginning and end of a batch.

What I want to implement is a pipeline that has several properties:

  • record how long each CompletableFuture takes to complete
  • time out if the future doesn't complete in time
  • report an error if the future fails or times out
  • apply backpressure, so that there are never too many CompletableFutures in flight
  • record the total time it takes for the whole batch to complete

Previously I used a different messaging infrastructure that didn't return a future; I could simply push a message to another service. This is my implementation for that case.

I have a sink to which I push a collection of objects to be published:

public void push(Collection<DataEnvelope> envelopes) {
    sink.emitNext(envelopes, this::onEmitFailure);
}

then I get the flux from the sink:

mainFlux = sink.asFlux()
            .filter(this::checkForEmptyDataEnvelopeCollection)
            .flatMap(Flux::fromIterable)
            .filter(this::checkForNullDataEnvelope)
            .filter(this::checkForBeginningOfStreamEnvelope)
            .filter(this::checkForEndOfStreamEnvelope)
            // this is where I can get the CompletableFuture; I can store it in the Envelope object I publish
            .map(mainPublisher::publish)
            // I want to retry if the timeout for completing the CompletableFuture expires
            .retryWhen(Retry.backoff(pipelineConfig.getErrorRetryTimes(),
                    Duration.of(pipelineConfig.getErrorRetrySeconds(), ChronoUnit.SECONDS)))
            .onErrorContinue(errorPipeline::push)
            // count every envelope that made it through
            .doOnNext(envelope -> this.timedLogger.incrementLogItemCount(envelope.getLoggerKey()));

mainFlux.subscribe(pipelineStatusReporter::completed);

How do I convert this to take the CompletableFuture into account, and how do I apply backpressure? For example, I want at most N messages in flight, and if the timeout on a future's completion expires, the message should be pushed to the error pipeline.
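
To make the requirements concrete, here is a minimal sketch of the in-flight cap, per-future timeout, error routing, and timing, written with only the JDK (Java 9+ for orTimeout) rather than Reactor. BoundedPublisher and its parameters are hypothetical names invented for this illustration: publish stands in for mainPublisher::publish and onError for errorPipeline::push. Retrying after a timeout is not covered. In a Reactor pipeline the analogous building blocks would probably be Mono.fromFuture, Mono.timeout, and the flatMap overload that takes a concurrency argument, but this sketch does not verify that combination.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical helper: caps in-flight futures, times each one, and routes failures. */
final class BoundedPublisher<T> {
    private final int maxInFlight;
    private final Semaphore permits;                          // one permit per in-flight future
    private final Duration timeout;
    private final Function<T, CompletableFuture<?>> publish;  // assumed stand-in for mainPublisher::publish
    private final BiConsumer<Throwable, T> onError;           // assumed stand-in for errorPipeline::push

    BoundedPublisher(int maxInFlight, Duration timeout,
                     Function<T, CompletableFuture<?>> publish,
                     BiConsumer<Throwable, T> onError) {
        this.maxInFlight = maxInFlight;
        this.permits = new Semaphore(maxInFlight);
        this.timeout = timeout;
        this.publish = publish;
        this.onError = onError;
    }

    /**
     * Blocks the caller while maxInFlight futures are pending (the backpressure),
     * then publishes. The returned future yields the elapsed time in nanoseconds,
     * whether the publish succeeded, failed, or timed out.
     */
    CompletableFuture<Long> submit(T message) {
        permits.acquireUninterruptibly();
        long start = System.nanoTime();
        CompletableFuture<?> future;
        try {
            future = publish.apply(message);
        } catch (RuntimeException e) {
            future = CompletableFuture.failedFuture(e);       // treat a synchronous throw like a failed future
        }
        return future
                .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS) // fails with TimeoutException if too slow
                .handle((ok, err) -> {
                    try {
                        if (err != null) {
                            onError.accept(err, message);     // error or timeout: report it
                        }
                        return System.nanoTime() - start;
                    } finally {
                        permits.release();                    // free the slot only after reporting
                    }
                });
    }

    /** Publishes a batch and returns the total time until every future has settled. */
    Duration publishBatch(List<T> batch) {
        long start = System.nanoTime();
        for (T message : batch) {
            submit(message);
        }
        permits.acquireUninterruptibly(maxInFlight);          // all permits back means nothing is in flight
        permits.release(maxInFlight);
        return Duration.ofNanos(System.nanoTime() - start);
    }
}
```

Calling publishBatch with the envelopes between a beginning-of-batch and an end-of-batch marker would give the total batch time, and the value of each future returned by submit gives the per-message latency. Blocking on a semaphore is the simplest form of backpressure; it is an assumption of this sketch that the producing thread is allowed to block.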
