I have an app that publishes messages. The data comes from some source (which one doesn't matter here), and each publish call returns a Java CompletableFuture. There are also special messages that mark the beginning and the end of a batch.
What I want to implement is a pipeline that has several properties:
- record how long each CompletableFuture took to complete
- time out if the future doesn't complete in time
- report an error if the future fails or times out
- apply backpressure, so there are never too many CompletableFutures in flight
- record the total time it took for the whole batch to complete
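To make the per-message requirements concrete, here is a minimal plain-JDK sketch (no Reactor) of the behaviour I'm after for a single future. The class name TimedPublish and the track method are made up for illustration, orTimeout needs Java 9+, and the clock starts when track is called rather than when the publish was actually sent.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class TimedPublish {

    /**
     * Hypothetical helper: fails the publish future with a TimeoutException
     * after `timeout`, then reports either the elapsed time or the error.
     */
    public static <T> CompletableFuture<T> track(
            CompletableFuture<T> publishFuture,
            Duration timeout,
            Consumer<Duration> onSuccess,
            Consumer<Throwable> onError) {
        long start = System.nanoTime();
        return publishFuture
                // completes the future exceptionally if it is still pending after `timeout`
                .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
                .whenComplete((result, error) -> {
                    if (error != null) {
                        // may be a CompletionException wrapper if publishFuture is a dependent stage
                        onError.accept(error);
                    } else {
                        onSuccess.accept(Duration.ofNanos(System.nanoTime() - start));
                    }
                });
    }
}
```

In the real pipeline onSuccess would feed the timing logger and onError would push to the error pipeline.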
My previous messaging infrastructure didn't return a future; I could just push a message to another service. I already have an implementation for that case:
I have a sink to which I push a collection of objects to be published:
public void push(Collection<DataEnvelope> envelopes) {
    sink.emitNext(envelopes, this::onEmitFailure);
}
Then I get the Flux from the sink:
mainFlux = sink.asFlux()
    .filter(this::checkForEmptyDataEnvelopeCollection)
    .flatMap(Flux::fromIterable)
    .filter(this::checkForNullDataEnvelope)
    .filter(this::checkForBeginningOfStreamEnvelope)
    .filter(this::checkForEndOfStreamEnvelope)
    // this is where I can get the CompletableFuture; I can store it in the Envelope object I publish
    .map(mainPublisher::publish)
    // I want to retry if the timeout for completing the CompletableFuture expired
    .retryWhen(Retry.backoff(pipelineConfig.getErrorRetryTimes(),
        Duration.of(pipelineConfig.getErrorRetrySeconds(), ChronoUnit.SECONDS)))
    .onErrorContinue(errorPipeline::push)
    .map(envelope -> {
        this.timedLogger.incrementLogItemCount(envelope.getLoggerKey());
        return envelope;
    });
mainFlux.subscribe(pipelineStatusReporter::completed);
How do I convert this to take the CompletableFuture into account, and how do I apply backpressure? Say, I want at most N messages in flight, and if the timeout on a future's completion expires, the message should be pushed to the error pipeline.
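For reference, this is roughly what I mean by "N in flight", sketched with a blocking Semaphore in plain JDK rather than Reactor. All names here (BoundedBatchPublisher, publishBatch, BatchResult) are hypothetical, the publish function stands in for mainPublisher::publish, and the failed list stands in for the error pipeline. It is only meant to pin down the semantics, not to be the solution; I'm after the Reactor equivalent.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class BoundedBatchPublisher {

    /** Outcome of one batch: messages that failed or timed out, plus total wall-clock time. */
    public static final class BatchResult<M> {
        public final List<M> failed;
        public final Duration elapsed;

        BatchResult(List<M> failed, Duration elapsed) {
            this.failed = failed;
            this.elapsed = elapsed;
        }
    }

    public static <M> BatchResult<M> publishBatch(
            List<M> messages,
            Function<M, CompletableFuture<?>> publish,
            int maxInFlight,
            Duration timeout) {
        Semaphore permits = new Semaphore(maxInFlight);
        ConcurrentLinkedQueue<M> failed = new ConcurrentLinkedQueue<>();
        long start = System.nanoTime();

        for (M message : messages) {
            // Blocks the producer once maxInFlight futures are pending: the backpressure.
            permits.acquireUninterruptibly();
            try {
                publish.apply(message)
                        .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
                        .whenComplete((ok, error) -> {
                            if (error != null) {
                                failed.add(message); // failure or timeout
                            }
                            permits.release();
                        });
            } catch (RuntimeException e) {
                // publish threw synchronously: count it as failed and free the permit
                failed.add(message);
                permits.release();
            }
        }

        // Taking every permit back means nothing is in flight any more.
        permits.acquireUninterruptibly(maxInFlight);
        return new BatchResult<>(List.copyOf(failed), Duration.ofNanos(System.nanoTime() - start));
    }
}
```

The elapsed value corresponds to the "total time for the whole batch" requirement; in my case the batch boundaries would come from the beginning and end envelopes rather than from a List.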