Process messages concurrently in Apache.NMS.ActiveMQ

158 views Asked by At

I am using Apache.NMS.ActiveMQ 1.7.2 in a .NET 7 project. Everything works fine: I consume the messages and pass them to my message processor.

The current logic is that if I send one, two, or more messages, I consume the first one, pass it to the message processor, and work with it, and only then move on to the second. I now want to be able to send multiple messages and have all of them consumed at once, so that every message has its own thread and is processed independently.

If someone has a ready-made example, I would be very thankful.

Here is the initialization code, in case it helps:

/// <summary>
/// Message provider that connects to an ActiveMQ broker, receives messages from
/// <c>AppSettings.QueueName</c> one at a time and hands each text payload to the
/// injected <see cref="IMessageProcessor"/>.
/// </summary>
public class AmqClientService : IMessageProvider
{
    // Upper bound for a single blocking receive. A bounded wait lets the loop
    // re-check the cancellation token instead of blocking forever on an empty queue.
    private static readonly TimeSpan ReceiveTimeout = TimeSpan.FromSeconds(1);

    private readonly ILogger _logger;
    private readonly IMessageProcessor _messageProcessor;
    private readonly AmqClientSettings _settings = new AmqClientSettings();

    /// <summary>
    /// Creates the service and binds the <c>AmqClientSettings</c> configuration section.
    /// </summary>
    /// <param name="logger">Logger used for progress and diagnostic messages.</param>
    /// <param name="config">Configuration root from which the settings section is bound.</param>
    /// <param name="messageProcessor">Processor that receives the text of every consumed message.</param>
    public AmqClientService(ILogger logger, IConfiguration config, IMessageProcessor messageProcessor)
    {
        _logger = logger;
        _messageProcessor = messageProcessor;
        config.Bind(nameof(AmqClientSettings), _settings);
    }

    /// <summary>
    /// Opens a connection, session and consumer, then receives and processes messages
    /// sequentially until <paramref name="stoppingToken"/> is cancelled. The connection,
    /// session and consumer are all disposed when the method exits.
    /// </summary>
    /// <param name="stoppingToken">Token that ends the receive loop when cancelled.</param>
    /// <returns>A task that completes once the loop has stopped.</returns>
    public async Task WaitAsync(CancellationToken stoppingToken)
    {
        // NOTE(review): the broker URI is hard-coded; presumably it should come from _settings — confirm.
        var exampleProvider = "tcp://localhost:61616";

        _logger.Information($"initialize AMQ connection with provider {exampleProvider}");

        var connectionFactory = new ConnectionFactory(new Uri(exampleProvider));

        // Dispose in reverse order of creation: consumer, session, connection.
        using var connection = connectionFactory.CreateConnection();
        using var session = connection.CreateSession();
        connection.Start();
        using var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, AppSettings.QueueName));

        // Log the "waiting" line once per idle period, not once per receive timeout.
        var announceWait = true;

        while (!stoppingToken.IsCancellationRequested)
        {
            if (announceWait)
            {
                _logger.Information($"Waiting for SAP message");
                announceWait = false;
            }

            // Returns null when nothing arrived within the timeout.
            var received = consumer.Receive(ReceiveTimeout);
            if (received is null)
            {
                continue;
            }

            announceWait = true;

            if (received is not ITextMessage message)
            {
                // Only text messages can be handed to the processor; skip anything else
                // instead of failing with a NullReferenceException.
                _logger.Warning($"Ignoring non-text message for {received.NMSDestination}");
                continue;
            }

            _logger.Information($"Received message {message.Text}, for {message.NMSDestination}");

            if (await _messageProcessor.TryProcessAsync(message.Text))
            {
                // NOTE(review): the session uses the default acknowledgement mode; an explicit
                // Acknowledge() presumably only matters under client acknowledgement — confirm.
                message.Acknowledge();
            }
        }
    }
}
// Service registrations, expressed as one fluent chain: memory cache, the shared
// logger instance, the message processor and provider singletons, and the hosted worker.
// NOTE(review): the provider returned by BuildServiceProvider() is discarded here —
// confirm whether the caller is expected to keep it.
serviceCollection
    .AddMemoryCache()
    .AddSingleton(Log.Logger)
    .AddSingleton<IMessageProcessor, MessageProcessor>()
    .AddSingleton<IMessageProvider, AmqClientService>()
    .AddHostedService<Worker>()
    .BuildServiceProvider();
1

There is 1 answer

0
mamen On

On the FAQ page of ActiveMQ it is stated:

If you want to consume concurrently from a queue, then you must use a different session for each consumer.