In my Java Spring boot application, I am using reactive libraries to connect and stream from a websocket. I have the following dependencies in my pom.xml
file
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.8</version>
<scope>test</scope>
</dependency>
My production code to open a websocket connection looks like
@Autowired private ReactorNettyWebSocketClient client;
public <T> Flux<T> connectToWebSocketAndParseMessages(
String websocketUrl, Function<String, T> deserializer) {
return Flux.create(
sink -> {
try {
client
.execute(new URI(websocketUrl), getWebSocketHandler(deserializer, sink))
.retryWhen(getRetrySpec())
.subscribe(); // Subscribe to start the WebSocket connection
} catch (URISyntaxException e) {
sink.error(e);
}
});
}
@NotNull
private <T> WebSocketHandler getWebSocketHandler(
Function<String, T> deserializer, FluxSink<T> sink) {
return session -> {
Flux<WebSocketMessage> messageFlux = session.receive();
return messageFlux
.map(
message -> {
String messagePayload = message.getPayloadAsText();
try {
T model = deserializer.apply(messagePayload);
sink.next(model); // Emit the parsed model
} catch (Exception e) {
sink.error(e); // Handle parsing errors by signaling an error
}
return messagePayload;
})
.then();
};
}
Now, I am trying to unit test the method connectToWebSocketAndParseMessages
and for this, I mock the ReactorNettyWebSocketClient
and the session created by the client. My test case looks like
@SpringBootTest
@TestPropertySource(locations = "classpath:test.properties")
class WebSocketGatewayTest {
@Autowired private WebSocketGateway webSocketGateway;
@MockBean private ReactorNettyWebSocketClient mockClient;
@Test
public void testConnectToWebSocketAndParseMessages1() {
// Given
Function<String, Integer> deserializer = Integer::parseInt;
String websocketUrl = "ws://example.com";
WebSocketSession session = mock(WebSocketSession.class);
WebSocketMessage message1 = mock(WebSocketMessage.class);
WebSocketMessage message2 = mock(WebSocketMessage.class);
when(mockClient.execute(any(URI.class), any()))
.thenReturn(Flux.just(session).then())
.thenAnswer(
invocation -> {
WebSocketHandler handler = invocation.getArgument(1);
handler.handle(session); // Simulate WebSocket session handling
return Mono.empty();
});
when(session.receive()).thenReturn(Flux.just(message1, message2));
when(message1.getPayloadAsText()).thenReturn("42");
when(message2.getPayloadAsText()).thenReturn("100");
// When
Flux<Integer> result =
webSocketGateway.connectToWebSocketAndParseMessages(websocketUrl, deserializer);
// Then
StepVerifier.create(result).expectNext(42).expectNext(100).expectComplete().verify();
}
// remaining tests
Now, once the test runs, it does not complete and keeps on waiting for the message to be received from the websocket. The mock when(mockClient.execute(any(URI.class), any()))
somehow does not send any message to the websocket and this causes the test to not complete.
Any ideas here as to how this test can be modified to test the behaviour of reactive websocket connection. Thanks