Azure durable function : processing a list

86 views Asked by At

I have an azure durable function wrote in python with an orchestrator and two activity functions

Orchestrator calls the first activity function and in return receive a list variable (List of names and this list can be dynamic everytime function gets executed)

The next step would be to call the second activity function for each one of those list items (sequential processing - due to API restriction that the second activity function calls)

#dynamically gets generated by the first activity function
payload=[1,2,3,4]            

tasks = [context.call_activity("secondfunction",ps) for ps in payload]
output = yield context.task_all(tasks)

what I am using in fan out method which is not serial but I don’t seem to be able find an alternative for what I am trying to do.

Also in the host.json file I tried to enforce that only one activity function can run at a given time to avoid the parallel processing

  "extensions": {
    "durableTask": {
      "maxConcurrentActivityFunctions": 1,
      "maxConcurrentOrchestratorFunctions": 1
    }
  }

also it's worth noting that I can not pass the whole list to the activity function as if I do the activity function will take more than 5-10 mins which is the timeout limit for the azure function hence trying to iterate the list in orchestration function

But the result is not sequential

Would appreciate your feedback

1

There are 1 answers

0
SiddheshDesai On BEST ANSWER

You can try using below two approaches to achieve your requirement:-

Approach 1:-

My function_app.py:-

import azure.functions as func
import azure.durable_functions as df

myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

# HTTP Starter
@myApp.route(route="orchestrators/{functionName}")
@myApp.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client):
    function_name = req.route_params.get('functionName')
    instance_id = await client.start_new(function_name, None)  # Pass the functionName here
    response = client.create_check_status_response(req, instance_id)
    return response

# Orchestrator
@myApp.orchestration_trigger(context_name="context")
def hello_orchestrator(context):
    cities = ["Seattle", "Tokyo", "London"]

    tasks = []
    for city in cities:
        tasks.append(context.call_activity("hello", city))

    # Wait for all tasks to complete
    results = yield context.task_all(tasks)

    return results

# Activity
@myApp.activity_trigger(input_name="city")
def hello(city: str):
    print(f"Processing {city}...")
    # Your activity function logic goes here
    result = f"Hello {city}!"

    return result

Output:-

enter image description here

Function URL:-

http://localhost:7071/api/orchestrators/hello_orchestrator

Approach 2:-

function_app.py:-

import azure.functions as func
import azure.durable_functions as df

myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

# HTTP Starter
@myApp.route(route="orchestrators/{functionName}")
@myApp.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client):
    function_name = req.route_params.get('functionName')
    instance_id = await client.start_new(function_name, None)  # Pass the functionName here
    response = client.create_check_status_response(req, instance_id)
    return response

# Orchestrator
@myApp.orchestration_trigger(context_name="context")
def hello_orchestrator(context):
    # Call the first activity to get a list of names
    names_list = yield context.call_activity("get_names")

    # Process each name sequentially using the second activity
    results = []
    for name in names_list:
        result = yield context.call_activity("process_name", name)
        results.append(result)

    return results

# First Activity
@myApp.activity_trigger
def get_names():
    # Your logic to retrieve a dynamic list of names goes here
    # For demonstration purposes, returning a hardcoded list
    return ["John", "Alice", "Bob"]

# Second Activity
@myApp.activity_trigger(input_name="name")
def process_name(name: str):
    print(f"Processing {name}...")
    # Your logic to process each name goes here
    result = f"Hello {name}!"

    return result