Handle event after SessionIdleTimeout passed on Azure service bus topic subscription

179 views Asked by At

I'm using Azure service bus topic with C# and ASP.NET Core. I have created session enabled subscription and I'm using ServiceBusSessionProcessor to receive messages from this topic.

I would like to set SessionIdleTimeout to 15 seconds and handle an event after this time has passed. If there is no message received I would like to update a record in my database and stop processing.

Is this possible? Is there any way I can know that no message was received in those 15 seconds?

Some of the code:

/// <summary>
/// Stores the injected collaborators, builds a session processor bound to a single
/// session of the given topic subscription, and starts it before returning.
/// </summary>
/// <param name="handler">Message handler kept in <c>_handler</c>.</param>
/// <param name="topicName">Topic the processor receives from.</param>
/// <param name="subscriptionName">Subscription of <paramref name="topicName"/> to receive from.</param>
/// <param name="sessionId">The only session id added to the processor options.</param>
/// <param name="serviceBusPersisterConnection">Supplies the client used to create the processor.</param>
/// <param name="messageHelper">Helper kept in <c>_messageHelper</c>.</param>
/// <param name="logger">Logger kept in <c>_logger</c>.</param>
public BaseSessionMessageProcessor(
        IMessageHandler<T> handler,
        string topicName,
        string subscriptionName,
        string sessionId,
        IServiceBusPersisterConnection serviceBusPersisterConnection,
        IMessageHelper messageHelper,
        IAppLogger<T> logger)
    {
        _handler = handler;
        _messageHelper = messageHelper;
        _logger = logger;

        // NOTE(review): the SessionIdleTimeout documentation says that when SessionIds is
        // populated and MaxConcurrentSessions >= the number of ids, the session is NOT closed
        // when the idle timeout elapses. Here that is 2 >= 1 - confirm this is intended.
        var processorOptions = new ServiceBusSessionProcessorOptions();
        processorOptions.MaxConcurrentSessions = 2;
        processorOptions.MaxConcurrentCallsPerSession = 1;
        processorOptions.AutoCompleteMessages = false; // handlers must settle messages themselves
        processorOptions.SessionIds.Add(sessionId);
        processorOptions.SessionIdleTimeout = TimeSpan.FromSeconds(15);

        var busClient = serviceBusPersisterConnection.ServiceBusClient;
        _processor = busClient.CreateSessionProcessor(topicName, subscriptionName, processorOptions);

        // Blocks the constructing thread until the processor has started.
        // NOTE(review): sync-over-async in a constructor; consider an explicit async start method.
        RegisterSessionMessageProcessor().GetAwaiter().GetResult();
    }

    /// <summary>
    /// Wires the message and error callbacks onto <c>_processor</c> and starts processing.
    /// </summary>
    /// <returns>A task that completes once the processor has started.</returns>
    public async Task RegisterSessionMessageProcessor()
    {
        _processor.ProcessMessageAsync += SessionMessageHandler;
        _processor.ProcessErrorAsync += ErrorHandler;

        // The constructor waits on this method synchronously, so the continuation must not
        // need the caller's synchronization context; otherwise the blocked caller could
        // deadlock against it.
        await _processor.StartProcessingAsync().ConfigureAwait(false);
    }

    /// <summary>
    /// Callback invoked by the session processor for each received message; currently it
    /// only logs that processing started.
    /// </summary>
    /// <param name="args">Event data for the received session message (unused here).</param>
    /// <returns>An already-completed task, because nothing is awaited.</returns>
    public Task SessionMessageHandler(ProcessSessionMessageEventArgs args)
    {
        _logger.LogInformation("log-info: process started");

        // NOTE(review): the message is never settled in this method. If the processor is
        // configured with AutoCompleteMessages = false, the message would presumably be
        // redelivered - confirm that settlement happens elsewhere.
        return Task.CompletedTask;
    }

    /// <summary>
    /// Callback invoked by the session processor when an error is raised; logs the
    /// exception together with the error source.
    /// </summary>
    /// <param name="args">Event data carrying the exception and its source.</param>
    /// <returns>An already-completed task.</returns>
    public Task ErrorHandler(ProcessErrorEventArgs args)
    {
        // NOTE(review): verify the IAppLogger.LogError parameter order. If it mirrors the
        // ILogger extension (message, params args), the exception is treated as a format
        // argument and would not be recorded as the exception.
        _logger.LogError(
            $"log-exception: An error occurred while trying to handle a message.",
            args.Exception,
            args.ErrorSource);

        return Task.CompletedTask;
    }
