ServiceBusTrigger: session receiving one message at a time

1.5k views Asked by At

I have an Azure Function with the ServiceBusTrigger trigger

it's configured to receive sessions, and when I insert the messages, it's inserting with the correct "SessionId".

But when executing the trigger it executes only one session message at a time.

Can someone help me? I want to run all session messages at a time.

I leave below code snippets regarding the insertion of messages and trigger

ServiceBusTrigger:

public async Task RunAsync([ServiceBusTrigger("%" + AzureServiceBusConfiguration.ServiceBusBlobMigrationQueueNameSecretName + "%", Connection = AzureServiceBusConfiguration.ServiceBusConnectionSecretName, IsSessionsEnabled = true)] string messageBrokerMessageString, FunctionContext context)
{
//CODE
}

Message Insertion:

    for (int chunkIndex = 0; chunkIndex < chunks.Count(); chunkIndex++)
    {
        string workerNodeName = _workerManager.GetWorkerNodeName(controlMetadataEntity.GetBlobMigrationStrategyIdentifier(), Convert.ToUInt32(chunkIndex), controlMetadataEntity.RowKey);

        foreach (BlobMetadataEntity blob in chunks.ElementAt(chunkIndex))
        {
            ServiceBusMessage message = new ServiceBusMessage()
            {
                // The message ID is necessary to prevent duplicate entries in message broker.
                // See https://learn.microsoft.com/en-us/azure/service-bus-messaging/duplicate-detection.
                MessageId = string.Join('|', blob.PartitionKey, blob.RowKey),

                // Body will be the JSON of a object converted to bytes.
                Body = new BinaryData(Encoding.UTF8.GetBytes(new BlobExecutionQueueMessage(blob.PartitionKey, blob.RowKey, workerNodeName, null).GetObjectAsJsonAsync())),
                ContentType = MediaTypeNames.Application.Json,

                // This will separate the message in logic queues to control the parallelism per execution.
                // See https://learn.microsoft.com/en-us/azure/service-bus-messaging/message-sessions.
                SessionId = workerNodeName
            };

            // Set user properties to identify this blob in message broker.
            message.ApplicationProperties.Add("ExecutionId", blob.PartitionKey);
            message.ApplicationProperties.Add("BlobId", blob.RowKey);

            // Add the "AddAsync" task to the list of tasks to run them after all
            // messages were enqueued in the list.
            messageBrokerInsertions.Add(_azureServiceBusClient.SendMessageAsync(serviceBusClient, sender, message));
        }
    }

My Host.json

{
    "version": "2.0",
    "logging": {
        "applicationInsights": {
            "samplingExcludedTypes": "Request",
            "samplingSettings": {
                "isEnabled": true
            }
        },
        "logLevel": {
            "Default": "Information"
        }
    },
    "extensions": {
        "durableTask": {
            "maxConcurrentActivityFunctions": 500,
            "maxConcurrentOrchestratorFunctions": 500
        },
        "serviceBus": {
            "sessionHandlerOptions": {
                "maxConcurrentSessions": 1
            }
        }
    }
}
1

There are 1 answers

0
Tspring On BEST ANSWER

This is a bit tricky as far as I know this is by design to respect order for the sessions in place in relation to the consumers. You could increase the number of->
"maxConcurrentSessions": 16

Which will run them in parallel. Additionally, if you are on a consumption plan you don't have to worry about scaling and it takes queue length into factoring scale. If you don't care about ordering you could switch to non session enabled and bring in the messages in batches. Please note the line here:
https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus?tabs=in-process%2Cextensionv5%2Cextensionv3&pivots=programming-language-powershell#hostjson-settings


"When you set the isSessionsEnabled property or attribute on the trigger to true, the sessionHandlerOptions is honored. When you set the isSessionsEnabled property or attribute on the trigger to false, the messageHandlerOptions is honored."

Leaving some reference links just in case to read up:
//queues and topics
https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-queues-topics-subscriptions


//maxconcurrentsessions
https://learn.microsoft.com/en-us/dotnet/api/microsoft.servicebus.messaging.sessionhandleroptions.maxconcurrentsessions?view=azure-dotnet#microsoft-servicebus-messaging-sessionhandleroptions-maxconcurrentsessions