Queue a large number of tasks from a Google Cloud Function


I'm trying to use cloud functions to update data by calling an external API once a day.

So far I have:

  • Cloud Scheduler job set to invoke Function 1

  • Function 1 - loop over items and create a task for each item

  • Task - invoke Function 2 with data provided by Function 1

  • Function 2 - call external API to get data and update our db

The issue is that there are ~2k items to update daily, and a Cloud Function times out before it can process them all, which is why I put them in a queue. But even enqueuing the items takes too long, so Function 1 times out before it can add them all.

Is there a simple way to bulk add multiple tasks to a queue at once?

Failing that, a better solution to all of this?

All written in Python.

Code for function 1:

from google.cloud import tasks_v2

def refresh(request):
    for i in items:
        # Create a client.
        client = tasks_v2.CloudTasksClient()

        # Replace these with your values.
        project = 'my-project'
        queue = 'refresh-queue'
        location = 'europe-west2'
        name = i['name'].replace(' ', '')
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint?name={name}"

        # Construct the fully qualified queue name.
        parent = client.queue_path(project, location, queue)

        # Construct the request body.
        task = {
            "http_request": {  # Specify the type of request.
                "http_method": tasks_v2.HttpMethod.GET,
                "url": url,  # The full url path that the task will be sent to.
            }
        }

        # Use the client to build and send the task.
        response = client.create_task(request={"parent": parent, "task": task})

There are 2 answers

Chris32

Answering your question, “Is there a simple way to bulk add multiple tasks to a queue at once?”: as per the public documentation, the best approach is to implement a double-injection pattern.

For this you will have a new queue where you add a single task that contains the data for multiple tasks of the original queue. On the receiving end of this queue, a service reads that task's data and creates one task per entry on a second queue.

Additionally, I suggest you use the 500/50/5 pattern when ramping up a cold queue: start at no more than 500 operations per second, then increase the rate by at most 50% every 5 minutes. This will help both the task queue and the Cloud Functions service ramp up at a safe rate.
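The double-injection pattern boils down to chunking the item list and packing each chunk into a single task payload. A minimal, GCP-free sketch of that chunking step (the chunk helper, the chunk size of 100, and the JSON payload shape are illustrative assumptions, not part of the Cloud Tasks API):

```python
import json

def chunk(seq, size):
    # Split a sequence into lists of at most `size` items.
    return [seq[i:i + size] for i in range(0, len(seq), size)]

# Stand-in for the ~2k items loaded in Function 1.
items = [{"name": f"item {n}"} for n in range(250)]

# Function 1 would enqueue ONE task per payload on the first queue;
# the receiving service then creates one task per name on the second queue.
payloads = [json.dumps({"names": [it["name"] for it in batch]})
            for batch in chunk(items, 100)]

print(len(payloads))  # 250 items / 100 per chunk -> 3 payloads
```

With 2k items and chunks of 100, Function 1 only has to enqueue ~20 tasks, which fits comfortably inside its timeout.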

Alex

Chris32's answer is correct, but one thing I noticed in your code snippet is that you should create the client outside the for loop.

from google.cloud import tasks_v2

def refresh(request):
    # Create the client once, outside the loop.
    client = tasks_v2.CloudTasksClient()

    # Replace these with your values.
    project = 'my-project'
    queue = 'refresh-queue'
    location = 'europe-west2'

    # The fully qualified queue name doesn't change per item either.
    parent = client.queue_path(project, location, queue)

    for i in items:
        name = i['name'].replace(' ', '')
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint?name={name}"

        # Construct the request body.
        task = {
            "http_request": {  # Specify the type of request.
                "http_method": tasks_v2.HttpMethod.GET,
                "url": url,  # The full url path that the task will be sent to.
            }
        }

        # Use the client to build and send the task.
        response = client.create_task(request={"parent": parent, "task": task})

In App Engine I would do client = tasks_v2.CloudTasksClient() outside of def refresh, at the file level, but I don't know if that matters for Cloud Functions.

Second thing: modify "Function 2" to take multiple 'names' instead of just one. Then in "Function 1" you can send 10 names to "Function 2" at a time:

BATCH_SIZE = 10  # send 10 names to Function 2

def refresh(request):
    # Create a client.
    client = tasks_v2.CloudTasksClient()
    # ...
    for i in range(0, len(items), BATCH_SIZE):
        items_batch = items[i:i + BATCH_SIZE]
        names = ','.join([item['name'].replace(' ', '') for item in items_batch])
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint?names={names}"

        # Construct the fully qualified queue name.
        # ...
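For this to work, Function 2 has to split the comma-joined parameter back into individual names. A minimal standard-library sketch of that parsing step (the names parameter follows the URLs above; parse_names is a hypothetical helper, shown here against a full URL for testability):

```python
from urllib.parse import urlparse, parse_qs

def parse_names(url):
    # Extract the comma-joined `names` query parameter as a list.
    query = parse_qs(urlparse(url).query)
    joined = query.get("names", [""])[0]
    return [n for n in joined.split(",") if n]

names = parse_names(
    "https://europe-west2-my-project.cloudfunctions.net/endpoint?names=foo,bar,baz")
print(names)  # ['foo', 'bar', 'baz']
```

Inside the actual Cloud Function you would read request.args.get('names', '') from the Flask request object rather than re-parse the URL, then loop over the resulting list calling the external API once per name.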

If those 2 quick fixes don't do it, then you'll have to split "Function 1" into "Function 1A" and "Function 1B".

Function 1A:

BATCH_SIZE = 100  # send 100 names to Function 1B

def refresh(request):
    client = tasks_v2.CloudTasksClient()
    for i in range(0, len(items), BATCH_SIZE):
        items_batch = items[i:i + BATCH_SIZE]
        names = ','.join([item['name'].replace(' ', '') for item in items_batch])
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint-for-function-1b?names={names}"

        # send the task.
        response = client.create_task(request={
            "parent": client.queue_path('my-project', 'europe-west2', 'refresh-queue'),
            "task": {
                "http_request": {"http_method": tasks_v2.HttpMethod.GET, "url": url}
        }})

Function 1B:

BATCH_SIZE = 10  # send 10 names to Function 2

def refresh(request):
    # Set `names` to the list from the `names` query param.
    names = request.args.get('names', '').split(',')
    client = tasks_v2.CloudTasksClient()
    for i in range(0, len(names), BATCH_SIZE):
        names_batch = ','.join(names[i:i + BATCH_SIZE])
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint-for-function-2?names={names_batch}"

        # send the task.
        response = client.create_task(request={
            "parent": client.queue_path('my-project', 'europe-west2', 'refresh-queue'),
            "task": {
                "http_request": {"http_method": tasks_v2.HttpMethod.GET, "url": url}
        }})