1

There is 1 answer

8
Sampath On BEST ANSWER

The SessionIdleTimeout property of the ServiceBusSessionProcessorOptions class is documented here:

  • Gets or sets the maximum amount of time to wait for a message to be received for the currently active session.
  • After this time has elapsed, the processor will close the session and attempt to process another session.
  • If SessionIds is populated and MaxConcurrentSessions is greater or equal to the number of sessions specified in SessionIds, the session will not be closed when the idle timeout elapses.
  • The below sample code uses the Azure Service Bus SDK for .NET to create a ServiceBusSessionProcessor with the specified SessionIdleTimeout property. It defines the maximum amount of time the processor will wait for a message to be received for the currently active session. If no message is received within this time frame, the processor will close the session and attempt to process another session.
    // Create a session processor with SessionIdleTimeout
    var options = new ServiceBusSessionProcessorOptions
    {
        SessionIdleTimeout = TimeSpan.FromMinutes(5) // Set the session idle timeout to 5 minutes
    };

    var processor = client.CreateSessionProcessor(topicName, subscriptionName, options);

    // Configure the message handler
    processor.ProcessMessageAsync += MessageHandler;
    processor.ProcessErrorAsync += ErrorHandler;

    // Start processing sessions
    await processor.StartProcessingAsync();

    Console.WriteLine("Press any key to stop processing...");
    Console.ReadKey();

    // Stop processing sessions
    await processor.StopProcessingAsync();
}

/// <summary>
/// Prints the body and session id of a received message, then completes the message.
/// </summary>
/// <param name="args">Event data for the received session message.</param>
static async Task MessageHandler(ProcessSessionMessageEventArgs args)
{
    Console.WriteLine($"Received message: {args.Message.Body} from session: {args.SessionId}");

    // Completing settles the message so it is not delivered again.
    await args.CompleteMessageAsync(args.Message);
}

/// <summary>
/// Writes the message of a processor error to the console.
/// </summary>
/// <param name="args">Event data carrying the exception.</param>
/// <returns>An already-completed task.</returns>
static Task ErrorHandler(ProcessErrorEventArgs args)
{
    string reason = args.Exception.Message;
    Console.WriteLine($"Error: {reason}");
    return Task.CompletedTask;
}



Output:

enter image description here

Listen for a specific session with processing:

/// <summary>
/// Creates a session-enabled subscription on the topic unless it already exists.
/// </summary>
/// <param name="connectionString">Connection string passed to the administration client.</param>
/// <param name="topicName">Topic that owns the subscription.</param>
/// <param name="subscriptionName">Subscription to look up and, if missing, create.</param>
static async Task CreateSubscriptionWithSessionAsync(string connectionString, string topicName, string subscriptionName)
{
    var adminClient = new ServiceBusAdministrationClient(connectionString);

    bool alreadyExists = await adminClient.SubscriptionExistsAsync(topicName, subscriptionName);
    if (alreadyExists)
    {
        return;
    }

    var subscription = new CreateSubscriptionOptions(topicName, subscriptionName);
    subscription.RequiresSession = true; // sessions are switched on when the subscription is created
    await adminClient.CreateSubscriptionAsync(subscription);
}

