I am trying to execute a Airflow script that consists of a couple of functions. I want to pass the value of 'program_no' as an argument in spark submit request which I am getting in my DAG from an api call via context in get_conf method. I am trying to pass like {ti.xcom_pull(task_ids='parameterized_task')} but I am getting an error - NameError: name 'ti' is not defined. Please help how to resolve this issue?
I have also tried to pass {prog_no} instead of {ti.xcom_pull(task_ids='parameterized_task')} but getting same error - prog_no not defined
dag = DAG(
dag_id=os.path.basename(__file__).replace(".py", ""),
default_args=default_args,
start_date=datetime(2023, 12, 21),
schedule_interval=None,
description='Event based job for calculating missed sales based allowances for retroactive program setup'
)
def get_conf(**context):
global prog_no
#ti = context['ti']
prog_no = context['dag_run'].conf['program_no']
return prog_no
parameterized_task = PythonOperator(
task_id="parameterized_task",
python_callable=get_conf,
provide_context=True,
dag=dag
)
sparkway_request = SimpleHttpOperator(
task_id='sparkway_request',
endpoint=Variable.get('sparkway_api_endpoint'),
method="POST",
http_conn_id="SPARKWAY_CONN",
data=json.dumps({
"cmd": "sparkway-submit --master kubernetes --job-name spark-allowance-calculation --class com.xxx.CalculationApplication --spark-app s3a://xyz.jar --arguments SuspendedProgramStatus --num-executors 2 --executor-cores 2 --executor-memory 3G --driver-memory 3G",
"arguments": f"{dag.latest_execution_date},{ti.xcom_pull(task_ids='parameterized_task')}",
"type": "job"
}),
headers={
"Authorization": f"Bearer {Variable.get('sparkway_token')}",
"Content-Type": "application/json",
"X-CSRF-TOKEN": Variable.get('sparkway_csrf_token')
},
response_check=lambda response: handle_sparkway_response(response),
log_response=True,
dag=dag
)
parameterized_task >> sparkway_request
this way accessing ti inside the jinja template should work: