Airflow - NameError: name 'ti' is not defined

I am trying to run an Airflow DAG that consists of a couple of tasks. I want to pass the value of 'program_no' as an argument in a spark-submit request. The DAG receives this value from an API call and reads it from the context in the get_conf method. I tried passing it as {ti.xcom_pull(task_ids='parameterized_task')}, but I get an error: NameError: name 'ti' is not defined. How can I resolve this?
I also tried passing {prog_no} instead of {ti.xcom_pull(task_ids='parameterized_task')}, but I get a similar error: prog_no is not defined.

dag = DAG(
    dag_id=os.path.basename(__file__).replace(".py", ""),
    default_args=default_args,
    start_date=datetime(2023, 12, 21),
    schedule_interval=None,
    description='Event based job for calculating missed sales based allowances for retroactive program setup'
)

def get_conf(**context):
    global prog_no
    #ti = context['ti']
    prog_no = context['dag_run'].conf['program_no']
    return prog_no

parameterized_task = PythonOperator(
        task_id="parameterized_task",
        python_callable=get_conf,
        provide_context=True,
        dag=dag
    )    

sparkway_request = SimpleHttpOperator(
        task_id='sparkway_request',
        endpoint=Variable.get('sparkway_api_endpoint'),
        method="POST",
        http_conn_id="SPARKWAY_CONN",
        data=json.dumps({
            "cmd": "sparkway-submit --master kubernetes --job-name spark-allowance-calculation --class com.xxx.CalculationApplication --spark-app s3a://xyz.jar --arguments SuspendedProgramStatus --num-executors 2 --executor-cores 2 --executor-memory 3G --driver-memory 3G",
            "arguments": f"{dag.latest_execution_date},{ti.xcom_pull(task_ids='parameterized_task')}",
            "type": "job"
        }),
        headers={
            "Authorization": f"Bearer {Variable.get('sparkway_token')}",
            "Content-Type": "application/json",
            "X-CSRF-TOKEN": Variable.get('sparkway_csrf_token')
        },
        response_check=lambda response: handle_sparkway_response(response),
        log_response=True,
        dag=dag
    )

parameterized_task >> sparkway_request

There are 2 answers

subram On

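Accessing ti inside a Jinja template should work. Because the string is an f-string, the Jinja braces have to be doubled again ({{{{ and }}}}) so that a literal {{ and }} reach Airflow, and the task id should use single quotes so that json.dumps does not escape them:

        data=json.dumps({
            "cmd": "sparkway-submit --master kubernetes --job-name spark-allowance-calculation --class com.xxx.CalculationApplication --spark-app s3a://xyz.jar --arguments SuspendedProgramStatus --num-executors 2 --executor-cores 2 --executor-memory 3G --driver-memory 3G",
            # {{{{ ... }}}} becomes {{ ... }} after f-string formatting, which Airflow then renders at run time
            "arguments": f"{dag.latest_execution_date},{{{{ ti.xcom_pull(task_ids='parameterized_task') }}}}",
            "type": "job"
        }),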
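One thing to watch in snippets like the one above is how f-strings treat braces. The following is a minimal, Airflow-free sketch (the variable names are only illustrative) showing that `{{` inside an f-string collapses to a single `{`, so the Jinja delimiters never reach the operator unless they are written with four braces or the string is left as a plain string:

```python
import json

# In an f-string, "{{" and "}}" are escapes for ONE literal brace each,
# so this no longer looks like a Jinja expression after formatting.
collapsed = f"{{ ti.xcom_pull(task_ids='parameterized_task') }}"

# Four braces in the f-string leave two in the result, which is what Jinja expects.
preserved = f"{{{{ ti.xcom_pull(task_ids='parameterized_task') }}}}"

# A plain (non-f) string needs no escaping at all.
plain = "{{ ti.xcom_pull(task_ids='parameterized_task') }}"

# json.dumps leaves braces and single quotes untouched, so the template survives.
payload = json.dumps({"arguments": preserved, "type": "job"})

print(collapsed)
print(preserved)
print(payload)
```

If nothing in the string needs to be evaluated when the DAG file is parsed, dropping the `f` prefix is probably the simpler choice.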

ghowkay On

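ti is not defined because it is not a Python name that exists when the DAG file is parsed; it is only available at run time through Jinja templating. data should be among SimpleHttpOperator's templated fields, so you can try this instead. A value returned from a PythonOperator callable is stored under the default XCom key (return_value), so no key argument is needed:

    data=json.dumps({
        "cmd": "sparkway-submit ........",
        "arguments": "{{ execution_date }},{{ task_instance.xcom_pull(task_ids='parameterized_task') }}",
        "type": "job"
    })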
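As a possible alternative, Airflow's template context also exposes `dag_run`, so the value can likely be read straight from the trigger configuration without going through XCom. The sketch below only builds the request body with the placeholders left unrendered; `build_payload` is a hypothetical helper name, and it assumes `program_no` is always present in the conf passed when the DAG is triggered:

```python
import json


def build_payload() -> str:
    """Return the JSON request body with Jinja placeholders left unrendered."""
    return json.dumps({
        "cmd": "sparkway-submit ........",
        # Rendered by Airflow at run time, assuming this string ends up in a templated field.
        "arguments": "{{ execution_date }},{{ dag_run.conf['program_no'] }}",
        "type": "job",
    })


body = build_payload()
decoded = json.loads(body)
print(decoded["arguments"])
```

The resulting string would be passed as `data=build_payload()` on the operator. With this approach the `parameterized_task` step is only needed if other tasks also consume the value.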