Timeouts in Azure Functions when populating queues


We have a simple ETL process to extract data from an API to a Document DB, which we would like to implement using Functions. In brief: take a ~16,500-line file, extract an ID from each line (Function 1), build a URL for each ID (Function 2), hit an API using the URL (Function 3), and store the response in a Document DB (Function 4). We are using queues for inter-function communication and are seeing timeouts in the first function while doing this.

Function 1 (index.js)

module.exports = function (context, odsDataFile) {
  context.log('JavaScript blob trigger function processed blob \n Name:', context.bindingData.odaDataFile, '\n Blob Size:', odsDataFile.length, 'Bytes');

  const odsCodes = [];

  odsDataFile.split('\n').forEach((line) => {
    const columns = line.split(',');

    if (columns[12] === 'A') {
      odsCodes.push({
        'odsCode': columns[0],
        'orgType': 'pharmacy',
      });
    }
  });

  context.bindings.odsCodes = odsCodes;
  context.log(`A total of: ${odsCodes.length} ods codes have been sent to the queue.`);

  context.done();
};

function.json

{
  "bindings": [
    {
      "type": "blobTrigger",
      "name": "odaDataFile",
      "path": "input-ods-data",
      "connection": "connecting-to-services_STORAGE",
      "direction": "in"
    },
    {
      "type": "queue",
      "name": "odsCodes",
      "queueName": "ods-org-codes",
      "connection": "connecting-to-services_STORAGE",
      "direction": "out"
    }
  ],
  "disabled": false
}

Full code here

This function works fine when the number of IDs is in the hundreds but times out when it is in the tens of thousands. Building the ID array takes milliseconds and the function body completes, but adding the items to the queue takes many minutes and eventually hits the default 5-minute timeout.

I am surprised that the simple act of populating the queue seems to take such a long time and that the timeout for a function seems to include the time for tasks external to the function (i.e. queue population). Is this to be expected? Are there more performant ways of doing this?

We are running under the Consumption (Dynamic) Plan.


There are 2 answers

brettsam

I did some testing of this from my local machine and found that it takes ~200 ms to insert a message into the queue, which is expected. So if you have 17k messages to insert and are doing them sequentially, the total time works out to:

17,000 messages * 200 ms = 3,400,000 ms, or ~57 minutes

The latency may be a bit quicker when running from the cloud, but you can see how this would jump over 5 minutes pretty quickly when you are inserting that many messages.
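That back-of-the-envelope estimate is easy to check:

```javascript
// Sequential insert estimate: message count times per-message latency.
const messages = 17000;
const perMessageMs = 200;

const totalMs = messages * perMessageMs;          // 3,400,000 ms
const totalMinutes = totalMs / 60000;             // ~56.7 minutes

console.log(`${totalMs} ms ≈ ${totalMinutes.toFixed(1)} minutes`);
```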

If message ordering isn't crucial, you could insert the messages in parallel. Some caveats, though:

  1. You can't do this in Node -- it has to be C#. Node doesn't expose the IAsyncCollector interface to you; the runtime handles collection behind the scenes.
  2. You can't insert everything in parallel because the Consumption plan has a limit of 250 network connections at a time.

Here's an example of batching up the inserts 200 at a time -- with 17k messages, this took under a minute in my quick test.

public static async Task Run(string myBlob, IAsyncCollector<string> odsCodes, TraceWriter log)
{
    string[] lines = myBlob.Split(Environment.NewLine.ToCharArray(), StringSplitOptions.RemoveEmptyEntries);

    int skip = 0;
    int take = 200;

    IEnumerable<string> batch = lines.Skip(skip).Take(take);

    while (batch.Count() > 0)
    {
        await AddBatch(batch, odsCodes);
        skip += take;
        batch = lines.Skip(skip).Take(take);
    }
}

public static async Task AddBatch(IEnumerable<string> lines, IAsyncCollector<string> odsCodes)
{
    List<Task> tasks = new List<Task>();    
    foreach (string line in lines)
    {
        tasks.Add(odsCodes.AddAsync(line));
    }
    await Task.WhenAll(tasks);
}
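For readers who want to stay in Node, the same Skip/Take batching pattern can be sketched with a generic helper. This is a hedged sketch: `insertMessage` is a hypothetical stand-in -- in a real function you would bypass the output binding and call the storage SDK directly (e.g. the legacy `azure-storage` package's `QueueService.createMessage`).

```javascript
// Generic batching helper: run `worker` over `items`, at most `batchSize`
// in flight at once, waiting for each batch to finish before starting the
// next (mirrors the C# Skip/Take loop above).
async function addInBatches(items, batchSize, worker) {
  for (let skip = 0; skip < items.length; skip += batchSize) {
    const batch = items.slice(skip, skip + batchSize);
    await Promise.all(batch.map(worker));
  }
}

// Hypothetical enqueue function -- replace with a real storage SDK call.
let inserted = 0;
const insertMessage = async (msg) => { inserted += 1; };

addInBatches(Array.from({ length: 1000 }, (_, i) => `msg-${i}`), 200, insertMessage)
  .then(() => console.log(`${inserted} messages inserted`));
```

Keeping the batch size well under the Consumption plan's 250-connection limit leaves headroom for other outbound calls.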
Paul Batum

As other answers have pointed out, Azure Queue storage has no batch API, so you should consider an alternative such as Service Bus Queues. But if you are sticking with Azure Queues, you need to avoid outputting the queue items sequentially, i.e. some form of constrained parallelism is necessary. One way to achieve this is to use the TPL Dataflow library.

One advantage Dataflow has over using batches of tasks and doing a WhenAll(..) is that you never hit a scenario where a batch is almost done and you are waiting for one slow insert to complete before starting the next batch.

I compared inserting 10,000 items with task batches of size 32 and dataflow with parallelism set to 32. The batch approach completed in 60 seconds, while dataflow completed in almost half that (32 seconds).

The code would look something like this:

    using System.Threading.Tasks.Dataflow;
    ...
    var addMessageBlock = new ActionBlock<string>(async message =>
    {
        await odsCodes.AddAsync(message);
    }, new ExecutionDataflowBlockOptions { SingleProducerConstrained = true, MaxDegreeOfParallelism = 32});

    var bufferBlock = new BufferBlock<string>();
    bufferBlock.LinkTo(addMessageBlock, new DataflowLinkOptions { PropagateCompletion = true });

    foreach(string line in lines)
        bufferBlock.Post(line);

    bufferBlock.Complete();
    await addMessageBlock.Completion;
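The same bounded-parallelism idea can be sketched in Node for readers following the question's JavaScript. This is a generic helper, not tied to any Azure SDK: N workers pull items from a shared cursor, so a slow item delays only its own worker rather than a whole batch -- the property the ActionBlock gives the C# version.

```javascript
// Worker-pool with a fixed degree of parallelism, analogous to
// ExecutionDataflowBlockOptions.MaxDegreeOfParallelism.
async function runWithConcurrency(items, concurrency, worker) {
  let next = 0;
  async function runWorker() {
    while (next < items.length) {
      const i = next++;       // claim the next item (safe: no await between check and claim)
      await worker(items[i]);
    }
  }
  // Start `concurrency` workers and wait for all of them to drain the list.
  await Promise.all(Array.from({ length: concurrency }, runWorker));
}
```

Calling `runWithConcurrency(lines, 32, insertMessage)` would correspond to the Dataflow configuration benchmarked above.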