Given an RSocket client that retrieves data from a server using stream semantics, I would like to use the Flux.retryWhen(retryBackoffSpec)
mechanism to reconnect in case the connection is lost. The streaming endpoint on the server requires a valid JWT token to be provided.
Initial connection works fine, however I'm having problems during reconnection:
- if the reconnection happens while the JWT token is still valid the process is successful,
- if the reconnection happens after the JWT has expired, it fails with the following error:
last failure={RejectedSetupException (0x3): Jwt expired at 2022-12-13T10:26:39Z}
I was not able to find a way to replace the JWT token sent as metadata to the streaming endpoint before a retry is performed. In my attempt, the original (now expired) JWT token sent with the initial request is reused, which of course causes the error:
this.rSocketRequester
    .route("stream")
    .metadata(getAccessToken(), AUTHENTICATION)
    .data(streamRequest)
    .retrieveFlux(GenericRecord.class)
    .retryWhen(retryBackoffSpec); // ?? how to replace the JWT token ??
I'm looking for a way to replace the metadata sent during the retry with a newly obtained JWT token (via the getAccessToken()
method).
Client
Connection initialization:
private static final MimeType AUTHENTICATION = MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_AUTHENTICATION.getString());

public void connect() {
    final SocketAcceptor responder = RSocketMessageHandler.responder(rSocketStrategies, new ClientHandler());
    //@formatter:off
    retryBackoffSpec = Retry
        .fixedDelay(10, Duration.ofSeconds(10))
        .doBeforeRetry(signal -> log.warn("Connecting retry: {}", signal));
    final RSocketRequester.Builder rSocketRequesterTpl = rSocketRequesterBuilder
        .setupRoute("subscriber")
        .setupData(CLIENT_ID)
        .rsocketStrategies(builder -> {
            builder.encoders(encoders -> {
                encoders.add(new BearerTokenAuthenticationEncoder());
            });
        })
        .rsocketConnector(connector -> {
            connector.acceptor(responder);
            connector.reconnect(retryBackoffSpec);
        });
    //@formatter:on
    this.rSocketRequester = rSocketRequesterTpl.tcp("localhost", 7000);
}
Stream subscription:
public void stream(final MessageCallback callback, final StreamRequest streamRequest) {
    final Flux<GenericRecord> streamFlux = this.rSocketRequester
        .route("stream")
        .metadata(getAccessToken(), AUTHENTICATION)
        .data(streamRequest)
        .retrieveFlux(GenericRecord.class)
        .retryWhen(retryBackoffSpec); // ?? how to replace the JWT token ??
    streamFlux.publishOn(scheduler).subscribe(m -> callback.doOnMessage(m), t -> callback.doOnError(t));
}
Server controller
@ConnectMapping("subscriber")
void connectSubscriber(final RSocketRequester requester, @Payload final String client) {
    // removed for clarity
}

@PreAuthorize("isAuthenticated()")
@MessageMapping({ "stream" })
Flux<StreamMessage> stream(@AuthenticationPrincipal(expression = "@authenticationConverter.convert(#this)") final JwtAuthenticationToken user,
        @Payload(required = false) StreamRequest streamRequest) {
    return Flux.interval(Duration.ofMillis(100)).map(i -> {
        final StreamMessage message = new StreamMessage();
        message.setIndex(i);
        return message;
    });
}
Relevant dependencies:
spring-boot-starter-parent 2.7.6
rsocket-core 1.1.3
From the documentation of the metadata() method:
So, you can provide an updated JWT token like this:
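A minimal sketch of the underlying idea, using only the JDK rather than the real RSocket API. In the question's code, getAccessToken() is an ordinary method argument, so it is evaluated once when the request chain is assembled, and every retry reuses that value. If the call is moved inside the unit of work that is re-run on each attempt, each retry picks up a fresh token. In Reactor, Flux.defer(...) plays this role: it invokes its supplier on every subscription, and retryWhen resubscribes on every retry, so wrapping the whole route(...).metadata(...).retrieveFlux(...) chain in Flux.defer is one way that may achieve this. The names TokenSource, callServer, and retry below are hypothetical stand-ins for illustration, not part of Spring or RSocket.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class DeferredTokenRetryDemo {

    /** Hypothetical stand-in for getAccessToken(): hands out a new token on every call. */
    public static final class TokenSource {
        private final AtomicInteger counter = new AtomicInteger();

        public String getAccessToken() {
            return "token-" + counter.incrementAndGet();
        }
    }

    /** Hypothetical stand-in for the server: accepts only the currently valid token. */
    public static String callServer(String token, String validToken) {
        if (!token.equals(validToken)) {
            throw new IllegalStateException("Jwt expired: " + token);
        }
        return "stream opened with " + token;
    }

    /** Re-runs the supplier on failure, similar in spirit to resubscribing on retry. */
    public static <T> T retry(int maxAttempts, Supplier<T> attempt) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return attempt.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    /** Token is fetched once, outside the retried work: every attempt sends the same token. */
    public static String eager(TokenSource tokens, String validToken, int attempts) {
        final String token = tokens.getAccessToken();
        return retry(attempts, () -> callServer(token, validToken));
    }

    /** Token is fetched inside the retried work: every attempt sends a fresh token. */
    public static String deferred(TokenSource tokens, String validToken, int attempts) {
        return retry(attempts, () -> callServer(tokens.getAccessToken(), validToken));
    }

    public static void main(String[] args) {
        System.out.println(deferred(new TokenSource(), "token-3", 3));
        try {
            eager(new TokenSource(), "token-3", 3);
        } catch (IllegalStateException e) {
            System.out.println("eager variant failed: " + e.getMessage());
        }
    }
}
```

The only difference between eager and deferred is where getAccessToken() is called, which mirrors the difference between passing getAccessToken() directly to metadata(...) and building the request inside a deferred supplier.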