Execution time and state of a specific task within an Airflow DAG


I would like to extract all execution times of a particular task in an Airflow DAG. I would prefer to do it by writing another DAG.

I have used the following DAG to extract the status and execution time of another DAG:


import pandas as pd
import numpy as np
import pandas_gbq
from google.cloud import storage as gcs
from google.cloud import bigquery
from airflow.models import DagRun

dag_id = 'my_dag'
dag_runs = DagRun.find(dag_id=dag_id)  # pass the dag_id string, not an undefined name

# Declare empty lists
arr = []
arr1 = []

for dag_run in dag_runs:
    arr.append(dag_run.state)
    arr1.append(dag_run.execution_date)

dag_info = {'time': arr1, 'dag_status': arr}

df = pd.DataFrame(dag_info)

## Keep failed and successful dag runs
df_status = df[(df.dag_status == "failed") | (df.dag_status == "success")]

df_status.loc[df_status['dag_status'] == 'success', 'flag'] = 0
df_status.loc[df_status['dag_status'] == 'failed', 'flag'] = 1

### Code to save the table in Bigquery
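Incidentally, each DagRun also carries start_date and end_date attributes, so the actual wall-clock duration of a run can be derived as well. A minimal pandas sketch with hypothetical values (in practice the timestamps would come from the dag_run objects in the loop above):

```python
import pandas as pd

# Hypothetical run records; in a real DAG these would be collected from
# dag_run.execution_date, dag_run.start_date and dag_run.end_date.
runs = pd.DataFrame({
    "execution_date": pd.to_datetime(["2020-09-28", "2020-09-29"]),
    "start_date": pd.to_datetime(["2020-09-28 00:01:00", "2020-09-29 00:01:00"]),
    "end_date": pd.to_datetime(["2020-09-28 00:05:30", "2020-09-29 00:02:00"]),
    "dag_status": ["success", "failed"],
})
# Wall-clock duration of each run, in seconds
runs["duration_s"] = (runs["end_date"] - runs["start_date"]).dt.total_seconds()
```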

I would like to do the same, but this time extracting task-level information for 'my_dag'. I have tried the solution given in "Status of Airflow task within the dag", but it returns "None", although I know the DAG and the task are running.

def task_status_check(**kwargs):

    ##### TESTING #####

    import datetime
    from datetime import timedelta
    import pandas as pd
    from airflow import configuration as conf
    from airflow.models import DagBag, TaskInstance

    my_date = datetime.datetime(2020, 9, 28)

    my_dag_id = 'my_dag'
    my_task_id = 'my_task'

    dag_folder = conf.get('core', 'DAGS_FOLDER')
    dagbag = DagBag(dag_folder)
    check_dag = dagbag.dags[my_dag_id]
    my_task = check_dag.get_task(my_task_id)

    for n in range(1, 500, 2):
        time_delta = timedelta(minutes=n)
        my_date_1 = my_date + time_delta
        ti = TaskInstance(my_task, my_date_1)

        print("######################")
        print(ti.current_state())
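One detail worth checking here (an assumption about the None result, not something stated in the question): Airflow stores execution dates as timezone-aware UTC timestamps, so a naive datetime like the one built above may simply never match a stored run. The difference is easy to see:

```python
import datetime
from datetime import timezone

# A naive datetime carries no tzinfo at all...
naive = datetime.datetime(2020, 9, 28)
# ...while Airflow's metadata DB holds timezone-aware UTC timestamps,
# so a comparable value needs the tzinfo attached explicitly.
aware = naive.replace(tzinfo=timezone.utc)
```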

Any help will be highly appreciated.

Thank you

1 Answer

Answered by Nick_Kh:

I suspect the issue here lies in the TaskInstance() model rather than in the custom logic enclosed in the task_status_check() function. The TaskInstance() class offers a variety of task-management features by leveraging the SQLAlchemy ORM, which queries the Airflow metadata DB and fetches records from the task_instance SQL table; looking through the source code, you can find #L203 reflecting this.
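To illustrate, the lookup that current_state() ultimately performs boils down to an exact-match query against the task_instance table. A toy stand-in using sqlite3 (a sketch; the real table has many more columns, and Airflow goes through the ORM rather than raw SQL) shows why a near-miss execution_date yields None:

```python
import sqlite3

# Toy stand-in for Airflow's task_instance metadata table
conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE task_instance
                (task_id TEXT, dag_id TEXT, execution_date TEXT, state TEXT)""")
conn.execute("INSERT INTO task_instance VALUES (?, ?, ?, ?)",
             ("my_task", "my_dag", "2020-09-28T00:00:00+00:00", "success"))

def current_state(dag_id, task_id, execution_date):
    # Exact match on (dag_id, task_id, execution_date), mirroring the ORM filter
    row = conn.execute(
        """SELECT state FROM task_instance
           WHERE dag_id = ? AND task_id = ? AND execution_date = ?""",
        (dag_id, task_id, execution_date)).fetchone()
    return row[0] if row else None

hit = current_state("my_dag", "my_task", "2020-09-28T00:00:00+00:00")
miss = current_state("my_dag", "my_task", "2020-09-28T00:01:00+00:00")
# An execution_date that is off by even a minute matches no row,
# so the state comes back as None -- the symptom seen in the question.
```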

I tried your code in a very similar scenario and faced the same None state. Reviewing the user's efforts mentioned in the Stack thread referenced in the question and digging deeper into the problem, I adjusted get_task_instance() to check the behavior, pointing this function at the state of a particular Airflow task. As get_task_instance() belongs to an experimental package, it seemingly invokes the TaskInstance() class to discover the task state:

def task_check(**kwargs):
  import datetime
  from datetime import timezone
  from airflow import configuration as conf
  import logging
  from airflow.api.common.experimental.get_task_instance import get_task_instance

  # Placeholder values: year, month, day, hour, minute, second of the run to check
  my_date = datetime.datetime(2020, 9, 28, 0, 0, 0)
  my_date = my_date.replace(tzinfo=timezone.utc)

  my_dag_id = "Dag_id"
  my_task_id = "Task_id"
  ti = get_task_instance(my_dag_id, my_task_id, my_date)

I checked that the request to the Airflow DB was successful; however, the get_task_instance function returns the same None state:

{python_operator.py:114} INFO - Done. Returned value was: None

Meanwhile, doing further research, I considered other methods for extracting the state of Airflow tasks, and they handled this job just fine.

  • The Airflow command-line interface (task_state command), adjusted to run on one of the Composer workers:

    kubectl -it exec $(kubectl get po -l run=airflow-worker -o jsonpath='{.items[0].metadata.name}' \
        -n $(kubectl get ns| grep composer*| awk '{print $1}')) -n $(kubectl get ns| grep composer*| awk '{print $1}') \
        -c airflow-worker airflow task_state <Dag_ID> <Task_ID> 2020-09-27T23:59:21+00:00
    
  • Querying the metadata MySQL task_instance table accordingly:

    SELECT task_id, state, execution_date
    FROM task_instance
    WHERE dag_id = 'dag_id'
      AND DATE(execution_date) = 'execution_date'
      AND task_id = 'task_id'
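The rows returned by that query can then be post-processed the same way the question's DAG does, for instance with pandas (a sketch with hypothetical result rows; the flag scheme matches the one in the question):

```python
import pandas as pd

# Hypothetical rows as returned by the task_instance query
rows = [
    ("my_task", "success", "2020-09-28 00:00:00"),
    ("my_task", "failed",  "2020-09-29 00:00:00"),
]
df = pd.DataFrame(rows, columns=["task_id", "state", "execution_date"])
# Same flag scheme as in the question: 0 = success, 1 = failed
df["flag"] = (df["state"] == "failed").astype(int)
```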