I'm trying to understand the correct configuration and usage pattern of LoadbalanceRSocketClient in the context of a Spring Boot application (RSocketRequester). I have two RSocket server backends (Spring Boot, RSocket messaging) running, and I configure the RSocketRequester on the client side like this:
List<LoadbalanceTarget> servers = new ArrayList<>();
for (String url : backendUrls) {
    HttpClient httpClient = HttpClient.create()
        .baseUrl(url)
        .secure(ssl -> ssl.sslContext(
            SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)));
    servers.add(LoadbalanceTarget.from(url, WebsocketClientTransport.create(httpClient, url)));
}

// RSocketRequester.Builder is autowired by Spring Boot
RSocketRequester requester = builder
    .setupRoute("/connect")
    .setupData("test")
    //.rsocketConnector(connector -> connector.reconnect(Retry.fixedDelay(60, Duration.ofSeconds(1))))
    .transports(Flux.just(servers), new RoundRobinLoadbalanceStrategy());
Once configured, the requester is used repeatedly from a timer loop, as follows:
@Scheduled(fixedDelay = 10000, initialDelay = 1000)
public void timer() {
    requester.route("/foo").data(Data).send().block();
}
It works: the client starts, connects to one of the servers, and pushes messages to it. If I kill the server that the client is connected to, the client reconnects to the other server on the next timer event. However, if I then start the first server again and kill the second one, the client no longer connects, and the following exception is observed on the client side:
java.util.concurrent.CancellationException: Pool is exhausted
at io.rsocket.loadbalance.RSocketPool.select(RSocketPool.java:202) ~[rsocket-core-1.1.0.jar:na]
at io.rsocket.loadbalance.LoadbalanceRSocketClient.lambda$fireAndForget$0(LoadbalanceRSocketClient.java:49) ~[rsocket-core-1.1.0.jar:na]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:61) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Mono.subscribe(Mono.java:3987) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Mono.subscribe(Mono.java:3987) ~[reactor-core-3.4.0.jar:3.4.0]
at reactor.core.publisher.Mono.block(Mono.java:1678) ~[reactor-core-3.4.0.jar:3.4.0]
I suspect that I'm either not configuring the requester correctly or not using it properly. I would appreciate any hints, as documentation and tests seem to be pretty thin in this area.
Ideally, I would like the client to transparently switch to the next available server upon a server/connectivity failure. Right now the reconnection attempt seems to happen only on the next call to the timer() method, which is not ideal, as the client needs to handle incoming messages from the server. Another thing I observed is that even though "/foo" is a fire-and-forget route, the server never receives the call unless I call block() after send().
Update Endpoints List Continuously
LoadbalanceClient is designed to be integrated with a Discovery service, which is responsible for keeping a List of alive Instances. If one of the services disappears from the cluster, the Discovery service updates its List of available Instances. On the other hand, to implement client-side load balancing, we have to know the list of available services in the cluster. Obviously, to set up load balancing, we can retrieve that list of services and supply it to the Loadbalancer API.
However, imagine that we are in a fully distributed environment where every service that disappears and reappears comes back on a completely new host and port (e.g. a Kubernetes cluster which does not stick to a particular IP address). Load balancing has to account for such a scenario, and to avoid dead nodes in the pool, it removes unhealthy nodes from the pool completely.
Now, if all the nodes disappear and reappear after some time, they are no longer included in the pool (and if the Flux which provides the updates has completed, the pool is effectively exhausted, because no new updates will come in from the Flux<List<LoadbalanceTarget>>). However, the nodes register themselves with the Discovery service again and become available for observation. All that said, we have to periodically pull info from the Discovery service in order to stay up to date and update the pool state continuously.
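A minimal sketch of such a setup, assuming a hypothetical discoveryService with a getServerUrls() method standing in for your actual Discovery client (the polling interval is arbitrary, and the TLS setup from the snippet above is omitted for brevity):

Flux<List<LoadbalanceTarget>> targets = Flux
    .interval(Duration.ofSeconds(5)) // poll the Discovery service periodically
    .map(tick -> discoveryService.getServerUrls().stream() // hypothetical Discovery call
        .map(url -> LoadbalanceTarget.from(url, WebsocketClientTransport.create(URI.create(url))))
        .collect(Collectors.toList()));

// This Flux never completes, so the pool keeps receiving fresh target lists.
RSocketRequester requester = builder
    .setupRoute("/connect")
    .setupData("test")
    .transports(targets, new RoundRobinLoadbalanceStrategy());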
With such a setup, the RSocketPool will not be exhausted if all the nodes disappear from the cluster, because the Flux<List<LoadbalanceTarget>> has not completed yet and may provide new updates eventually.
Side note on the reconnect feature
You may notice that RSocketConnector provides a great feature called .reconnect. At first glance, it may seem that using reconnect will keep your connection up and running indefinitely. Unfortunately, that is not true. The .reconnect feature is designed to keep your Mono<RSocket> reusable with cache semantics, which means that you may create a @Bean Mono<RSocket>, autowire it in various places, and subscribe multiple times without worrying that the resulting RSocket instance will be different on every Mono<RSocket>.subscribe. On the other hand, .reconnect ensures that if the given RSocket becomes disconnected (e.g. in the lost-connection case), the next subscription to such a Mono<RSocket> will resolve a new RSocket, created only once for all concurrent .subscribe calls.
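For illustration, the cache semantics described above might look like this (a minimal sketch; the bean, transport, and retry policy are assumptions, not part of the original setup):

@Bean
public Mono<RSocket> rsocket() {
    // .reconnect caches the resolved RSocket: all subscribers share the same
    // instance, and after a lost connection the next subscription resolves a
    // new RSocket once, shared across all concurrent subscribers
    return RSocketConnector.create()
        .reconnect(Retry.fixedDelay(10, Duration.ofSeconds(1)))
        .connect(TcpClientTransport.create("localhost", 7000));
}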
Though it sounds like a useful feature, in RSocketPool we do not rely on it much; we use the Mono<RSocket> only once, to resolve and cache an instance of RSocket inside the RSocketPool. If such an RSocket becomes disconnected, we will not try to subscribe to the given Mono<RSocket> again (we assume that the configured host and port will have changed).