I have implemented a task group that is meant to be reused across multiple DAGs. In one of those DAGs, it makes more sense to use it as a mapped task group. Here is the full code of my task group:
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.email import EmailOperator
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.providers.sftp.hooks.sftp import SFTPHook, SSHHook

def DeliveryGroup(group_id: str, file:str, deliver_args:dict, **kwargs) -> TaskGroup:
    with TaskGroup(group_id=group_id, **kwargs) as tg:
        # select destination type
        selector_task = BranchPythonOperator(
            task_id='destination_selector',
            python_callable=lambda: f"{deliver_args.get('type')}"
        )
        email_task = EmailOperator(
            task_id="email",
            to=deliver_args.get('to'),
            subject=deliver_args.get('subject'),
            cc=deliver_args.get('cc'),
            html_content=deliver_args.get('body'),
            files=[file]
        )
        sftp_task = SFTPOperator(
            task_id="sftp",
            # ssh_conn_id='YinzCam-Connection',
            sftp_hook=SFTPHook(
                remote_host=deliver_args.get('host'),
                username=deliver_args.get('username'),
                password=deliver_args.get('password'),
                port=deliver_args.get('port', 22)),
            local_filepath=[file],
            remote_filepath=[deliver_args.get('path')]
        )
        selector_task >> [email_task, sftp_task]
    return tg
Next, I want to pass a list of dicts, each representing a separate destination, as the parameter this task group is expanded over:
task3 = DeliveryGroup.partial(
    group_id='deliver',
    file="my_file.csv",
).expand(
    args=dag.default_args.get('destinations')  # a list of dicts
)
However, I received this error: AttributeError: 'function' object has no attribute 'partial'. So what is the correct way to map over a task group without using a decorator?
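The problem: `DeliveryGroup` is a plain Python function, and a plain function has no `.partial()` or `.expand()` methods. Mapping is exposed through operator classes (`Operator.partial(...).expand(...)`) and through the objects returned by the `@task` and `@task_group` decorators. Mapped task groups require Airflow 2.5 or later.
So it's something like this: a decorator is just a callable, so you can apply `task_group(...)` from `airflow.decorators` as an ordinary function call and then call `.partial(...).expand(...)` on the result. Note that the keyword passed to `.expand()` must match a parameter name of the function (`deliver_args`, not `args`).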
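To make the cause of the error concrete, here is a small sketch that reproduces it without Airflow. `ToyFactory` is a hypothetical stand-in for the kind of object a decorator returns; it is not Airflow's real class and only imitates the `.partial()` / `.expand()` call shape.

```python
def delivery_group(file, deliver_args):
    """Plain function, analogous to DeliveryGroup in the question."""
    return f"{deliver_args['type']}:{file}"


# A bare function object carries no mapping helpers, hence the error.
try:
    delivery_group.partial(file="my_file.csv")
except AttributeError as exc:
    error_message = str(exc)


class ToyFactory:
    """Hypothetical stand-in for the object a decorator returns (not Airflow's class)."""

    def __init__(self, fn, fixed=None):
        self.fn = fn
        self.fixed = dict(fixed or {})

    def partial(self, **fixed):
        # Remember the arguments shared by every mapped call.
        return ToyFactory(self.fn, {**self.fixed, **fixed})

    def expand(self, **mapped):
        # Call the wrapped function once per value of the single mapped argument.
        (name, values), = mapped.items()
        return [self.fn(**self.fixed, **{name: value}) for value in values]


results = ToyFactory(delivery_group).partial(file="my_file.csv").expand(
    deliver_args=[{"type": "email"}, {"type": "sftp"}]
)
```

The toy version calls the function eagerly, whereas Airflow defers the expansion until run time. The point is only that the methods live on the wrapper object, not on the function itself.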
If I understand correctly what you need, here is an example: