1500 records that I'm breaking up with asynchronous processing with JMS into smaller groups (~250).
1500 too is not a fixed value though. For each client can be more or less. In some cases there can be a 8000 products, or more. I will have N clients doing this operation one, two, three, or four times per day.
I have been breaking the records into smaller groups to avoid having a transaction with 1500 records.
I need to start some task only when all parts have been processed (all 1500).
How can I do this? I'm using Spring 4, JMS 2, HornetQ, and for now using config by annotations.
Maybe I'm not doing the right thing using JMS for that problem. I need help with that too. I have an XML file (from a webservice) with 1500 products (code, price, stock, stock_local, title) and I have to persist all of them.
After, and only after all of them are processed I need to start the task that will update Stock and Price values of each (into a remote system), based on the newly stored values (along with some other conditions)
The code:
// in some RestController i have
Lists.partition(newProducts, 250).forEach(listPart->
myQueue.add(createMessage(Lists.newArrayList(listPart))));
//called some times. Each message contains a list of 250 products to persist
public void add(ProductsMessage message) {
this.jmsTemplate.send(QUEUE_NAME, session -> session.createObjectMessage(message));
}
@JmsListener(destination = QUEUE_NAME, )
public void importProducts(ProductsMessage message) {
....
//at this method i get message.getList and persist all 250 products
}
Actual config JMS:
@Configuration
@EnableJms
public class JmsConfig {
public static final int DELIVERY_DELAY = 1000;
public static final int SESSION_CACHE_SIZE = 10;
@Bean
@Autowired
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(PlatformTransactionManager transactionManager) {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setDestinationResolver(destinationResolver());
factory.setConcurrency("1-2");
factory.setTransactionManager(transactionManager);
return factory;
}
@Bean
public DestinationResolver destinationResolver() {
return new DynamicDestinationResolver();
}
@Bean
public ConnectionFactory connectionFactory() {
TransportConfiguration transport = new TransportConfiguration(InVMConnectorFactory.class.getName());
ConnectionFactory originalConnectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transport);
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setTargetConnectionFactory(originalConnectionFactory);
connectionFactory.setSessionCacheSize(SESSION_CACHE_SIZE);
return connectionFactory;
}
@Bean
public JmsTemplate template(ConnectionFactory connectionFactory) {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory);
template.setDeliveryDelay(DELIVERY_DELAY);
template.setSessionTransacted(true);
return template;
}
/**
* Inicializa um broker JMS embarcado
*/
@Bean(initMethod = "start", destroyMethod = "stop")
public EmbeddedJMS startJmsBroker() {
return new EmbeddedJMS();
}
}