Why does RSocket connection retry use a different thread every time?


I have the program below, which connects to a Spring Boot RSocket server running on localhost:7999. I have configured the connector with Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(5)). As you can see, the RSocketRequester is wrapped in a Mono, so it should hold a single connection. When the connection fails and the retry begins, every retry is made from a different thread: parallel-1 through parallel-8 in the log below, after which the names wrap around to parallel-1. What is the reason for this?

12:08:24.463550|parallel-1|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #1 (1 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:08:30.470593|parallel-2|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #2 (2 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:08:36.475666|parallel-3|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #3 (3 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:08:42.494801|parallel-4|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #4 (4 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:08:48.499084|parallel-5|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #5 (5 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:08:54.503385|parallel-6|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #6 (6 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:09:00.509830|parallel-7|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #7 (7 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:09:06.545815|parallel-8|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #8 (8 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}
12:09:12.553582|parallel-1|WARN |RSocketRefDataReceiver          |doAfterRetry===>attempt #9 (9 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999}

My program is below:

RSocketStrategies strategies = RSocketStrategies.builder()
    .encoders(e -> e.add(new Jackson2CborEncoder()))
    .decoders(e -> e.add(new Jackson2CborDecoder()))
    .build();

Mono<RSocketRequester> requester = Mono.just(RSocketRequester.builder()
    .rsocketConnector(connector -> {
        connector.reconnect(
                // Retry every 5 seconds, effectively without limit, and log each attempt
                Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(5))
                    .doAfterRetry(e -> LOG.warn("doAfterRetry===>{}", e)))
            .acceptor(RSocketMessageHandler.responder(strategies, this))
            .payloadDecoder(PayloadDecoder.ZERO_COPY);
    })
    .dataMimeType(MediaType.APPLICATION_CBOR)
    .setupRoute("test")
    .setupData("test-123")
    .rsocketStrategies(strategies)
    .tcp("localhost", 7999));


There are 2 answers

Saji (BEST ANSWER)

Thanks @Yuri, @bruto and @OlegDokuka for your suggestions and answers. I have changed my program as below to force the retry to run on a single thread.

.rsocketConnector(connector -> {
    connector.reconnect(
            Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(5))
                .scheduler(Schedulers.single()) // <---- This forces the retry to run on a single thread
                .doAfterRetry(e -> LOG.warn("doAfterRetry===>{}", e)))
        .acceptor(RSocketMessageHandler.responder(strategies, this))
        .payloadDecoder(PayloadDecoder.ZERO_COPY);
})
Yuri Schimke

This article (Flight of the Flux 3) is a good introduction to the Reactor threading model. Reactor is the underlying library that provides the Rx functionality in rsocket-java.

The key sentence is:

"Schedulers.parallel() is good for CPU-intensive but short-lived tasks. It can execute N such tasks in parallel (by default N == number of CPUs)"

Also read the Schedulers API documentation: https://projectreactor.io/docs/core/release/api/reactor/core/scheduler/Schedulers.html

If all operations were guaranteed to run on a single thread, it would likely cause noisy latency, because two different clients that happened to get the same thread initially would compete for it throughout the lifetime of your program. So it's better that general work gets spread evenly across a limited pool of threads.
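To make the thread names in the question's log concrete, here is a minimal stdlib-only sketch of the idea. It is not Reactor's code. It assumes a scheduler built from N single-threaded workers that hands each new delayed task to the next worker in round-robin order, which is my understanding of how Schedulers.parallel() behaves and of where the retry delay runs when no scheduler is set on the Retry spec. The names WorkerPool, runDelays and the "worker-N" thread names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class RoundRobinDelayDemo {

    /** Hypothetical stand-in for a scheduler backed by N single-threaded workers. */
    static final class WorkerPool implements AutoCloseable {
        private final ScheduledExecutorService[] workers;
        private int next; // index of the worker that receives the next task

        WorkerPool(int size) {
            workers = new ScheduledExecutorService[size];
            for (int i = 0; i < size; i++) {
                final String name = "worker-" + (i + 1);
                workers[i] = Executors.newSingleThreadScheduledExecutor(r -> {
                    Thread t = new Thread(r, name);
                    t.setDaemon(true);
                    return t;
                });
            }
        }

        /** Returns the workers one after another, wrapping around at the end. */
        ScheduledExecutorService pick() {
            ScheduledExecutorService worker = workers[next];
            next = (next + 1) % workers.length;
            return worker;
        }

        @Override
        public void close() {
            for (ScheduledExecutorService worker : workers) {
                worker.shutdownNow();
            }
        }
    }

    /** Runs the given number of delayed tasks one after another and records the thread each ran on. */
    static List<String> runDelays(int poolSize, int attempts, long delayMillis) throws Exception {
        List<String> threadNames = new ArrayList<>();
        Callable<String> task = () -> Thread.currentThread().getName();
        try (WorkerPool pool = new WorkerPool(poolSize)) {
            for (int i = 0; i < attempts; i++) {
                // Each "retry" schedules a fresh delayed task on the next worker and waits for it
                threadNames.add(pool.pick().schedule(task, delayMillis, TimeUnit.MILLISECONDS).get());
            }
        }
        return threadNames;
    }

    public static void main(String[] args) throws Exception {
        // Eight workers: names cycle worker-1 .. worker-8, then wrap around to worker-1
        System.out.println(runDelays(8, 9, 5));
        // One worker: every delayed task runs on worker-1
        System.out.println(runDelays(1, 3, 5));
    }
}
```

With a pool of eight, nine consecutive delays land on worker-1 through worker-8 and then on worker-1 again, the same pattern as parallel-1 through parallel-8 in the question. With a pool of one, every delay lands on the same thread, which is roughly what passing Schedulers.single() to the retry spec achieves in the accepted answer.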