Spring Boot RSocket send a message within a Message Mapping


Starting with the tutorial code at benwilcock/spring-rsocket-demo, I am trying to write a server that replicates messages to a second server before responding to a client.

To debug my issues, I am attempting only a trivial ping-pong exchange between the servers. The first server should reply to the client only after the second server has responded to the pong message:

@MessageMapping("request-response")
Mono<Message> requestResponse(final Message request) {
    // register a mono that will be completed when replication to another server has happened
    String uuid = UUID.randomUUID().toString();
    Mono<Message> deferred = Mono.create(sink -> replicationNexus.registerRequest(uuid, sink));

    // FIXME attempt to send a nested request-response message that will complete the outer message later
    this.requesterMono.flatMap(requester -> requester.route("pong")
            .data(uuid)
            .retrieveMono(String.class))
            .subscribeOn(Schedulers.elastic())
            .subscribe( uuid2 -> replicationNexus.complete(uuid2, new Message(SERVER, RESPONSE)));

    // return the deferred work that will be completed by the pong response
    return deferred;
}

That logic follows this answer to create a connection to the second server that reconnects automatically:

    this.requesterMono = builder.rsocketConnector(connector -> connector
            .reconnect(Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(1))))
            .connectTcp("localhost", otherPort).cache();

To complete the picture, here is the trivial ping-pong logic:

@MessageMapping("pong")
public Mono<String> pong(String m) {
    return Mono.just(m);
}

and here is the logic that holds the state of the outer client response that is completed when the other server responds:

public class ReplicationNexus<T> {
final Map<String, MonoSink<T>> requests = new ConcurrentHashMap<>();

public void registerRequest(String v, MonoSink<T> sink) {
    requests.put(v, sink);
}

public boolean complete(String uuid, T message) {
    // ofNullable handles an unknown uuid; Optional.of would throw a NullPointerException here
    Optional<MonoSink<T>> sink = Optional.ofNullable(requests.get(uuid));
    if( sink.isPresent() ){
        sink.get().success(message);
    }
    return sink.isPresent();
}
}

When I debug the second server, it never runs the pong method. It seems that the first server never actually sends the inner request message.

What is the correct way to run an inner request-response exchange that completes an outer message exchange with automated reconnection logic?


1 answer

Yuri Schimke

I may be missing some of the complexity of your question, but if the middle server is just acting as a proxy, I'd start with the simplest case of chaining the calls through. If there is more nuance than that, let's work through it next.

  @MessageMapping("runCommand")
  // plain (non-suspend) function: it returns the Mono and lets the framework subscribe to it
  fun runCommandX(
    request: CommandRequest,
  ): Mono<String> {
    val uuid = UUID.randomUUID().toString()

    return requesterMono
      .flatMap { requester: RSocketRequester ->
        requester.route("pong")
          .data("TEST")
          .retrieveMono(String::class.java)
      }
      .doOnSubscribe {
        // register request with uuid
      }
      .doOnSuccess {
        // register completion
      }
      .doOnError {
        // register failure
      }
  }

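Generally, avoid calling subscribe yourself in typical Spring/reactive/RSocket code if you can. You want the framework to do this for you: it subscribes to the Mono you return from the handler, so the inner call's result and errors flow straight into the outer response.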
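
Applied to the handler in the question, the same idea is to return the chained result rather than registering a sink and subscribing on the side. The sketch below illustrates only the shape of that chaining. It uses the JDK's CompletableFuture as a stand-in so that it runs without Spring or Reactor, and the class ChainedReplication, the methods pong and requestResponse, and the "replicated:" prefix are all hypothetical names chosen for illustration. Roughly, thenApply here plays the role that map or flatMap would play on a Mono, with the caveat that a CompletableFuture starts eagerly, whereas a Mono does nothing until something subscribes to it.

```java
import java.util.concurrent.CompletableFuture;

public class ChainedReplication {

    // Hypothetical stand-in for the second server's "pong" route:
    // echoes its input asynchronously.
    static CompletableFuture<String> pong(String payload) {
        return CompletableFuture.supplyAsync(() -> payload);
    }

    // Hypothetical stand-in for the outer handler. The reply is derived
    // directly from the inner call's result, so no map of pending sinks
    // and no manual subscription are needed.
    static CompletableFuture<String> requestResponse(String uuid) {
        return pong(uuid).thenApply(echoed -> "replicated:" + echoed);
    }

    public static void main(String[] args) {
        // The caller (the framework, in the real handler) consumes the result.
        System.out.println(requestResponse("abc").join());
    }
}
```

Because the outer result is built from the inner one, a failure of the inner call also fails the outer result, instead of leaving a registered sink that is never completed.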