Background:
I have a Tenant ID that I need to pass around in the entire microservice stack. Much like a trace ID in a telemetry/tracing stack.
In the past, I successfully used ThreadLocal
to do that, intercepting the value from an HTTP header in a request, and passing it down as a header in internal API calls, broker messages, akka threads, using it wherever it was needed, e.g. in a DB driver
Problem:
Now, I need to do the same in a reactive WebFlux microservice with Mutiny.
I can intercept the value in a WebFilter and attach it to as a Reactor Context Value, like so:
override fun filter(
serverWebExchange: ServerWebExchange,
webFilterChain: WebFilterChain,
): Mono<Void> {
return webFilterChain.filter(serverWebExchange)
.contextWrite({
val ctx = it.put("tenant", serverWebExchange.request.headers.getOrDefault("tenant", "missing"))
LoggerFactory.getLogger(javaClass).info("adding ctx to $it: $ctx")
ctx
})
}
Which is then propagated downstream and can be used in deferContextual
or transformDeferredContextual
in the stream returned from a Controller
.
Spring does support Mutiny natively, however, the context attached to a Mono<Void>
in a filter is not propagated to the Mutiny stream.
I can add a context to Mutiny before or after subscription, like so:
multi
.withContext { t, ctx ->
ctx.put("z", "3")
t
}
.subscribe()
.with(context) { LoggerFactory.getLogger(javaClass).info("item 1 subscribe: $it") };
However, if the Mutiny stream is transformed into Flux stream, and then the context is attached, it won't work.
IN short, the following works fine and prints all expected values: [ctx1, ctx2]
Multi.createFrom().items("a", "b", "c")
.attachContext()
.invoke { item -> println(item.context().keys()) }
.map { it.get() }
.withContext { t, ctx ->
ctx.put("ctx1", "1")
t
}
.subscribe()
.with(
io.smallrye.mutiny.Context.of(
"ctx2", "2"
)
) { println(it) }
However, if the same Multi is transformed to Flux using MultiReactorConverters
, it does not print context values at all:
MultiReactorConverters.toFlux<String>()
.apply(Multi.createFrom().items("a", "b", "c")
.attachContext()
.invoke { item -> println("multi ctx: ${item.context().keys()}") }
.map { it.get() }
.withContext { t, ctx ->
ctx.put("ctx1", "1")
t
})
.transformDeferredContextual { t, ctx ->
ctx.forEach { k, v -> println("flux ctx: $k => $v") }
t
}
.contextWrite { it.put("ctx3", "3") }
.subscribe { println(it) }
Above will print only, so only the flux context.
flux ctx: ctx3 => 3
multi ctx: []
multi2: []
So my question is:
- Is there a way to propagate context from Flux stream to Mutiny stream?
- If not, can I intercept Mutiny stream, as it's returned from Controller method, before it's transformed to a Mono in the
WebFilter
?
Sidenote, motivation for using Mutiny instead of Reactor:
Besides of IMO better API (e.g. plugs), I want to use Mutiny instead of Reactor, mainly because in the Reactor, context is not propagated in side effects calls.
In Mutiny we have methods like call
or invoke
, that do not change state, but are part of the same chain, which means the context is propagated.
In Reactor on the other hand, we have a fire & forget pattern (also), which won't pass the context, as it's a separate reactive stack.