How to use JMS to know when the last message of a list has been processed

I have 1500 records that I'm splitting into smaller groups (~250 each) and processing asynchronously with JMS.

1500 is not a fixed value, though. For each client it can be more or less; in some cases there can be 8000 products or more. I will have N clients doing this operation one to four times per day.

I break the records into smaller groups to avoid having a single transaction with 1500 records.

I need to start some task only when all parts have been processed (all 1500).

How can I do this? I'm using Spring 4, JMS 2, and HornetQ, currently with annotation-based configuration.

Maybe JMS is not the right tool for this problem; I need help with that too. I have an XML file (from a web service) with 1500 products (code, price, stock, stock_local, title) and I have to persist all of them.

After, and only after, all of them are processed, I need to start the task that updates the Stock and Price values of each product in a remote system, based on the newly stored values (along with some other conditions).

The code:

// In a RestController: split the products into chunks of 250 and queue each chunk
Lists.partition(newProducts, 250).forEach(listPart ->
    myQueue.add(createMessage(Lists.newArrayList(listPart))));

// Called once per chunk. Each message contains a list of up to 250 products to persist
public void add(ProductsMessage message) {
    this.jmsTemplate.send(QUEUE_NAME, session -> session.createObjectMessage(message));
}

@JmsListener(destination = QUEUE_NAME)
public void importProducts(ProductsMessage message) {
....
// In this method I call message.getList() and persist all 250 products
}
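
A minimal sketch of one way to detect the last chunk, assuming every message carries a batch identifier and the sender knows how many chunks it created. `BatchTracker`, `register`, `partDone`, and the batch id are hypothetical names, not part of Spring or JMS. The counter lives in memory, so this only fits a single JVM (as with the in-VM broker here), and it does not account for a message being redelivered after a rollback; a counter kept in the database and updated in the same transaction as the products would probably be more robust.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: tracks how many chunks of each batch are still pending. */
class BatchTracker {

    // batch id -> number of chunks not yet processed
    private final ConcurrentMap<String, AtomicInteger> pending = new ConcurrentHashMap<>();

    /** Call once per batch, before any of its messages is sent. */
    void register(String batchId, int partCount) {
        if (partCount <= 0) {
            throw new IllegalArgumentException("partCount must be positive");
        }
        if (pending.putIfAbsent(batchId, new AtomicInteger(partCount)) != null) {
            throw new IllegalStateException("batch already registered: " + batchId);
        }
    }

    /**
     * Call after one chunk has been persisted.
     * Returns true for exactly one caller: the one that finished the last chunk.
     */
    boolean partDone(String batchId) {
        AtomicInteger counter = pending.get(batchId);
        if (counter == null) {
            throw new IllegalStateException("unknown batch: " + batchId);
        }
        // decrementAndGet is atomic, so only one thread can observe zero
        if (counter.decrementAndGet() == 0) {
            pending.remove(batchId);
            return true;
        }
        return false;
    }
}
```

Under these assumptions, the controller would call `register(batchId, parts.size())` before queuing the chunks, and `importProducts` would call `partDone(batchId)` after persisting its chunk and start the Stock/Price update task only when it returns `true`. Because the listener is transactional, the follow-up task would still need to wait for that last transaction to commit.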

Current JMS config:

@Configuration
@EnableJms
public class JmsConfig {

public static final int DELIVERY_DELAY = 1000;
public static final int SESSION_CACHE_SIZE = 10;

@Bean
@Autowired
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(PlatformTransactionManager transactionManager) {
    DefaultJmsListenerContainerFactory factory =
            new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setDestinationResolver(destinationResolver());
    factory.setConcurrency("1-2");
    factory.setTransactionManager(transactionManager);


    return factory;
}

@Bean
public DestinationResolver destinationResolver() {
    return new DynamicDestinationResolver();
}

@Bean
public ConnectionFactory connectionFactory() {

    TransportConfiguration transport = new TransportConfiguration(InVMConnectorFactory.class.getName());
    ConnectionFactory originalConnectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transport);

    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setTargetConnectionFactory(originalConnectionFactory);
    connectionFactory.setSessionCacheSize(SESSION_CACHE_SIZE);

    return connectionFactory;
}

@Bean
public JmsTemplate template(ConnectionFactory connectionFactory) {
    JmsTemplate template = new JmsTemplate();
    template.setConnectionFactory(connectionFactory);
    template.setDeliveryDelay(DELIVERY_DELAY);
    template.setSessionTransacted(true);
    return template;
}

/**
 * Initializes an embedded JMS broker
 */
@Bean(initMethod = "start", destroyMethod = "stop")
public EmbeddedJMS startJmsBroker() {
    return new EmbeddedJMS();
}

}
