I am new to reactive programming and webflux and I´m evaluating Migration from Spring Data Elasticsearch Application on Servlet Stack with WebMVC to Reactive Stack with Spring Webflux.
I´ve developed two identical simple spring boot applications that can perform CRUD Operations with Spring Data Elasticsearch Repositories.
Test is User indexing/saving documents to Elasticsearch. Load test is then 500 concurrent users, ramp-up time 20s, 20 iterations each user (total 10000 docs)
I was expecting Webflux on Netty outperform MVC on Tomcat, especially with more concurrent users. But results are the other way around. Response times of Netty almost double as high than tomcat and I needed to increase maxConnection queue in reactive client, because I was getting ReadTimeoutExceptions. So what am I doing wrong?
Questions:
- Shouldn´t Webflux on Netty be able to handle more concurrent users better?
- Why are response times so high? ...and throughput lower on netty?
- Do I need to configure the reactive client differently to get better performance?
- Can Tomcat with its hundreds of NIO threads just handle more requests and is faster then Netty Event loop?
The Apps have following stacks:
Spring Web MVC:
<properties>
<java.version>1.8</java.version>
<spring-boot-starter-data-elasticsearch.version>2.4.4</spring-boot-starter-data-elasticsearch.version>
<spring-boot-starter-test.version>2.4.4</spring-boot-starter-test.version>
<spring-boot-starter-web.version>2.4.4</spring-boot-starter-web.version>
<lombok.version>1.18.16</lombok.version>
<jfairy.version>0.5.9</jfairy.version>
<elasticsearch.version>7.12.0</elasticsearch.version>
</properties>
Tomcat (spring-boot-starter-tomcat:jar:2.4.2:compile)
RestHighLevelClient for requests to Elasticsearch
@Configuration public class MvcElasticsearchConfiguration extends AbstractElasticsearchConfiguration { @Value("${elasticsearch.host:localhost}") private String host; @Value("${elasticsearch.http.port:9200}") private int port; @Override @Bean public RestHighLevelClient elasticsearchClient() { final ClientConfiguration clientConfiguration = ClientConfiguration.builder() .connectedTo(getHostAndPort()) .build(); return RestClients.create(clientConfiguration).rest(); } private String getHostAndPort(){ return host +":"+ port; } }
Controller:
@PostMapping(value = "/index")
public ResponseEntity<PersonDocumentDto> indexGeneratedPersonDocument() {
PersonDocumentDto dto = this.service.indexGeneratedPersonDocument();
return new ResponseEntity<>(dto, HttpStatus.CREATED);
}
Service:
public PersonDocumentDto indexGeneratedPersonDocument(){
PersonDocument personDocument = personGenerator.generatePersonDoc();
PersonDocumentDto personDocumentDto = new PersonDocumentDto();
try {
personDocumentDto = EntityDtoUtil.toDto(this.repository.save(personDocument));
LOGGER.debug("Document indexed!");
} catch (Exception e) {
LOGGER.error("Unable to index document!",e);
}
return personDocumentDto;
}
Spring Webflux:
<properties>
<java.version>1.8</java.version>
<spring-boot-starter-data-elasticsearch.version>2.4.4</spring-boot-starter-data-elasticsearch.version>
<spring-boot-starter-webflux.version>2.4.4</spring-boot-starter-webflux.version>
<spring-boot-starter-test.version>2.4.4</spring-boot-starter-test.version>
<reactor-test.version>3.4.2</reactor-test.version>
<lombok.version>1.18.16</lombok.version>
<jfairy.version>0.5.9</jfairy.version>
<elasticsearch.version>7.12.0</elasticsearch.version>
</properties>
Netty (spring-boot-starter-reactor-netty:jar:2.4.2:compile )
ReactiveElasticSearchClient for requests to Elasticsearch
@Configuration public class ReactiveElasticsearchConfiguration extends AbstractReactiveElasticsearchConfiguration { @Value("${elasticsearch.host:localhost}") private String host; @Value("${elasticsearch.http.port:9200}") private int port; @Override @Bean public ReactiveElasticsearchClient reactiveElasticsearchClient() { ClientConfiguration clientConfiguration = ClientConfiguration.builder() .connectedTo(getHostAndPort()) .withWebClientConfigurer(webClient -> { ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder() .codecs(configurer -> configurer.defaultCodecs() .maxInMemorySize(-1)) .build(); String connectionProviderName = "myConnectionProvider"; int maxConnections = 1000; HttpClient httpClient = HttpClient.create(ConnectionProvider.create(connectionProviderName, maxConnections)); return webClient .mutate() .clientConnector(new ReactorClientHttpConnector(httpClient)) .exchangeStrategies(exchangeStrategies) .build(); }) .build(); return ReactiveRestClients.create(clientConfiguration); } private String getHostAndPort(){ return host +":"+ port; }
}
Handler:
public Mono<ServerResponse> indexSingleGeneratedPersonDoc(ServerRequest serverRequest){
return this.service.indexGeneratedPersonDocument()
.flatMap(personDocumentDto -> ServerResponse.ok().bodyValue(personDocumentDto))
.onErrorResume(WebClientRequestException.class, e -> ServerResponse
.badRequest()
.bodyValue(Optional.ofNullable(e.getMessage()).orElseGet(() -> "Something went wrong!") ));
}
Service:
public Mono<PersonDocumentDto> indexGeneratedPersonDocument(){
return personGenerator.generatePersonDocument()
.flatMap(this.repository::save)
.map(EntityDtoUtil::toDto)
.doOnSuccess(response -> LOGGER.debug("Document indexed!"));
}
MVC ResponseTimesPercentiles: 500 users, 20 iterations, 10000 docs total
Webflux ResponseTimesPercentiles: 500 users, 20 iterations, 10000 docs total