Generate elements by applying a function to each pair of consecutive elements of a Reactor Flux


I have a Flux that emits some elements. For simplicity, suppose it is a Flux<String> that emits "1", "2", "3", "n".

I need to take each pair of consecutive elements and apply an operation (e.g. flatMap) that generates new elements from them. Again, suppose the function concatenates the first element with the second, and the second with the first:

f(x,y) -> "xy", "yx"

So the final sequence emitted from the original Flux should be:

"12" - "21" - "23" - "32" - "3n" - "n3"
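
To make the expected output concrete, here is a minimal sketch of the same transformation on a plain List rather than a Flux. It uses only the Kotlin standard library; the name pairwise is hypothetical and chosen just for illustration.

```kotlin
// Hypothetical helper: applies f(x, y) -> "xy", "yx" to every pair of
// consecutive elements of a plain list (no Reactor involved).
fun pairwise(items: List<String>): List<String> =
    items.zipWithNext()                                 // (1,2), (2,3), (3,n)
        .flatMap { (x, y) -> listOf(x + y, y + x) }     // "xy" and "yx" per pair

fun main() {
    println(pairwise(listOf("1", "2", "3", "n")))
    // prints [12, 21, 23, 32, 3n, n3]
}
```

What I am looking for is the reactive equivalent of this, without collecting the whole Flux into a list first.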

How could this be done?

Best answer, by codependent

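Solved using buffer(2, 1), which emits overlapping lists of two elements, advancing one element at a time:

import reactor.core.publisher.Flux
import reactor.kotlin.core.publisher.toFlux // from reactor-kotlin-extensions

fun main() {
    val flux = listOf("1", "2", "3", "4").toFlux()
    // buffer(2, 1) emits [1, 2], [2, 3], [3, 4] and finally [4]
    flux.buffer(2, 1)
        .flatMap {
            if (it.size == 2) {
                // Emit "xy" and "yx" for the pair
                listOf(it[0] + it[1], it[1] + it[0]).toFlux()
            } else {
                // The last buffer holds a single element, so emit nothing for it
                Flux.empty()
            }
        }
        .doOnNext { println(it) }
        .subscribe()
}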
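
A more compact variant of the same idea, offered as a sketch rather than the definitive way to do it: filter out the trailing single-element buffer, then use concatMapIterable, which maps each buffer to an Iterable and emits its items in source order. The function name pairwiseConcat is hypothetical; only reactor-core is assumed to be on the classpath.

```kotlin
import reactor.core.publisher.Flux

// Hypothetical helper: emits "xy" and "yx" for every pair of consecutive elements.
fun pairwiseConcat(source: Flux<String>): Flux<String> =
    source
        .buffer(2, 1)               // overlapping lists: [1, 2], [2, 3], ..., [last]
        .filter { it.size == 2 }    // drop the trailing single-element list
        .concatMapIterable { (x, y) -> listOf(x + y, y + x) }

fun main() {
    pairwiseConcat(Flux.just("1", "2", "3", "n"))
        .subscribe { println(it) }
    // prints 12, 21, 23, 32, 3n, n3 (one per line)
}
```

Since the mapping here is synchronous, an Iterable-based operator avoids creating an inner Flux for every pair. If the per-pair function returned a Publisher that might emit asynchronously, concatMap would keep the output in order, whereas flatMap could interleave results from different pairs.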