I'm trying to use WebFlux with RSocket, The sample application has server and client applications. both running on WebFlux and RSocket, my rsocket communication type is request-stream. client-server application runs perfectly fine for couple concurrent requests, however when I load test with 1000qps with 8 threads, requests starts hanging. On investigation below sample code passes through load test.
WORKING SAMPLE
RSocketClientConfig.java
public class RSocketClientConfig {
@Bean
RSocketRequester rSocketRequester(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies,
RSocketClientProperties clientProp) {
RSocketRequester rsocketRequester = rsocketRequesterBuilder.rsocketStrategies(strategies)
.dataMimeType(new MimeType("application", "x-protobuf"))
.connectTcp(clientProp.getHost(), clientProp.getRsocPort()).retry().block();
rsocketRequester.rsocket().onClose().doOnError(error -> log.warn("Connection CLOSED"))
.doFinally(consumer -> log.info("Client DISCONNECTED")).subscribe();
return rsocketRequester;
}
}
Client.java
@Service
public class PersonRSocketClient {
@Autowired
private RSocketRequester personClient;
public Flux<Person> list() {
return personClient.route("person").retrieveFlux(Person.class);
}
}
NOT WORKING
RSocketClientConfig.java
public class RSocketClientConfig {
@Bean
Mono<RSocketRequester> rSocketRequester(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies,
RSocketClientProperties clientProp) {
Mono<RSocketRequester> rsocketRequester = rsocketRequesterBuilder.rsocketStrategies(strategies)
.dataMimeType(new MimeType("application", "x-protobuf"))
.connectTcp(clientProp.getHost(), clientProp.getRsocPort());
return rsocketRequester;
}
}
Client.java
@Service
public class PersonRSocketClient {
@Autowired
private Mono<RSocketRequester> personClient;
public Flux<Person> list() {
return personClient
.flatMapMany(rsocket -> rsocket.route("person").retrieveFlux(Person.class));
}
}
How to map request-stream to flux correctly?