In my Java Spring boot application, I am using reactive libraries to connect and stream from a websocket. I have the following dependencies in my pom.xml file
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <version>3.4.8</version>
            <scope>test</scope>
        </dependency>
My production code to open a websocket connection looks like
@Autowired private ReactorNettyWebSocketClient client;
 public <T> Flux<T> connectToWebSocketAndParseMessages(
      String websocketUrl, Function<String, T> deserializer) {
    return Flux.create(
        sink -> {
          try {
            client
                .execute(new URI(websocketUrl), getWebSocketHandler(deserializer, sink))
                .retryWhen(getRetrySpec())
                .subscribe(); // Subscribe to start the WebSocket connection
          } catch (URISyntaxException e) {
            sink.error(e);
          }
        });
  }
  @NotNull
  private <T> WebSocketHandler getWebSocketHandler(
      Function<String, T> deserializer, FluxSink<T> sink) {
    return session -> {
      Flux<WebSocketMessage> messageFlux = session.receive();
      return messageFlux
          .map(
              message -> {
                String messagePayload = message.getPayloadAsText();
                try {
                  T model = deserializer.apply(messagePayload);
                  sink.next(model); // Emit the parsed model
                } catch (Exception e) {
                  sink.error(e); // Handle parsing errors by signaling an error
                }
                return messagePayload;
              })
          .then();
    };
  }
Now, I am trying to unit test the method connectToWebSocketAndParseMessages and for this, I mock the ReactorNettyWebSocketClient and the session created by the client. My test case looks like
@SpringBootTest
@TestPropertySource(locations = "classpath:test.properties")
class WebSocketGatewayTest {
  @Autowired private WebSocketGateway webSocketGateway;
  @MockBean private ReactorNettyWebSocketClient mockClient;
  @Test
  public void testConnectToWebSocketAndParseMessages1() {
    // Given
    Function<String, Integer> deserializer = Integer::parseInt;
    String websocketUrl = "ws://example.com";
    WebSocketSession session = mock(WebSocketSession.class);
    WebSocketMessage message1 = mock(WebSocketMessage.class);
    WebSocketMessage message2 = mock(WebSocketMessage.class);
    when(mockClient.execute(any(URI.class), any()))
        .thenReturn(Flux.just(session).then())
        .thenAnswer(
            invocation -> {
              WebSocketHandler handler = invocation.getArgument(1);
              handler.handle(session); // Simulate WebSocket session handling
              return Mono.empty();
            });
    when(session.receive()).thenReturn(Flux.just(message1, message2));
    when(message1.getPayloadAsText()).thenReturn("42");
    when(message2.getPayloadAsText()).thenReturn("100");
    // When
    Flux<Integer> result =
        webSocketGateway.connectToWebSocketAndParseMessages(websocketUrl, deserializer);
    // Then
    StepVerifier.create(result).expectNext(42).expectNext(100).expectComplete().verify();
  }
// remaining tests
Now, once the test runs, it does not complete and keeps on waiting for the message to be received from the websocket. The mock when(mockClient.execute(any(URI.class), any())) somehow does not send any message to the websocket and this causes the test to not complete.
Any ideas here as to how this test can be modified to test the behaviour of reactive websocket connection. Thanks