Extending Airflow DAG class - is this a bad practice?


I haven't seen any examples of this, so I'm wondering: is it bad practice to extend the DAG class? If so, why?

Here is an example of where I can see this being useful.

Let's say we have a number of DAGs that all share the same behaviour: calling a specific function as the very last step, regardless of success or failure. This function could, for instance, invoke some external API.

My idea to approach this would be something along these lines:

  • extend the DAG class, creating a new class DAGWithFinishAction
  • implement on_success_callback and on_failure_callback in DAGWithFinishAction to do what I want to achieve
  • use the new class via with DAGWithFinishAction(dag_id=..., ...) as dag: ...
  • schedule tasks in each of the implementing DAGs
  • expect each of those DAGs to call its success/failure callbacks after all tasks are finished (in any state)

Is there anything wrong with this approach? I couldn't find anything similar, which makes me believe I am missing something.

from airflow import DAG

# publish_execution_time is the finish-action callback; it is defined
# elsewhere in the project.

class DAGWithFinishAction(DAG):

    def __init__(self, dag_id, **kwargs):
        self.metric_callback = publish_execution_time

        # Attach the callback for both outcomes, preserving any callback(s)
        # the caller already supplied. (Lists of callbacks are supported
        # from Airflow 2.6 onwards.)
        for key in ("on_success_callback", "on_failure_callback"):
            existing = kwargs.get(key)
            if existing is None:
                kwargs[key] = self.metric_callback
            elif isinstance(existing, list):
                # Copy instead of appending in place, so we don't mutate
                # a list the caller may reuse elsewhere.
                kwargs[key] = existing + [self.metric_callback]
            else:
                kwargs[key] = [existing, self.metric_callback]

        super().__init__(dag_id, **kwargs)

with DAGWithFinishAction(dag_id=..., ...) as dag:
    ...
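For reference, here is a minimal sketch of what publish_execution_time could look like. The metric logic is an assumption on my part; the one thing taken from Airflow's behaviour is that DAG-level callbacks receive a context dict, whose "dag_run" entry carries the run's identity and timing.

```python
# Hypothetical sketch of the finish-action callback. Airflow invokes
# DAG-level callbacks with a `context` dict; `context["dag_run"]` exposes
# the run's dag_id, start_date and end_date.
def publish_execution_time(context):
    dag_run = context["dag_run"]
    duration = (dag_run.end_date - dag_run.start_date).total_seconds()
    # Replace the print with the real external API call.
    print(f"DAG {dag_run.dag_id} finished in {duration:.1f}s")
    return duration
```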

The code above works, but I am still not sure whether this is something that should be avoided or a legitimate approach to designing DAGs.


There are 0 answers