I would like to extract all execution times of a particular task in an Airflow DAG. I would prefer to do it by writing another DAG.
I have used the following DAG to extract the status and execution time of another DAG:
import pandas as pd
import numpy as np
import pandas_gbq
from google.cloud import storage as gcs
from google.cloud import bigquery
from airflow.models import DagRun

def dag_status_check(**kwargs):
    dag_id = 'my_dag'
    dag_runs = DagRun.find(dag_id=dag_id)
    # Declare empty lists
    arr = []
    arr1 = []
    for dag_run in dag_runs:
        arr.append(dag_run.state)
        arr1.append(dag_run.execution_date)
    dag_info = {'time': arr1, 'dag_status': arr}
    df = pd.DataFrame(dag_info)
    ## Keep failed and successful dag runs
    df_status = df[(df.dag_status == "failed") | (df.dag_status == "success")]
    df_status.loc[df_status['dag_status'] == 'success', 'flag'] = 0
    df_status.loc[df_status['dag_status'] == 'failed', 'flag'] = 1
    ### Code to save the table in BigQuery
    return None
I would like to do the same, but this time extract task-level information for 'my_dag'. I have tried the solution given in Status of Airflow task within the dag, but it returns "None", although I know the task and the DAG are running.
def task_status_check(**kwargs):
    ##### TESTING #####
    import pandas as pd
    import datetime
    from datetime import timedelta
    from airflow.configuration import conf
    from airflow.models import DagBag, TaskInstance

    my_date = datetime.datetime(2020, 9, 28)
    my_dag_id = 'my_dag'
    my_task_id = 'my_task'
    dag_folder = conf.get('core', 'DAGS_FOLDER')
    dagbag = DagBag(dag_folder)
    check_dag = dagbag.dags[my_dag_id]
    my_task = check_dag.get_task(my_task_id)
    # Probe candidate execution dates in 2-minute steps
    for n in range(1, 500, 2):
        time_delta = timedelta(minutes=n)
        my_date_1 = my_date + time_delta
        ti = TaskInstance(my_task, my_date_1)
        print("######################")
        print(ti.current_state())
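For context, I call this function from a PythonOperator in my monitoring DAG, roughly like this (a sketch; the DAG name, schedule, and start date are placeholders):

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# Sketch of the wiring; DAG name, schedule, and start_date are placeholders
dag = DAG(
    'task_status_dag',
    start_date=datetime(2020, 9, 28),
    schedule_interval='@daily')

task_status = PythonOperator(
    task_id='task_status_check',
    python_callable=task_status_check,
    provide_context=True,  # passes the context into **kwargs (Airflow 1.10.x)
    dag=dag)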
Any help will be highly appreciated.
Thank you
I suspect the issue here is in the TaskInstance() model rather than in the custom code logic enclosed in the task_status_check() function. Basically, the TaskInstance() class offers a variety of Airflow task-management features, leveraging the SQLAlchemy ORM Python tool to query the Airflow metadata DB and fetch records from the task_instance SQL table; looking through the source code, you might get to #L203, which reflects this.
I've tried your code in a very similar scenario and faced the same None returned state. Reviewing the user's efforts mentioned in the Stack thread linked from the initial question and digging deeper into the problem, I adjusted get_task_instance() to check the behavior, pointing this function at extracting the state of a particular Airflow task. Since get_task_instance() is an experimental package, it seemingly invokes the TaskInstance() class to discover the task state:
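A minimal sketch of that check, assuming Airflow 1.10.x where the experimental API is still available (the DAG ID, task ID, and execution date are placeholders):

import datetime
from airflow.api.common.experimental.get_task_instance import get_task_instance

# Placeholder identifiers; execution_date may need to be timezone-aware
# depending on your Airflow version
ti = get_task_instance(
    dag_id='my_dag',
    task_id='my_task',
    execution_date=datetime.datetime(2020, 9, 28, 1, 0))
print(ti.current_state())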
I've checked that the request to the Airflow DB was successful; however, the get_task_instance function returned the same None state.
Meanwhile, doing further research and trying other methods for extracting the state of Airflow tasks, I found that the following approaches handled the job fine.
The Airflow command-line interface, adjusted to run on one of the Composer workers.
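For instance, using the task_state subcommand of the Airflow 1.x CLI through gcloud (a sketch; the environment name, location, and execution date are placeholders):

# Run the Airflow CLI on a Composer worker; names and dates are placeholders
gcloud composer environments run my-environment \
    --location us-central1 \
    task_state -- my_dag my_task 2020-09-28T01:00:00+00:00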
Querying the metadata MySQL task_instance table accordingly.
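For example, a query along these lines (a sketch against the Airflow 1.10 schema; the duration column, in seconds, is what the original question is after):

SELECT task_id, dag_id, execution_date, state, duration
FROM task_instance
WHERE dag_id = 'my_dag'
  AND task_id = 'my_task';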