AzureServiceBus: Process all messages of the same session and roll back if one fails

I have the following process, which I thought would be easy to implement with Azure Service Bus sessions. One subscription turns a single incoming message into 5+ messages and sends them as a batch to another, session-enabled subscription. All of these messages get the same session ID. Sending them is not a problem.

The next step is where I am stuck. I thought I could create a processor that gives me all messages with the same session ID. Each message triggers a call to a service that might fail. If one call fails, all messages of that session that were already completed would have to be rolled back. But I guess that's not how sessions work in Azure Service Bus.

I started with the following code (inside the StartAsync method of a processor class that implements IHostedService):

[...]
sessionProcessor = serviceBusClient.CreateSessionProcessor(
    processingOptions.CurrentValue.ServiceBusOptions.ServiceBusTopic,
    processorOptions.Subscription,
    options);

// Configure the message and error handler to use
sessionProcessor.ProcessMessageAsync += MessageHandler;
sessionProcessor.ProcessErrorAsync += ErrorHandler;

sessionProcessor.SessionInitializingAsync += SessionInitializingHandler;
sessionProcessor.SessionClosingAsync += SessionClosingHandler;

await sessionProcessor.StartProcessingAsync();
[...]

The problem is that the MessageHandler receives only one message at a time, and that message is completed right away. That's not what I need, so I tried this instead:

var receiver = await serviceBusClient.AcceptNextSessionAsync(processingOptions.CurrentValue.ServiceBusOptions.ServiceBusTopic, processorOptions.Subscription);

var messages = await receiver.ReceiveMessagesAsync(100);

using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
    foreach (var message in messages)
    {
        await ProcessSessionRequest(message);

        await receiver.CompleteMessageAsync(message);
        await receiver.SetSessionStateAsync(new BinaryData(SessionState.SessionInProcess));
    }

    ts.Complete();
}

But here I have none of the error handling that the session processor provides. If one message fails and throws an exception, my processor shuts down completely instead of retrying, or moving the messages to the dead-letter queue and continuing with another session.

Does anyone have any idea how to implement that correctly? The only other idea I have is writing my own processor with the necessary events. But maybe I'm just missing something.

There is 1 answer

Suresh Chikkam

As @Sean Feldman pointed out, the approach you outlined does not handle the case where a session's messages do not all arrive in a single receive operation. In addition, processing the messages of a session atomically, especially when one of them fails, needs more consideration.

  • You’ll need to loop until you’ve received all messages within the session. Keep fetching messages until you’ve accumulated the expected count or until no more messages are available.

  • Send messages to the Azure Service Bus topic, ensuring that messages with related content are grouped under the same session ID.
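The receive loop from the first point could look roughly like the sketch below. It assumes the consumer knows how many messages to expect; `expectedCount` is a hypothetical parameter (for example, a count the sender puts into an application property), and `SessionDrain.ReceiveAllAsync` is an illustrative helper name, not part of the SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

static class SessionDrain
{
    // Hypothetical helper: keeps receiving from the session until expectedCount
    // messages have arrived or a receive call returns nothing within maxWait.
    public static async Task<List<ServiceBusReceivedMessage>> ReceiveAllAsync(
        ServiceBusSessionReceiver receiver, int expectedCount, TimeSpan maxWait)
    {
        var all = new List<ServiceBusReceivedMessage>();

        while (all.Count < expectedCount)
        {
            // Ask only for the messages that are still missing.
            var batch = await receiver.ReceiveMessagesAsync(expectedCount - all.Count, maxWait);

            if (batch.Count == 0)
            {
                break; // Nothing more arrived in time; the caller decides what to do.
            }

            all.AddRange(batch);
        }

        return all;
    }
}
```

The caller can compare the size of the returned list with `expectedCount` to decide whether the session is complete enough to process.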

using Azure.Messaging.ServiceBus;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Transactions;

class Program
{
    static async Task Main(string[] args)
    {
        string connectionString = "<your_connection_string>";
        string topicName = "<your_topic_name>";
        string subscriptionName = "<your_subscription_name>";

        await ProcessMessages(connectionString, topicName, subscriptionName);
    }

    static async Task ProcessMessages(string connectionString, string topicName, string subscriptionName)
    {
        await using var client = new ServiceBusClient(connectionString);
        // Settle messages explicitly inside the transaction instead of auto-completing them
        var options = new ServiceBusSessionProcessorOptions { AutoCompleteMessages = false };
        var processor = client.CreateSessionProcessor(topicName, subscriptionName, options);

        processor.ProcessMessageAsync += async args =>
        {
            // The handler is invoked with a single message; fetch more messages of the same
            // session through the receive actions (needs a recent Azure.Messaging.ServiceBus 7.x)
            var messages = new List<ServiceBusReceivedMessage> { args.Message };
            messages.AddRange(await args.GetReceiveActions().ReceiveMessagesAsync(10, TimeSpan.FromSeconds(5))); // Up to 10 more messages
            using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
            {
                foreach (var message in messages)
                {
                    // Process your message here. If this throws, the scope is disposed
                    // without Complete(), which rolls back the completions made so far,
                    // and the exception is reported to ProcessErrorAsync.
                    Console.WriteLine($"Processing message: {message.MessageId}");
                    await Task.Delay(1000); // Simulate processing
                    await args.CompleteMessageAsync(message);
                }
                ts.Complete(); // Commit the completions only if every message succeeded
            }
        };

        processor.ProcessErrorAsync += args =>
        {
            Console.WriteLine(args.Exception.ToString());
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync();
        Console.WriteLine("Press any key to stop processing...");
        Console.ReadKey();
        await processor.StopProcessingAsync();
    }
}

  • Introduce failures in the message processing logic to observe how failures and rollbacks are handled.

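When a message fails (e.g., due to an exception), all messages of the same session that were already completed need to be rolled back. This is the tricky part. Completions made inside a TransactionScope are undone if the scope is disposed without calling Complete(), but a completion that has already been committed cannot be reversed. You can move the failed message to the dead-letter queue (DLQ) for further analysis; on its own, however, that does not affect the other messages that were already completed.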
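A minimal, untested sketch of that idea with a manually accepted session receiver follows. `SessionSettlement.CompleteAllOrNothingAsync` and the `handle` delegate are hypothetical names standing in for your own code, and the dead-letter reason string is made up. Note that the transaction only covers the Service Bus operations: side effects of the service calls themselves are not undone, and whether the failed message can still be dead-lettered after the rollback should be verified against your SDK version.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Transactions;
using Azure.Messaging.ServiceBus;

static class SessionSettlement
{
    // Hypothetical helper: completes every message or none of them.
    // `handle` stands in for the per-message service call that may fail.
    public static async Task<bool> CompleteAllOrNothingAsync(
        ServiceBusSessionReceiver receiver,
        IReadOnlyList<ServiceBusReceivedMessage> messages,
        Func<ServiceBusReceivedMessage, Task> handle)
    {
        ServiceBusReceivedMessage current = null;
        Exception error = null;

        using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
        {
            try
            {
                foreach (var message in messages)
                {
                    current = message;
                    await handle(message);
                    await receiver.CompleteMessageAsync(message); // Part of the transaction
                }

                ts.Complete(); // Commit all completions together
                return true;
            }
            catch (Exception ex)
            {
                error = ex; // Leave the scope without Complete() so it rolls back
            }
        }

        // Outside the transaction: park the message that caused the failure.
        if (current != null)
        {
            await receiver.DeadLetterMessageAsync(current, "ProcessingFailed", error.Message);
        }

        return false;
    }
}
```

A caller could use the return value to decide whether to close the receiver and move on to the next session, which is roughly the behavior the session processor would otherwise provide.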

Received messages: [screenshot not available]