Azure IoT Hub trigger processing data sequentially


I have an IoT device (ESP32) that sends data packets to IoT Hub (free tier) every x seconds. Each packet contains an integer property, PacketID, which starts at 1 and is incremented with each new packet. When IoT Hub receives the data, an Azure Functions IoT Hub trigger (hosted on the portal) is invoked and the data is processed further: it is first inserted into CosmosDB (from a StorageQueue trigger), then sent as a message to SignalR Service (from another StorageQueue trigger), and finally delivered to the client web app.

The IoT Hub trigger looks like this:

    [FunctionName("FramesPacketAdd_IoTHubTrigger")]
    public async Task Run(
        [IoTHubTrigger("messages/events", Connection = "IoTHubConnectionString")] EventData message,
        ILogger _logger,
        [Queue("dbinsert-frames-packet-queue", Connection = "AzureStorageAccountConnectionString")] IAsyncCollector<FramesPacket> dbInsertFramesPacketQueue,
        [Queue("eventdata-parse-error-queue", Connection = "AzureStorageAccountConnectionString")] IAsyncCollector<string> eventDataParseErrorQueue)
    {
        // Decode the body once, honouring the ArraySegment's offset and count.
        string messageBody = Encoding.UTF8.GetString(message.Body.Array, message.Body.Offset, message.Body.Count);

        try
        {
            _logger.LogInformation($"C# IoT Hub trigger function FramesPacketAdd processed a message: {messageBody}");

            var jsonObj = JsonConvert.DeserializeObject<FramesPacket>(messageBody);

            var framesPacket = new FramesPacket
            {
                PacketID = jsonObj.PacketID,
                SessionID = jsonObj.SessionID,
                DeviceID = jsonObj.DeviceID,
                Frames = jsonObj.Frames
            };

            // Hand the packet to the storage queue that feeds the CosmosDB insert.
            await dbInsertFramesPacketQueue.AddAsync(framesPacket);
        }
        catch (Exception ex)
        {
            await eventDataParseErrorQueue.AddAsync($"[IoTHubTrigger] function [FramesPacketAdd] caught exception: {ex.Message} \nStackTrace: {ex.StackTrace} \nJSON: {messageBody}");

            _logger.LogError($"[IoTHubTrigger] function [FramesPacketAdd_IoTHubTrigger] caught exception: {ex.Message} \nStackTrace: {ex.StackTrace} \nJSON: {messageBody}");
        }
    }

The problem is that when the packets are sent quickly, say 200 ms apart, the data received by IoT Hub is badly out of sequence: sometimes packets #7 and #8 are received and processed before packets #1, #2 and #3. This is an issue for my client application, since it relies on packets arriving in the same order they were sent from the ESP32 chip. I tried sending each packet with a 1 second delay, and the problem is still partly there, although it seems to affect only the first three packets: sometimes they are received in the order 3, 1, 2, while the rest are in the right sequence. Longer delays seem to remove the issue completely. I believe this is because of the async nature of the function/trigger? In the end I'd like to be able to send packets relatively fast, with a 200-500 ms delay between each.

I'm very new to Azure IoT Hub. My question is: is there something that can be done on the portal side to ensure packets are received in the right order, or is this a limitation of this approach, so that such cases need to be handled after the data is received, possibly using a storage queue or service bus after the IoT Hub trigger is invoked?

I hope my question made sense, otherwise I'm more than happy to provide more details.

Thanks in advance.


There are 2 answers

Steve Busby - MSFT

IoT Hub partitions data by deviceId, so all messages from a single device are written, in order, to the same underlying Event Hub partition. Assuming you are using only one deviceId in your ESP32 app, the messages should be in the order in which IoT Hub received them.

Have you dumped the messages to the Azure Functions log to see whether they are being read in the right order? It may be, for example, that the database insert is letting them get out of order.

Otherwise, I'm not sure what in the plumbing of the Azure Functions listener (which should be using EventProcessorHost under the covers) would read them out of order.
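To make that check concrete, here is a minimal sketch, assuming the PacketID values have been copied out of the function log in the order the trigger read them. `OrderCheck` and `FindOutOfOrder` are hypothetical names used only for illustration. Logging `message.SystemProperties.SequenceNumber` next to each PacketID in the trigger should also help show whether the reordering happens before or after the function reads the event.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: not part of any Azure SDK.
public static class OrderCheck
{
    // Returns every index whose PacketID is lower than the one read just before it.
    public static List<int> FindOutOfOrder(IReadOnlyList<int> packetIdsInReadOrder)
    {
        var outOfOrder = new List<int>();
        for (int i = 1; i < packetIdsInReadOrder.Count; i++)
        {
            if (packetIdsInReadOrder[i] < packetIdsInReadOrder[i - 1])
            {
                outOfOrder.Add(i);
            }
        }
        return outOfOrder;
    }
}
```

An empty result for the IDs logged by the IoT Hub trigger, combined with a non-empty result for the IDs logged further downstream, would point at the later stages rather than at IoT Hub itself.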

kgalic

Below are two possible paths:

1. Use an Event Hub-triggered Azure Function to process the messages from the Event Hub-compatible endpoint of the IoT Hub.

Every message from a given device is guaranteed to be sent to the same partition.

Messages within a partition are ordered, so what you need is one event processor per partition.

You can use Azure Functions for this. Azure Functions takes a lease per partition, so only one function instance reads a given partition at a time. This means that if you have 10 partitions, there can be up to 10 function instances, each processing the messages from its own partition. What you then have to do is make sure that, inside the function, you process the message batch sequentially, for example:

    [FunctionName("EventHubTrigger")]
    public static async Task RunAsync([EventHubTrigger("ordered", Connection = "EventHub")] EventData[] eventDataSet, TraceWriter log)
    {
        log.Info($"Triggered batch of size {eventDataSet.Length}");

        // processing event by event
        foreach (var eventData in eventDataSet)
        {
            try
            {
                // process the event here
            }
            catch
            {
                // handle event exception
            }
        }
    }

Here, before the foreach loop, you can also sort eventDataSet by a sequence number (if you are sending one), in addition to relying on the native partition ordering, just in case.
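The sorting step could look roughly like the sketch below. It assumes the events in the batch have already been deserialized into a simple type carrying the device's own counter. `Packet` and `BatchOrdering` are hypothetical names, and `PacketID` stands in for whatever sequence number the device sends.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the deserialized message body.
public sealed class Packet
{
    public string DeviceID { get; set; } = "";
    public int PacketID { get; set; }
}

public static class BatchOrdering
{
    // Returns the batch ordered by the device-side counter.
    // OrderBy is a stable sort, so packets with equal IDs keep their arrival order.
    public static List<Packet> SortByPacketId(IEnumerable<Packet> batch)
    {
        return batch.OrderBy(p => p.PacketID).ToList();
    }
}
```

Note that this only restores order within a single batch. A packet that arrives in a later batch than its successors cannot be fixed by sorting alone.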

Here is the blog.

2. The second way is very similar: use Service Bus queues with sessions, as in the following code sample:

    public async Task Run(
        [ServiceBusTrigger("queue", Connection = "ServiceBusConnectionString", IsSessionsEnabled = true)] Message message,
        ILogger log)
    {
        log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(message.Body)}");
        // process the message here
    }

Here is the blog as well.

An important thing to note is that if any of the Azure function instances (in the first case) fails, its partition lease is picked up by another instance, which resumes from the last checkpoint and might therefore process some messages twice. If this is a concern, then you should go with the second scenario.
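If you stay with the first approach anyway, one possible way to tolerate replayed messages is to make the write idempotent, so that processing the same packet twice leaves a single record. The sketch below only illustrates the idea with an in-memory dictionary. `PacketStore` is a hypothetical name, and in a real pipeline the same keying by device and PacketID would have to be applied to the actual database write.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a real data store.
public sealed class PacketStore
{
    // Records are keyed by (DeviceID, PacketID), so a replay maps to the same key.
    private readonly Dictionary<(string DeviceId, int PacketId), string> _records =
        new Dictionary<(string DeviceId, int PacketId), string>();

    public int Count => _records.Count;

    // Upsert: writing the same packet again overwrites the record instead of duplicating it.
    public void Upsert(string deviceId, int packetId, string payload)
    {
        _records[(deviceId, packetId)] = payload;
    }
}
```

With this keying, a packet replayed after a lease change ends up as one record rather than two.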