I need to create a batch job that processes tasks according to this workflow:
                                          | task 4
                              | task 3 -> | task 4
                  | task 2 ->             | task 4
                              | task 3 -> | task 4
input -> task 1 ->
                  | task 2 -> ...
- Task #1 processes input data and returns a list of lists.
- Task #2 receives one of the lists returned by task #1 and also returns a list of lists.
- Task #3 receives one of the lists returned by task #2 and also returns a list of lists.
- Task #4 receives the list from task #3 and processes the data in that list.
For example, task #1 returns [[],[],[],[]]. That means the flow must run 4 instances of task #2 in parallel. Each task #2 returns [[],[],[]], so we now need 4x3 instances of task #3. Each task #3 then returns [[],[]], so finally the flow must run 4x3x2 instances of task #4.
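The expected fan-out can be sketched in plain Python, without Prefect. The functions below are hypothetical stand-ins that only reproduce the list shapes from the example (4, 3, and 2 inner lists); they are not my real tasks, and `fan_out` is just an illustrative helper name.

```python
from itertools import chain

# Hypothetical stand-ins: each returns a list of empty lists
# with the shapes used in the example above.
def task1():
    return [[] for _ in range(4)]

def task2(item):
    return [[] for _ in range(3)]

def task3(item):
    return [[] for _ in range(2)]

def fan_out(func, items):
    """Call func once per item and flatten the results by one level."""
    return list(chain.from_iterable(func(item) for item in items))

level1 = task1()                 # one entry per task #2 run
level2 = fan_out(task2, level1)  # one entry per task #3 run
level3 = fan_out(task3, level2)  # one entry per task #4 run

print(len(level1), len(level2), len(level3))  # 4 12 24
```

Each level has to be flattened by one step before it becomes the input of the next level; that is the behavior I would like the flow to have.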
Is it possible to do this with a Prefect Flow? I tried the mapping functionality, but it does not seem to support such a complicated workflow schema (or perhaps I am not using it properly).
from prefect import Flow

# task1..task4 are functions decorated with @task
with Flow('test') as flow:
    res1 = task1()
    res2 = task2.map(res1)
    res3 = task3.map(res2)
    res4 = task4.map(res3)
When I run the flow, task1 returns the correct number of lists. The flow then creates 4 runs of task2, and each of them returns a list of three lists. But instead of creating 12 runs of task3, the flow creates only 4. Each task3 receives the list of 4 lists, as if it had been created from task1's output, instead of a single list from task2.
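The count I observe is what a one-level map would produce. A plain-Python sketch (no Prefect involved; `task2` here is a hypothetical stand-in, and the variable names are only illustrative) shows the difference between mapping over a mapped result directly and flattening it by one level first:

```python
from itertools import chain

def task2(item):
    # Hypothetical stand-in: returns a list of three lists.
    return [[] for _ in range(3)]

res1 = [[], [], [], []]          # shape returned by task1 in the example
res2 = [task2(x) for x in res1]  # one result per task2 run, each a whole list of lists

# Mapping over res2 directly iterates over its top-level items only.
direct_inputs = list(res2)

# Flattening one level first yields one input per inner list.
flat_inputs = list(chain.from_iterable(res2))

print(len(direct_inputs), len(flat_inputs))  # 4 12
```

So the missing piece seems to be a way to flatten a mapped result by one level before the next `.map` call.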
Any ideas on how I can create such a workflow?