Gracefully postpone QueueTrigger


I am currently using the WebJobs SDK to consume messages from a queue.

My method has one parameter with the [Microsoft.Azure.WebJobs.QueueTrigger(...)] attribute and is triggered correctly. In some circumstances the method can process the message, but from time to time I would prefer that it reject the message until a critical resource becomes available.

I tried throwing an exception in this case but, contrary to what the reference says, the queue trigger fires again immediately (apparently without waiting for the lease time).

Is there a way to gracefully postpone the message handling? Would it be safe to simply block the thread while waiting for the critical resource?

Any hint would be much appreciated.


There are 3 answers

2
b2zw2a On

I don't think you can postpone a message in the current version.

Possible workaround

You can re-add the same message with a delay. To avoid duplicates, set MaxDequeueCount to 1, which sends a failing message straight to the poison queue after an exception:

        JobHostConfiguration configuration = new JobHostConfiguration();
        configuration.Queues.MaxDequeueCount = 1;

In the message processor, re-add your message with a delay and then throw an exception:

    public static void ProcessMessage(
        [QueueTrigger("resource-heavy-queue")] string message,
        [Queue("resource-heavy-queue")] CloudQueue originalQueue)
    {
        if ( /* resource unavailable */ )
        {
            // Re-add a copy that becomes visible after 10 seconds, then throw so the
            // original message goes to the poison queue (MaxDequeueCount = 1).
            var messageToReAdd = new CloudQueueMessage(message);
            originalQueue.AddMessage(messageToReAdd, null, TimeSpan.FromSeconds(10));
            throw new ResourcesNotAvailableException();
        }
    }

This way you can implement a back-off strategy for your resource. Unfortunately, you have to deal with some problems manually:

  • Handling poison messages - if you keep re-adding the same message you can end up in an infinite loop, so you have to extend your message model to carry NumberOfRetries and increment it every time you re-add the message.
  • Message Id and InsertionTime will be different after every re-add, so you can't rely on them.
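
The NumberOfRetries idea from the first bullet could look roughly like the sketch below. It is only an illustration: RetryEnvelope, MaxRetries, NextAttempt and DelayFor are hypothetical names, not part of the WebJobs SDK, and the retry limit and delays are assumed values. The envelope would be serialized into the queue message body, and DelayFor would supply the visibility delay when re-adding.

```csharp
using System;

// Hypothetical message envelope; none of these names come from the WebJobs SDK.
public sealed class RetryEnvelope
{
    // Assumed limit; after this many re-adds the message should be given up on.
    public const int MaxRetries = 5;

    public string Body { get; set; }
    public int NumberOfRetries { get; set; }

    // True while the message may still be re-added to the queue.
    public bool CanRetry => NumberOfRetries < MaxRetries;

    // Copy of this envelope with the retry counter incremented, ready to be re-added.
    public RetryEnvelope NextAttempt() =>
        new RetryEnvelope { Body = Body, NumberOfRetries = NumberOfRetries + 1 };

    // Exponential back-off: 10 s, 20 s, 40 s, ... capped at 5 minutes (assumed values).
    public static TimeSpan DelayFor(int numberOfRetries)
    {
        double seconds = 10 * Math.Pow(2, numberOfRetries);
        return TimeSpan.FromSeconds(Math.Min(seconds, 300));
    }
}
```

In the processor you would check CanRetry before re-adding, and let the message fail for good once the limit is reached.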
1
Victor Hurdugaci On

Sorry, it is not possible to postpone a trigger. What you want is a multitrigger (one that fires when the message AND the resource are available), which the Azure WebJobs SDK doesn't have.

There are a few workarounds. You could block the thread if all messages in that queue require the critical resource (you are, in effect, implementing a semaphore). Otherwise it gets tricky to schedule, because the SDK will not process new messages once you reach the maximum number of functions running in parallel.
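
As a rough sketch of the semaphore idea, assuming the resource can serve one message at a time: CriticalResourceGate and TryRunAsync are hypothetical names, and the single permit is an assumption. The wait here is asynchronous, so no thread is held, but the waiting function still counts against the SDK's parallel-function limit mentioned above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper; not part of the WebJobs SDK.
public static class CriticalResourceGate
{
    // One permit: assumes the critical resource handles a single message at a time.
    private static readonly SemaphoreSlim Gate = new SemaphoreSlim(1, 1);

    // Runs the work once the resource is free. Returns false if the
    // resource did not become available within the timeout.
    public static async Task<bool> TryRunAsync(Func<Task> work, TimeSpan timeout)
    {
        if (!await Gate.WaitAsync(timeout))
        {
            return false;
        }

        try
        {
            await work();
            return true;
        }
        finally
        {
            // Always hand the permit back, even if the work throws.
            Gate.Release();
        }
    }
}
```

A queue-triggered function could await TryRunAsync and decide what to do with the message when it returns false.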

Instead of triggering on the queue message, I would trigger on the critical resource. When the resource becomes available, it puts a message in another queue; only then does the function check whether there are any messages to process that require the resource.

0
D. Siemer On

What I do in this situation is bind an ICollector to the same queue I am triggered from, like this:

public static async Task HandleMessagesAsync([ServiceBusTrigger("%QueueName%")] BrokeredMessage message, [ServiceBus("%QueueName%")]ICollector<BrokeredMessage> queue, TextWriter logger)

(The %QueueName% notation makes the SDK fetch the value from app.config.)

Then, in the handler, I do something like this:

if (needToWait)
{
    var delayedMessage = new BrokeredMessage(originalMessageBody)
    {
        Label = originalLabel,
        MessageId = originalMessageId,
        ScheduledEnqueueTimeUtc = DateTime.UtcNow + this.ExecutionDelay
    };
    queue.Add(delayedMessage);
    return;
}

That way the current message gets completed, but a new one with the same properties is scheduled to be delivered after the defined timeout.

No threads are blocked in the making of this program.