How to get reason for failure using slack in airflow2.0

862 views Asked by At

How to get the reason for the failure of an operator, without going into logs. As I want to post the reason as a notification through slack?

2

There are 2 answers

4
hopeIsTheonlyWeapon On

I can think of one way of doing this as below.

Check the above for SlackAPIPostOperator

0
Xi12 On

exception=context.get('exception')is the function which will give exact reason for failure

Example of on_failure_callback using slack:

 step_checker = EmrStepSensor(task_id='watch_step',
                 job_flow_id="{{ task_instance.xcom_pull('create_job_flow', 
                 key='return_value') }}",
        step_id="{{task_instance.xcom_pull(task_ids='add_steps',key='return_value')[0] }}",
        aws_conn_id='aws_default',
        on_failure_callback=task_fail_slack_alert,)    
    

def task_fail_slack_alert(context):
        SLACK_CONN_ID = 'slack'
        slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
        slack_msg = """
                :red_circle: Task Failed. 
                *Task*: {task}  
                *Dag*: {dag} 
                *Execution Time*: {exec_date}  
                *Log Url*: {log_url} 
                *Error*:{exception}
                """.format(
                task=context.get('task_instance').task_id,
                dag=context.get('task_instance').dag_id,
                exec_date=context.get('execution_date'),
                log_url=context.get('task_instance').log_url,
                exception=context.get('exception') 
               
            )
        failed_alert = SlackWebhookOperator(
            task_id='slack_test',
            http_conn_id='slack',
            webhook_token=slack_webhook_token,
            message=slack_msg,
            username='airflow',
            dag=dag)
        return failed_alert.execute(context=context)