I'm using akka with a microservice framework, so I've got a lot of completionStage requests. I want to get a list of elements from one microservice and zip them together with a single element from another such that I end up with a Source of Pair<list item, single element>.
I can't do this with a normal zip because Source.zip completes as soon as one of the two sources completes so I only end up with one element propagated.
I can't use Source.zipAll because that requires me to define the default element ahead of time.
If I already had the single element ahead of time I could use Source.repeat to make it repeatedly propagate that one element, meaning that the Source.zip would instead complete when the list of elements completed, but Source.repeat can't take a completion stage or a Source.completionStage.
My current tactic is to zip the things together before I mapConcat the list elements.
Source<singleElement> singleElement = Source.completionStage(oneService.getSingleElement().invoke());
return Source.completionStage(anotherService.getListOfElements().invoke)
.zip(singleElement)
.flatMapConcat(pair -> Source.fromIterator(() -> pair.first().stream().map(listElement -> Pair.create(listElement, pair.second())));
This eventually gets to what I want, but I feel like there's a lot of unnecessary duplication and moving around of data synchronously. Is there a better way to solve this problem that I'm missing?
The
flatMapConcatoperator should allow you to construct aSource.repeatwhich repeats the single element once it's known. In Scala (Source.futurebeing the Scala equivalent ofSource.completionStage: I'm not familiar enough with Java lambda syntax to answer in Java):