WebFlux + RSocket: how to pass a Flux from RSocket to WebFlux


I'm trying to use WebFlux with RSocket. The sample application consists of a server and a client, both running on WebFlux and RSocket, and the RSocket interaction model is request-stream. The client and server work fine for a couple of concurrent requests, but when I load test at 1000 QPS with 8 threads, requests start hanging. While investigating, I found that the sample code below passes the load test.


WORKING SAMPLE

RSocketClientConfig.java

public class RSocketClientConfig {

    @Bean
    RSocketRequester rSocketRequester(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies,
            RSocketClientProperties clientProp) {

        // Connect once at startup; block() resolves the Mono so a single requester is shared by all callers.
        RSocketRequester rsocketRequester = rsocketRequesterBuilder.rsocketStrategies(strategies)
                .dataMimeType(new MimeType("application", "x-protobuf"))
                .connectTcp(clientProp.getHost(), clientProp.getRsocPort()).retry().block();

        // Log when the underlying connection closes.
        rsocketRequester.rsocket().onClose().doOnError(error -> log.warn("Connection CLOSED"))
                .doFinally(consumer -> log.info("Client DISCONNECTED")).subscribe();
        return rsocketRequester;
    }


}

Client.java

@Service
public class PersonRSocketClient {

    @Autowired
    private RSocketRequester personClient;

    public Flux<Person> list() {
        return personClient.route("person").retrieveFlux(Person.class);
    }

}

NOT WORKING SAMPLE (requests hang under the same load test)

RSocketClientConfig.java

public class RSocketClientConfig {

    @Bean
    Mono<RSocketRequester> rSocketRequester(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies,
        RSocketClientProperties clientProp) {

        
        // Expose the connection Mono itself; no connection is made here, only when something subscribes.
        Mono<RSocketRequester> rsocketRequester = rsocketRequesterBuilder.rsocketStrategies(strategies)
                .dataMimeType(new MimeType("application", "x-protobuf"))
                .connectTcp(clientProp.getHost(), clientProp.getRsocPort());

        return rsocketRequester;
    }
}

Client.java

@Service
public class PersonRSocketClient {

    @Autowired
    private Mono<RSocketRequester> personClient;

    public Flux<Person> list() {
        return personClient
                .flatMapMany(rsocket -> rsocket.route("person").retrieveFlux(Person.class));
    }

}
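One difference between the two variants that may matter here: in the working sample the connection is resolved once with block(), while in the second variant every call to list() subscribes to the Mono<RSocketRequester> again. If that Mono is cold, which I believe is the case for connectTcp but have not verified against this setup, each request would open its own connection. The plain-Java sketch below only models that difference with a Supplier; the class and method names are hypothetical, and it uses no Spring or Reactor types.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ColdVersusCachedConnection {

    // Hypothetical stand-in for real connections: counts how many were "opened".
    static final AtomicInteger OPENED = new AtomicInteger();

    // Models a cold source: every get() connects again.
    static Supplier<String> coldConnect() {
        return () -> "connection-" + OPENED.incrementAndGet();
    }

    // Wraps a source so it is evaluated at most once and the result is shared.
    static <T> Supplier<T> cached(Supplier<T> source) {
        return new Supplier<T>() {
            private T value;
            private boolean resolved;

            @Override
            public synchronized T get() {
                if (!resolved) {
                    value = source.get();
                    resolved = true;
                }
                return value;
            }
        };
    }

    // Issues the given number of requests and reports how many connections they opened.
    static int connectionsOpenedBy(Supplier<String> connection, int requests) {
        int before = OPENED.get();
        for (int i = 0; i < requests; i++) {
            connection.get(); // each request resolves the connection first
        }
        return OPENED.get() - before;
    }

    public static void main(String[] args) {
        System.out.println("cold:   " + connectionsOpenedBy(coldConnect(), 1000));         // prints "cold:   1000"
        System.out.println("cached: " + connectionsOpenedBy(cached(coldConnect()), 1000)); // prints "cached: 1"
    }
}
```

In Reactor the analogous operator is Mono.cache(), so one thing to try would be returning connectTcp(...).cache() from the bean in the second variant. I have not load tested that, so treat it as an idea to check rather than a confirmed fix.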

How do I map a request-stream response to a Flux correctly?
