Multithread using Executor and WebSocket

2.2k views Asked by At

In my case, I want to create multiple thread executor to handle trap received. In same application, I want to implement websocket to make my application running real time.

I have configuration class to create ThreadPoolExecutor, like this

@Configuration
@EnableAsync
@EnableScheduling
@Profile("!" + Constants.SPRING_PROFILE_FAST)
public class AsyncConfiguration implements AsyncConfigurer, EnvironmentAware {
 .......
 @Override
    @Bean
    public Executor getAsyncExecutor() {
        log.debug("Creating Async Task Executor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(propertyResolver.getProperty("corePoolSize", Integer.class, 30));
        executor.setMaxPoolSize(propertyResolver.getProperty("maxPoolSize", Integer.class, 150));
        executor.setQueueCapacity(propertyResolver.getProperty("queueCapacity", Integer.class, 10000));
        executor.setThreadNamePrefix("ems-Executor-");
        return new ExceptionHandlingAsyncTaskExecutor(executor);
    }

Then I use the executor in my TrapReceiver class,

@Component
public class TrapReceiver extends Thread implements CommandResponder {

    @Inject
    private ApplicationContext applicationContext;

    @Inject
    private Executor executor;

    public TrapReceiver(){
    }

    List<PDUv1> listPdu = new ArrayList<PDUv1>();
    String message = "";
    long totReceivedTrap = 0;

    @PostConstruct
    public void init() {
        //create thread pool untuk memanage thread puller (thread yang pull dan save device oid value)
        System.out.println("Running trap listener");
        this.start();
    }

    public synchronized void processPdu(CommandResponderEvent cmdRespEvent) {
        PDUv1 pdu = (PDUv1) cmdRespEvent.getPDU();
        listPdu.add(pdu);
        if (pdu != null) {
            if(listPdu.size() == 3){ //3trap per thread
                List<PDUv1> temp = new ArrayList<PDUv1>();
                temp.addAll(listPdu);
                TrapInsertor trapInsertor = (TrapInsertor) applicationContext.getBean("trapInsertor");
                trapInsertor.setProperty(temp);
                executor.execute(trapInsertor);
                listPdu.clear();
            }
        }
        totReceivedTrap++;
        if(totReceivedTrap % 10000 == 0)
            System.out.println("total received trap "+totReceivedTrap);
    }

    public void run() {
        while (true) {
            try {
                this.listen(new UdpAddress(getIp()+"/162")); //where to listen
            } catch (Exception e) {
                e.printStackTrace();
            } 
        }
    }

This code run well, but when I try to add websocket feature in my application, the application got error. The error come when I use @EnableWebSocketMessageBroker annotation. This is my websocket configuration

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketAppConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }

And this is the error

Caused by: org.springframework.beans.factory.BeanCreationException: Could not autowire field: private java.util.concurrent.Executor com.satunol.ems.snmp.TrapReceiver.executor; nested exception is org.springframework.beans.factory.NoUniqueBeanDefinitionException: No qualifying bean of type [java.util.concurrent.Executor] is defined: expected single matching bean but found 5: getAsyncExecutor,messageBrokerSockJsTaskScheduler,clientInboundChannelExecutor,clientOutboundChannelExecutor,brokerChannelExecutor
    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.inject(AutowiredAnnotationBeanPostProcessor.java:555)
    at org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java:87)
    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessPropertyValues(AutowiredAnnotationBeanPostProcessor.java:331)
    ... 16 more
Caused by: org.springframework.beans.factory.NoUniqueBeanDefinitionException: No qualifying bean of type [java.util.concurrent.Executor] is defined: expected single matching bean but found 5: getAsyncExecutor,messageBrokerSockJsTaskScheduler,clientInboundChannelExecutor

What is wrong with my code, if I wrong in websocket configuration or in threadpool, how it should be.

2

There are 2 answers

0
user3432319 On

The problem here is since there are multiple possible bean for [java.util.concurrent.Executor], Spring is unable to pick the correct one. Spring autowire mechanism is based by default on type. Since WebSocket has its own ThreadPoolTaskExecutor implementations you ended with 5 possible beans.

The autowire mechanism is based on the assumption that you'll give a single matching bean without any other annotation. Since there are multiple matching beans, you need to tell Spring the one you want to autowire. You can achieve it with the annotation @Qualifier("beanName")

@Autowired
@Qualifier("getAsyncExecutor")
private Executor executor;

Hope this help !

Some example: Example of @Qualifier annotation

0
hagrawal7777 On

This neither issue with multi-threading not with web-socket, this problem with Spring and its auto wiring feature. See the error -

Caused by: org.springframework.beans.factory.BeanCreationException: Could not autowire field: private java.util.concurrent.Executor com.satunol.ems.snmp.TrapReceiver.executor; nested exception is org.springframework.beans.factory.NoUniqueBeanDefinitionException: No qualifying bean of type [java.util.concurrent.Executor] is defined: expected single matching bean but found 5: getAsyncExecutor,messageBrokerSockJsTaskScheduler,clientInboundChannelExecutor,clientOutboundChannelExecutor,brokerChannelExecutor

My recommendation: Remove threadpool and threadpoolexecutor tags and add spring and spring-annotations tag, and some Spring folk should be able to help you quickly.