Consume service bus message after x time

I'm working on a project to learn more about message brokers, using .NET 6.0 and Azure Service Bus.

My question is: I need to publish a message and consume that same message only after a certain amount of time (I keep the delay in an environment variable so I can control it). How can I do this?

I know this isn't good practice ("an empty queue is a healthy queue"), but it's a personal project.

Answer by RithwikBojja (best answer)

I agree with the discussion in the comments. You can also use the code below to do the same:

using System;
using System.Text;
using System.Threading; // needed for CancellationToken in ProcessMessagesAsync
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

class Program
{
    const string rithcon = "Endpoint=sb://siln-servicebus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=WFsJyG1O3qA=";
    const string rithqueuename = "rithwik";
    static IQueueClient rithqc; 

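    static async Task Main(string[] args)
    {
        rithqc = new QueueClient(rithcon, rithqueuename);
        // The message is sent with a scheduled enqueue time one minute in the future.
        await RithSendMessageAsync("Hello Rithwik Bojja!");
        // Optional: Service Bus already holds the message until its scheduled time,
        // so this wait only postpones starting the receiver.
        await Task.Delay(TimeSpan.FromMinutes(1));
        await RithReceiveMessageAsync();
    }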

    static async Task RithSendMessageAsync(string rithmessageBody)
    {
        var message = new Message(Encoding.UTF8.GetBytes(rithmessageBody))
        {
            // Service Bus holds the message and only makes it visible to receivers at this UTC time.
            ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddMinutes(1)
        };
        await rithqc.SendAsync(message);
        Console.WriteLine($"Hey Man the Message sent: {rithmessageBody}");
    }

    static async Task RithReceiveMessageAsync()
    {
        try
        {
            rithqc.RegisterMessageHandler(ProcessMessagesAsync, new MessageHandlerOptions(ExceptionReceivedHandler)
            {
                MaxConcurrentCalls = 1,
                AutoComplete = false
            });

            await Task.Delay(TimeSpan.FromMinutes(5));
        }
        finally
        {
            await rithqc.CloseAsync();
        }
    }

    static async Task ProcessMessagesAsync(Message rithmessage, CancellationToken token)
    {
        Console.WriteLine($"Received message: {Encoding.UTF8.GetString(rithmessage.Body)}");

        await rithqc.CompleteAsync(rithmessage.SystemProperties.LockToken);
    }
    static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    {
        Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
        return Task.CompletedTask;
    }
}

In my code, the message is scheduled so that it only becomes available for consumption 1 minute after it is sent.
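The question mentions keeping the delay in an environment variable, while the code above hard-codes one minute. A minimal sketch of one way to bridge that, assuming a variable named CONSUME_DELAY_SECONDS that holds a whole number of seconds (the variable name and the ConsumeDelay helper are hypothetical, not part of the original code):

```csharp
using System;

static class ConsumeDelay
{
    // Hypothetical variable name; use whatever your project already defines.
    public const string VariableName = "CONSUME_DELAY_SECONDS";

    // Parses a whole number of seconds. Falls back to the given default when
    // the value is missing, not a number, or negative.
    public static TimeSpan Parse(string raw, TimeSpan fallback)
    {
        return int.TryParse(raw, out int seconds) && seconds >= 0
            ? TimeSpan.FromSeconds(seconds)
            : fallback;
    }

    // Reads the delay from the environment variable named above.
    public static TimeSpan FromEnvironment(TimeSpan fallback)
    {
        return Parse(Environment.GetEnvironmentVariable(VariableName), fallback);
    }

    // Computes the UTC time to assign to Message.ScheduledEnqueueTimeUtc.
    public static DateTime EnqueueTimeUtc(DateTime nowUtc, TimeSpan delay)
    {
        return nowUtc.Add(delay);
    }
}
```

With this in place, the send method could set ScheduledEnqueueTimeUtc = ConsumeDelay.EnqueueTimeUtc(DateTime.UtcNow, ConsumeDelay.FromEnvironment(TimeSpan.FromMinutes(1))) instead of DateTime.UtcNow.AddMinutes(1). Falling back to a default keeps the program usable when the variable is not set; throwing instead would also be a reasonable choice.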

Output:

After sending message: [screenshots not available]

After receiving and consuming the message: [screenshots not available]