/// <summary>
/// Processes messages of one session from a topic subscription until a key is pressed,
/// then stops the processor and releases the client.
/// </summary>
/// <param name="connectionString">Service Bus connection string.</param>
/// <param name="topicName">Topic to receive from.</param>
/// <param name="subscriptionName">Session-enabled subscription of the topic.</param>
/// <param name="sessionId">The only session the processor is allowed to accept.</param>
static async Task ProcessMessagesWithSessionAsync(string connectionString, string topicName, string subscriptionName, string sessionId)
{
    var client = new ServiceBusClient(connectionString);
    try
    {
        var options = new ServiceBusSessionProcessorOptions
        {
            // Restrict the processor to the requested session. Without this it accepts any
            // session; the handler returns without acting on other sessions' messages, and
            // with auto-complete left at its default those messages are completed unprocessed.
            SessionIds = { sessionId },
            SessionIdleTimeout = TimeSpan.FromMinutes(5) // Set the session idle timeout to 5 minutes
        };

        var processor = client.CreateSessionProcessor(topicName, subscriptionName, options);
        try
        {
            processor.ProcessMessageAsync += (args) => MessageHandler(args, sessionId);
            processor.ProcessErrorAsync += ErrorHandler;

            Console.WriteLine($"Start processing messages from session '{sessionId}' in subscription '{subscriptionName}'...");

            await processor.StartProcessingAsync();

            Console.WriteLine("Press any key to stop processing...");
            Console.ReadKey();

            await processor.StopProcessingAsync();
        }
        finally
        {
            // Runs on the failure path too, so the processor's links are always released.
            await processor.CloseAsync();
        }
    }
    finally
    {
        await client.DisposeAsync();
    }
}

/// <summary>
/// Prints and completes a message when it belongs to the expected session; messages from
/// any other session are left untouched by this method.
/// </summary>
/// <param name="args">Event data for the received session message.</param>
/// <param name="expectedSessionId">Session id the message must carry to be handled.</param>
static async Task MessageHandler(ProcessSessionMessageEventArgs args, string expectedSessionId)
{
    if (args.SessionId != expectedSessionId)
    {
        return;
    }

    var received = args.Message;
    string text = Encoding.UTF8.GetString(received.Body);
    Console.WriteLine($"Received message from session '{args.SessionId}': {text}");
    await args.CompleteMessageAsync(received);
}

/// <summary>
/// Reports a processor error on the console.
/// </summary>
/// <param name="args">Event data carrying the exception.</param>
/// <returns>An already-completed task.</returns>
static Task ErrorHandler(ProcessErrorEventArgs args)
{
    string reason = args.Exception.Message;
    Console.WriteLine($"Error: {reason}");
    return Task.CompletedTask;
}


