Integrate spring-reactor into existing Spring Framework 4 STOMP Over WebSocket application

1.6k views Asked by At

My application uses the Spring Framework 4 included spring-messaging module (with key abstractions from the Spring Integration project such as Message, MessageChannel,MessageHandler and others that can serve as a foundation for such a messaging architecture.)

My application uses Websocket & STOMP. It maintains connections(websocket sessions) with a high volume of java websocket clients & one of the requirements was to use either akka or reactor.

I want to integrate spring-reactor RingBufferAsyncTaskExecutor in place of ThreadPoolTaskExecutor in clientInboundChannelExecutor & clientOutboundChannelExecutor to get better Throughput. At least I've identified this approach as the way to integrate spring-reactor into my existing application - this may not be the right approach.

I was looking at reactor-si-quickstart, since it demonstrates how to use reactor with spring integration & since spring-messaging in Spring Framework 4 includes key abstractions from the Spring Integration project. I thought it would be the closest reference.

My working java config for web socket has the following class declaration public class WebSocketConfig extends WebSocketMessageBrokerConfigurationSupport implements WebSocketMessageBrokerConfigurer. WebSocketMessageBrokerConfigurationSupport extends AbstractMessageBrokerConfiguration.

In org.springframework.messaging.simp.config.AbstractMessageBrokerConfiguration I wanted to try configure RingBufferAsyncTaskExecutor in place of ThreadPoolTaskExecutor

@Bean
public ThreadPoolTaskExecutor clientInboundChannelExecutor() {
    TaskExecutorRegistration reg = getClientInboundChannelRegistration().getOrCreateTaskExecRegistration();
    ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
    executor.setThreadNamePrefix("clientInboundChannel-");
    return executor;
}

When I try override this method in WebSocketConfig "The method getOrCreateTaskExecRegistration() from the type ChannelRegistration is not visible" because in AbstractMessageBrokerConfiguration it's protected ....

protected final ChannelRegistration getClientInboundChannelRegistration() {
    if (this.clientInboundChannelRegistration == null) {
        ChannelRegistration registration = new ChannelRegistration();
        configureClientInboundChannel(registration);
        this.clientInboundChannelRegistration = registration;
    }
    return this.clientInboundChannelRegistration;
}

I don't fully understand the WebSocketMessageBrokerConfigurationSupport hierarchy or the WebSocketMessageBrokerConfigurer interface in my WebSocketConfig. I just played around with overriding what I needed to for my customizations to work.

Not sure if it's relevant, but I don't need an external broker because my application doesn't send any data to all connected subscribers, at the moment and is unlikely to down the line. Communication with daemon type java websocket clients is point-to-point, but the web ui websocket in the browser does use subscribe to get real-time data so it's a convenient setup (rather then spring integration direct channel) and there where clear sources on how to set it up - still I'm not sure it is most efficient application design. STOMP Over WebSocket Messaging Architecture as described in the spring-framework reference documentation was the most comprehensive approach since this is my first spring project.

Is it possible to get the performance boosts from integrating spring-reactor into my existing application?

Or should I try to use spring integration instead, this would require a lot of modification, as far as I can tell - also it seems illogical that it would be necessary given that Spring Framework 4 included spring-messaging module was from spring integration.

How can should I integrate spring-reactor into my standard spring framework 4 STOMP Over WebSocket Messaging Architecture?

If configuring RingBufferAsyncTaskExecutor in place of ThreadPoolTaskExecutor in clientInboundChannelExecutor & clientOutboundChannelExecutor is the correct way, how should I go about doing this?

1

There are 1 answers

1
Artem Bilan On BEST ANSWER

Actually the RingBufferAsyncTaskExecutor isn't ThreadPoolTaskExecutor, so you can't use it that way.

You can simply override clientInbound(Outbound)Channel beans from your AbstractWebSocketMessageBrokerConfigurer impl and just use @EnableWebSocketMessageBroker:

@Configuration
@EnableWebSocketMessageBroker
@EnableReactor
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @autowired
    Environment reactorEnv;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry configurer) {
        configurer.setApplicationDestinationPrefixes("/app");
        configurer.enableSimpleBroker("/topic", "/queue");
    }

    @Bean
    public AbstractSubscribableChannel clientInboundChannel() {
        ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(new RingBufferAsyncTaskExecutor(this.reactorEnv));
        ChannelRegistration reg = getClientOutboundChannelRegistration();
        channel.setInterceptors(reg.getInterceptors());
        return channel;
    }

}

And pay attention, please, to the WebSocket support in the Spring Integration.

By the way: point me out, please, to the link for that reactor-si-quickstart.