Rsocket-java stream: How to replace/update the JWT token within Flux.retryWhen(...)?


I have an RSocket client that retrieves data from a server using stream semantics, and I would like to use the Flux.retryWhen(retryBackoffSpec) mechanism to reconnect when the connection is lost. The streaming endpoint on the server requires a valid JWT token.

Initial connection works fine, however I'm having problems during reconnection:

  • if the reconnection happens while the JWT token is still valid the process is successful,
  • if the reconnection happens after JWT expired, it's failing with:

last failure={RejectedSetupException (0x3): Jwt expired at 2022-12-13T10:26:39Z} error

I was not able to find a way to replace the JWT token sent as metadata to the streaming endpoint before the retry is performed. In my attempt, the original (now expired) JWT token from the initial request is reused, which of course causes the error:

this.rSocketRequester
        .route("stream")
        .metadata(getAccessToken(), AUTHENTICATION)
        .data(streamRequest)
        .retrieveFlux(GenericRecord.class)
        .retryWhen(retryBackoffSpec); // ?? how to replace the JWT token ??

I'm looking for a way to replace the metadata sent during the retry with a newly obtained JWT token (via the getAccessToken() method).

Client

Connection initialization:

private static final MimeType AUTHENTICATION = MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_AUTHENTICATION.getString());

public void connect() {
    final SocketAcceptor responder = RSocketMessageHandler.responder(rSocketStrategies, new ClientHandler());

    //@formatter:off
    retryBackoffSpec = Retry
            .fixedDelay(10, Duration.ofSeconds(10))
            .doBeforeRetry(signal -> log.warn("Connecting retry: {}", signal));

    final RSocketRequester.Builder rSocketRequesterTpl = rSocketRequesterBuilder
            .setupRoute("subscriber")
            .setupData(CLIENT_ID)
            .rsocketStrategies(builder -> {
                builder.encoders(encoders -> {
                    encoders.add(new BearerTokenAuthenticationEncoder());
                });
            })
            .rsocketConnector(connector -> {
                connector.acceptor(responder);
                connector.reconnect(retryBackoffSpec);
            });
    //@formatter:on

    this.rSocketRequester = rSocketRequesterTpl.tcp("localhost", 7000);
}

Stream subscription:

public void stream(final MessageCallback callback, final StreamRequest streamRequest) {
    final Flux<GenericRecord> streamFlux = this.rSocketRequester
            .route("stream")
            .metadata(getAccessToken(), AUTHENTICATION)
            .data(streamRequest)
            .retrieveFlux(GenericRecord.class)
            .retryWhen(retryBackoffSpec); // ?? how to replace the JWT token ??

    streamFlux.publishOn(scheduler).subscribe(m -> callback.doOnMessage(m), t -> callback.doOnError(t));
}

Server controller


@ConnectMapping("subscriber")
void connectSubscriber(final RSocketRequester requester, @Payload final String client) {
    // removed for clarity
}

@PreAuthorize("isAuthenticated()")
@MessageMapping({ "stream" })
Flux<StreamMessage> stream(@AuthenticationPrincipal(expression = "@authenticationConverter.convert(#this)") final JwtAuthenticationToken user,
        @Payload(required = false) StreamRequest streamRequest) {
    return Flux.interval(Duration.ofMillis(100)).map(i -> {
        final StreamMessage message = new StreamMessage();
        message.setIndex(i);
        return message;
    });
}

Relevant dependencies:

spring-boot-starter-parent 2.7.6
rsocket-core 1.1.3

Answer by Bahaa Zaid:

From the documentation of the metadata() method:

The metadata value be a concrete value or any producer of a single value that can be adapted to a Publisher via ReactiveAdapterRegistry.

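So instead of passing the token as a concrete value, which is computed once when the request is built, you can pass a Mono. Because retryWhen resubscribes to the source, the supplier should be called again on each attempt and provide an updated JWT token, like this: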

this.rSocketRequester
    .route("stream")
    .metadata(Mono.fromSupplier(this::getAccessToken), AUTHENTICATION)
    .data(streamRequest)
    .retrieveFlux(GenericRecord.class)
    .retryWhen(retryBackoffSpec);
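
To see why the deferred form makes a difference, here is a minimal plain-Java sketch of the same idea, without Reactor or RSocket. It is only an analogy: TokenSource, attempt, eager and deferred are hypothetical names invented for this illustration, and the loop stands in for the resubscription that retryWhen performs. The eager variant captures the token once, like metadata(getAccessToken(), ...), while the deferred variant asks for it on every attempt, like Mono.fromSupplier(this::getAccessToken).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class DeferredTokenDemo {

    // Hypothetical stand-in for getAccessToken(): every call yields a new token.
    static final class TokenSource {
        private final AtomicInteger counter = new AtomicInteger();

        String getAccessToken() {
            return "token-" + counter.incrementAndGet();
        }
    }

    // Simulates a request that is (re)tried several times; each attempt
    // asks the supplier for the metadata it should send.
    static List<String> attempt(Supplier<String> metadata, int attempts) {
        List<String> sent = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            sent.add(metadata.get());
        }
        return sent;
    }

    // Eager: the token is computed once, before any attempt is made,
    // so every retry reuses the same (possibly expired) value.
    static List<String> eager(int attempts) {
        TokenSource source = new TokenSource();
        String token = source.getAccessToken();
        return attempt(() -> token, attempts);
    }

    // Deferred: the token is computed inside each attempt,
    // so every retry gets a fresh value.
    static List<String> deferred(int attempts) {
        TokenSource source = new TokenSource();
        return attempt(source::getAccessToken, attempts);
    }

    public static void main(String[] args) {
        System.out.println("eager:    " + eager(3));    // [token-1, token-1, token-1]
        System.out.println("deferred: " + deferred(3)); // [token-1, token-2, token-3]
    }
}
```

In the real client, getAccessToken() would also need to return a token that is still valid at the time it is called, for example by refreshing it when it is close to expiry.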