Spring Webflux Context Propagation in a Mutiny Stream

162 views Asked by At

Background:

I have a Tenant ID that I need to pass around in the entire microservice stack. Much like a trace ID in a telemetry/tracing stack.

In the past, I successfully used ThreadLocal to do that, intercepting the value from an HTTP header in a request, and passing it down as a header in internal API calls, broker messages, akka threads, using it wherever it was needed, e.g. in a DB driver

Problem:

Now, I need to do the same in a reactive WebFlux microservice with Mutiny.

I can intercept the value in a WebFilter and attach it to as a Reactor Context Value, like so:

    override fun filter(
        serverWebExchange: ServerWebExchange,
        webFilterChain: WebFilterChain,
    ): Mono<Void> {
        return webFilterChain.filter(serverWebExchange)
            .contextWrite({
                val ctx = it.put("tenant", serverWebExchange.request.headers.getOrDefault("tenant", "missing"))
                LoggerFactory.getLogger(javaClass).info("adding ctx to $it: $ctx")
                ctx
            })
    }

Which is then propagated downstream and can be used in deferContextual or transformDeferredContextual in the stream returned from a Controller.

Spring does support Mutiny natively, however, the context attached to a Mono<Void> in a filter is not propagated to the Mutiny stream.

I can add a context to Mutiny before or after subscription, like so:

multi
            .withContext { t, ctx ->
                ctx.put("z", "3")
                t
            }
            .subscribe()
            .with(context) { LoggerFactory.getLogger(javaClass).info("item 1 subscribe: $it") };

However, if the Mutiny stream is transformed into Flux stream, and then the context is attached, it won't work.

IN short, the following works fine and prints all expected values: [ctx1, ctx2]

    Multi.createFrom().items("a", "b", "c")
        .attachContext()
        .invoke { item -> println(item.context().keys()) }
        .map { it.get() }
        .withContext { t, ctx ->
            ctx.put("ctx1", "1")
            t
        }
        .subscribe()
        .with(
            io.smallrye.mutiny.Context.of(
                "ctx2", "2"
            )
        ) { println(it) }

However, if the same Multi is transformed to Flux using MultiReactorConverters, it does not print context values at all:

MultiReactorConverters.toFlux<String>()
        .apply(Multi.createFrom().items("a", "b", "c")
            .attachContext()
            .invoke { item -> println("multi ctx: ${item.context().keys()}") }
            .map { it.get() }
            .withContext { t, ctx ->
                ctx.put("ctx1", "1")
                t
            })
        .transformDeferredContextual { t, ctx ->
            ctx.forEach { k, v -> println("flux ctx: $k => $v") }
            t
        }
        .contextWrite { it.put("ctx3", "3") }
        .subscribe { println(it) }

Above will print only, so only the flux context.

flux ctx: ctx3 => 3
multi ctx: []
multi2: []

So my question is:

  1. Is there a way to propagate context from Flux stream to Mutiny stream?
  2. If not, can I intercept Mutiny stream, as it's returned from Controller method, before it's transformed to a Mono in the WebFilter ?

Sidenote, motivation for using Mutiny instead of Reactor:

Besides of IMO better API (e.g. plugs), I want to use Mutiny instead of Reactor, mainly because in the Reactor, context is not propagated in side effects calls. In Mutiny we have methods like call or invoke, that do not change state, but are part of the same chain, which means the context is propagated.

In Reactor on the other hand, we have a fire & forget pattern (also), which won't pass the context, as it's a separate reactive stack.

0

There are 0 answers