Correct usage of LoadbalanceRSocketClient with Spring's RSocketRequester


I'm trying to understand the correct configuration and usage pattern of LoadbalanceRSocketClient in the context of a Spring Boot application (RSocketRequester).

I have two RSocket server backends (Spring Boot, RSocket messaging) running, and I configure the RSocketRequester on the client side like this:

List<LoadbalanceTarget> servers = new ArrayList<>();
for (String url: backendUrls) {
  HttpClient httpClient = HttpClient.create()
    .baseUrl(url)
    .secure(ssl -> 
       ssl.sslContext(SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)));
  servers.add(LoadbalanceTarget.from(url, WebsocketClientTransport.create(httpClient, url)));
}

// RSocketRequester.Builder is autowired by Spring Boot
RSocketRequester requester = builder
  .setupRoute("/connect")
  .setupData("test")
  //.rsocketConnector(connector -> connector.reconnect(Retry.fixedDelay(60, Duration.ofSeconds(1))))
  .transports(Flux.just(servers), new RoundRobinLoadbalanceStrategy());

Once configured, the requester is used repeatedly from a timer loop, as follows:

@Scheduled(fixedDelay = 10000, initialDelay = 1000)
public void timer() {
  requester.route("/foo").data(Data).send().block();
}

It works: the client starts, connects to one of the servers and pushes messages to it. If I kill the server the client is connected to, the client reconnects to the other server on the next timer event. If I then start the first server again and kill the second one, however, the client doesn't connect anymore and the following exception is observed on the client side:

java.util.concurrent.CancellationException: Pool is exhausted
    at io.rsocket.loadbalance.RSocketPool.select(RSocketPool.java:202) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.loadbalance.LoadbalanceRSocketClient.lambda$fireAndForget$0(LoadbalanceRSocketClient.java:49) ~[rsocket-core-1.1.0.jar:na]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:61) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3987) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3987) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Mono.block(Mono.java:1678) ~[reactor-core-3.4.0.jar:3.4.0]

I suspect that I'm either not configuring the requester correctly or not using it properly. I would appreciate any hints, as documentation and tests seem to be pretty thin in this area.

Ideally I would want the client to transparently switch to the next available server upon a server or connectivity failure. Right now the re-connection attempt seems to happen only on the next call to the timer() method, which is not ideal, as the client also needs to handle incoming messages from the server. Another thing I observed is that even though "/foo" is a fire-and-forget (FnF) route, the server never receives the call unless I do block() after send().

There are 3 answers

Oleh Dokuka (best answer)

Update Endpoints List Continuously

LoadbalanceClient is designed to be integrated with a Discovery service, which is responsible for keeping a list of alive instances. So if one of the services disappears from the cluster, the Discovery service updates its list of available instances.

On the other hand, to implement client-side load balancing, we have to know the list of available services in the cluster. So to set up load balancing, we can retrieve the list of services and supply it to the Loadbalancer API.

ReactiveDiscoveryClient discoveryClient = ...

Mono<List<LoadbalanceTarget>> serversMono = discoveryClient
    .getInstances(serviceGroupName)
    .map(si -> {
        HttpClient httpClient = HttpClient.create()
            .baseUrl(si.getUri().toString())
            .secure(ssl -> ssl.sslContext(
                SslContextBuilder.forClient()
                    .trustManager(InsecureTrustManagerFactory.INSTANCE)
            ));
        return LoadbalanceTarget.from(si.getUri().toString(),
            WebsocketClientTransport.create(httpClient, "/rsocket"));
    })
    .collectList();

// RSocketRequester.Builder is autowired by Spring Boot
RSocketRequester requester = builder
  .setupRoute("/connect")
  .setupData("test")
  .transports(serversMono.flux(), new RoundRobinLoadbalanceStrategy());   

However, imagine that we are in a fully distributed environment, where every service that disappears and comes back again runs on an entirely new host and port (e.g. a Kubernetes cluster which does not stick to a particular IP address). Load balancing has to consider such a scenario, and to avoid dead nodes it removes unhealthy nodes from the pool completely.

Now, if all the nodes disappear and come back after some time, they are not included in the pool anymore, and if the Flux that provides updates has completed, the pool is effectively exhausted because no new update will ever come in from the Flux<List<LoadbalanceTarget>>.

However, the nodes register themselves with the Discovery service and become available for observation. Therefore, we have to periodically pull info from the Discovery service to stay up to date and update the pool state continuously:

ReactiveDiscoveryClient discoveryClient = ...

Flux<List<LoadbalanceTarget>> serversFlux = discoveryClient
    .getInstances(serviceGroupName)
    .map(si -> {
        HttpClient httpClient = HttpClient.create()
            .baseUrl(si.getUri().toString())
            .secure(ssl -> ssl.sslContext(
                SslContextBuilder.forClient()
                    .trustManager(InsecureTrustManagerFactory.INSTANCE)
            ));
        return LoadbalanceTarget.from(si.getUri().toString(),
            WebsocketClientTransport.create(httpClient, "/rsocket"));
    })
    .collectList()
    .repeatWhen(f -> f.delayElements(Duration.ofSeconds(1))); // <- continuously retrieve a new List of ServiceInstances

// RSocketRequester.Builder is autowired by Spring Boot
RSocketRequester requester = builder
  .setupRoute("/connect")
  .setupData("test")
  .transports(serversFlux, new RoundRobinLoadbalanceStrategy());

With such a setup, the RSocketPool will not be exhausted if all the nodes disappear from the cluster, because the Flux<List<LoadbalanceTarget>> has not completed yet and may eventually provide new updates.

Note that the implementation is smart enough to keep active nodes across updates from the discovery service, so if a service instance is already in the pool, you will not get two connections to it at the same time.
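
For example (a small sketch along the lines of the answer's code above), the key passed as the first argument to LoadbalanceTarget.from(...) is what identifies a target across list updates, so keeping it stable, here the instance URI, is what lets the pool reuse an existing connection:

// The key (first argument) identifies the target across updates of the list;
// as long as it stays the same, the pool keeps the existing connection for
// this target instead of opening a second one.
LoadbalanceTarget target = LoadbalanceTarget.from(
    si.getUri().toString(),
    WebsocketClientTransport.create(httpClient, "/rsocket"));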

Side note on the reconnect feature

You may notice that RSocketConnector provides a great feature called .reconnect. At first glance it may seem that using reconnect will keep your connection up and running indefinitely. Unfortunately, that is not true. The .reconnect feature is designed to keep your Mono<RSocket> reusable with cache semantics, which means that you may create a @Bean Mono<RSocket>, autowire it in various places and subscribe to it multiple times without worrying that the resulting RSocket instance will be different on every Mono<RSocket>.subscribe. On the other hand, with .reconnect, if the given RSocket becomes disconnected (e.g. in the lost-connection case), the next subscription to such a Mono<RSocket> will resolve a new RSocket, only once for all concurrent .subscribe calls.

Though it sounds like a useful feature, in RSocketPool we do not rely on it much and use the Mono<RSocket> only once, to resolve and cache an instance of RSocket inside the RSocketPool. So if such an RSocket becomes disconnected, we will not try to subscribe to the given Mono<RSocket> again (we assume that the host and port will have changed).
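
As an illustration of that distinction, here is a minimal sketch of the reusable Mono<RSocket> described above; the transport, host, and port are assumptions for the sake of the example, not taken from the question:

// Sketch only: a reconnectable Mono<RSocket> exposed as a bean. Every place
// that autowires and subscribes to it shares the same resolved RSocket, and a
// new one is resolved only after the current connection is lost.
@Bean
public Mono<RSocket> rsocket() {
  return RSocketConnector.create()
      .reconnect(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(1)))
      .connect(TcpClientTransport.create("localhost", 7000)); // assumed endpoint
}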

Yuri Schimke

For the question around FnF, this is part of the Rx model. Without a subscribe, the event doesn't happen. You should be free to call an API returning a Mono without side effects before subscribing; any other behaviour is a bug.

  /**
   * Perform a Fire-and-Forget interaction via {@link RSocket#fireAndForget(Payload)}. Allows
   * multiple subscriptions and performs a request per subscriber.
   */
  Mono<Void> fireAndForget(Mono<Payload> payloadMono);

If you call this method once and then subscribe 3 times to the result, it will execute 3 times.
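
Applied to the timer loop from the question, that means the Mono<Void> returned by send() just needs to be subscribed to rather than blocked on. A minimal sketch, reusing the requester and route from the question (the payload and error handling are placeholders):

@Scheduled(fixedDelay = 10000, initialDelay = 1000)
public void timer() {
  // Fire-and-forget: nothing is sent until the Mono<Void> returned by send()
  // is subscribed to. Subscribing, rather than blocking, triggers the request
  // without tying up the scheduler thread.
  requester.route("/foo")
      .data(data) // 'data' stands in for the payload from the question
      .send()
      .subscribe(null, Throwable::printStackTrace); // placeholder error handling
}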

maximdim

Oleh, I tried what you suggested and it works to some extent, although I still can't quite get the behavior I need.

What I want to do is:

  • Client connects to a single (random) backend at a time
  • If the backend or connectivity to the backend fails, the client should try to connect to the next available backend.

I guess I can't use RoundRobinLoadbalanceStrategy, as it connects the client to all available backends. Should I use WeightedLoadbalanceStrategy instead? Or should the discoveryClient abstraction return only a single server every time? But then it would no longer be a 'pool' client, right?

Perhaps I should re-think my approach in general. I have a few tens of thousands of clients, so I want to balance the load on the backend: spread it across multiple backend instances, so that each client randomly connects to one instance but is capable of re-connecting to another instance if the instance it is connected to fails. I assume that it is not a good idea to connect all clients to every backend instance at the same time, but maybe I'm wrong?