enter image description here

  • The code below processes messages from a specific session with a timer to stop processing if the session is idle for a specified period.
    /// <summary>
    /// Ensures a session-enabled subscription exists on the topic, creating it when absent.
    /// </summary>
    /// <param name="connectionString">Connection string passed to the administration client.</param>
    /// <param name="topicName">Topic that owns the subscription.</param>
    /// <param name="subscriptionName">Subscription to look up and, if missing, create.</param>
    static async Task CreateSubscriptionWithSessionAsync(string connectionString, string topicName, string subscriptionName)
    {
        var adminClient = new ServiceBusAdministrationClient(connectionString);

        bool alreadyExists = await adminClient.SubscriptionExistsAsync(topicName, subscriptionName);
        if (alreadyExists)
        {
            return;
        }

        var subscription = new CreateSubscriptionOptions(topicName, subscriptionName);
        subscription.RequiresSession = true; // sessions are switched on when the subscription is created
        await adminClient.CreateSubscriptionAsync(subscription);
    }

    /// <summary>
    /// Processes messages of one session from a topic subscription until a key is pressed,
    /// while an idle timer calls <c>StopProcessingIfIdle</c> when no message has arrived
    /// for the idle period.
    /// </summary>
    /// <param name="connectionString">Service Bus connection string.</param>
    /// <param name="topicName">Topic to receive from.</param>
    /// <param name="subscriptionName">Session-enabled subscription of the topic.</param>
    /// <param name="sessionId">The only session the processor is allowed to accept.</param>
    static async Task ProcessMessagesWithSessionAsync(string connectionString, string topicName, string subscriptionName, string sessionId)
    {
        var idleTimeout = TimeSpan.FromMinutes(5);

        var client = new ServiceBusClient(connectionString);
        try
        {
            var options = new ServiceBusSessionProcessorOptions
            {
                // Restrict the processor to the requested session; otherwise it accepts any
                // session and the handler returns without acting on other sessions' messages.
                SessionIds = { sessionId },
                SessionIdleTimeout = idleTimeout // Set the session idle timeout to 5 minutes
            };

            // Created disarmed; it is armed once the processor is running and re-armed by the
            // message handler on every received message.
            sessionIdleTimer = new Timer(state => StopProcessingIfIdle(), null, Timeout.Infinite, Timeout.Infinite);

            var processor = client.CreateSessionProcessor(topicName, subscriptionName, options);
            try
            {
                processor.ProcessMessageAsync += (args) => MessageHandler(args, sessionId);
                processor.ProcessErrorAsync += ErrorHandler;

                Console.WriteLine($"Start processing messages from session '{sessionId}' in subscription '{subscriptionName}'...");

                await processor.StartProcessingAsync();

                // Arm the timer now so that a session which never receives any message is
                // still reported as idle; previously only a received message armed it.
                sessionIdleTimer.Change(idleTimeout, Timeout.InfiniteTimeSpan);

                Console.WriteLine("Press any key to stop processing...");
                Console.ReadKey();

                await processor.StopProcessingAsync();
            }
            finally
            {
                // Disarm rather than dispose: a callback already in flight still calls
                // Change on the timer, which would throw on a disposed instance.
                sessionIdleTimer.Change(Timeout.Infinite, Timeout.Infinite);
                await processor.CloseAsync();
            }
        }
        finally
        {
            await client.DisposeAsync();
        }
    }

    /// <summary>
    /// Prints and completes a message that belongs to the expected session, pushing the
    /// idle timer back by five minutes; messages from other sessions are left untouched.
    /// </summary>
    /// <param name="args">Event data for the received session message.</param>
    /// <param name="expectedSessionId">Session id the message must carry to be handled.</param>
    static async Task MessageHandler(ProcessSessionMessageEventArgs args, string expectedSessionId)
    {
        if (args.SessionId != expectedSessionId)
        {
            return;
        }

        var received = args.Message;
        string text = Encoding.UTF8.GetString(received.Body);
        Console.WriteLine($"Received message from session '{args.SessionId}': {text}");

        // A message arrived, so the idle countdown restarts as a one-shot timer.
        sessionIdleTimer.Change(TimeSpan.FromMinutes(5), Timeout.InfiniteTimeSpan);

        await args.CompleteMessageAsync(received);
    }

    /// <summary>
    /// Reports a processor error on the console.
    /// </summary>
    /// <param name="args">Event data carrying the exception.</param>
    /// <returns>An already-completed task.</returns>
    static Task ErrorHandler(ProcessErrorEventArgs args)
    {
        string reason = args.Exception.Message;
        Console.WriteLine($"Error: {reason}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Idle-timer callback: announces that the session is idle and disarms the timer.
    /// It does not stop the processor itself.
    /// </summary>
    static void StopProcessingIfIdle()
    {
        Console.WriteLine("Session is idle. Stopping processing...");

        // Disarm so the callback does not fire again until a message re-arms the timer.
        sessionIdleTimer.Change(Timeout.Infinite, Timeout.Infinite);

        // The actual shutdown still has to be triggered from here, e.g. by setting a flag
        // or signalling another mechanism (for example: Environment.Exit(0);).
    }
}

enter image description here