How to pass parameters from a PythonOperator task to a SimpleHttpOperator task in an Airflow DAG?


I want to trigger a SimpleHttpOperator, like this: airflow trigger_dag test_trigger --conf '{"name":"something"}'

I then use a PythonOperator python_callable to read the parameters from kwargs['dag_run'].conf, and I want to pass that dag_run.conf on to the SimpleHttpOperator. How can I do it? Can anyone help?

cc_ = {}


def run_this_func(ds, **kwargs):
    cc_ = kwargs['dag_run'].conf
    logging.info(cc_)
    return cc_

run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag)

http_task = SimpleHttpOperator(
    task_id='http_task',
    http_conn_id='test_http',
    method='POST',
    endpoint='/api/v1/function',
    data=cc_,
    headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*"},
    response_check=lambda response: True if "10000" in response.content else False,
    dag=dag)

http_task.set_upstream(run_this)

There are 3 answers

3
Chengzhi On

For communication between tasks, take a look at XComs: https://airflow.incubator.apache.org/concepts.html#xcoms

*****UPDATE*****
(thanks Daniel for more detail) Below is some code you can try. In your SimpleHttpOperator, you pull the PythonOperator's return value via XCom:

http_task = SimpleHttpOperator(
    task_id='http_task',
    http_conn_id='test_http',
    method='POST',
    endpoint='/api/v1/function',
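    # Pass the template string itself: `data` is a templated field and is rendered at run time.
    # Wrapping it in json.loads() would run at DAG-parse time, before rendering, and fail.
    data="{{ task_instance.xcom_pull(task_ids='run_this', key='return_value') }}",
    headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*"},
    # response.text is a str on both Python 2 and 3; response.content is bytes on Python 3
    response_check=lambda response: "10000" in response.text,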
    dag=dag)
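For context on why XCom is needed at all, here is a minimal sketch in plain Python (no Airflow involved) of what probably happens with the `data=cc_` version from the question. The names `data_seen_by_http_task` and `returned` are made up for illustration, and the direct function call only stands in for Airflow running the task later.

```python
# Module-level value, as in the question. This is what data=cc_ captures
# when the DAG file is parsed.
cc_ = {}


def run_this_func(**kwargs):
    # The assignment creates a new local name; the module-level cc_ is untouched.
    cc_ = kwargs["conf"]
    return cc_


# Stand-in for SimpleHttpOperator(data=cc_, ...): the argument is evaluated now,
# at parse time, before any task has run.
data_seen_by_http_task = cc_

# Stand-in for the Python task being executed later with the trigger conf.
returned = run_this_func(conf={"name": "something"})

print(data_seen_by_http_task)  # {}
print(returned)                # {'name': 'something'}
```

The return value only exists at run time, so the HTTP task has to fetch it at run time as well, which is what the templated `xcom_pull` does.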
0
pyfroggogogo On

Thanks to @Chengzhi and @Daniel. In the end I wrote a custom filter 'tojson' in Jinja2/filter.py, because the default Jinja2 version in Airflow is 2.8.1, and Jinja2 only gained a built-in 'tojson' filter in version 2.9.

def do_tojson(value):
    value = json.JSONEncoder().encode(value)
    return value
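A small check of why the filter matters, as a sketch using only the standard library. It assumes Python 3 and that a template printing a dict without a filter produces roughly `str(dict)`; `conf`, `rendered_plain`, `rendered_json` and `plain_is_json` are illustrative names.

```python
import json


def do_tojson(value):
    # Same logic as the custom filter: encode the value as a JSON string.
    return json.JSONEncoder().encode(value)


conf = {"name": "something"}

# Roughly what an unfiltered template would emit: the Python repr, single-quoted.
rendered_plain = str(conf)
# What the filter emits: a JSON document with double quotes.
rendered_json = do_tojson(conf)

try:
    json.loads(rendered_plain)
    plain_is_json = True
except ValueError:  # json.JSONDecodeError is a subclass of ValueError
    plain_is_json = False

print(rendered_plain)  # {'name': 'something'}
print(rendered_json)   # {"name": "something"}
print(plain_is_json)   # False
```

An API expecting a JSON body would likely reject the first form, which is why the template pipes the XCom value through `tojson`.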

In the DAG file, the code is as follows. It works.

def run_this_func(ds, **kwargs):
    cc_ = kwargs['dag_run'].conf
    return cc_

run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag)

http_task = SimpleHttpOperator(
    task_id='http_task',
    http_conn_id='test_http',
    method='POST',
    endpoint='/api/v1/task',
    data="{{ task_instance.xcom_pull(task_ids='run_this') |tojson}}",
    headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*",
             "Content-Type": "application/json"},
    response_check=lambda response: "10000" in response.text,
    dag=dag)

http_task.set_upstream(run_this)
0
Vinit Bodhwani On

You don't need a separate PythonOperator to collect the data. I created a custom operator that accepts data (passed from an API) and uses it to send a GET/POST request. Here is the link: https://stackoverflow.com/a/69443084/8081381
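Separately from the linked operator, one way to skip the extra task is to template `dag_run.conf` straight into `data`. This is only a sketch: it assumes a Jinja2 version with the built-in `tojson` filter (2.9 or later) and reuses the connection id and endpoint from the question. The arguments are kept in a plain dict, named `http_task_kwargs` here for illustration, so the snippet runs without Airflow installed.

```python
# Keyword arguments for SimpleHttpOperator, kept as a plain dict so this
# snippet runs without Airflow installed.
http_task_kwargs = {
    "task_id": "http_task",
    "http_conn_id": "test_http",
    "method": "POST",
    "endpoint": "/api/v1/function",
    # Rendered at run time; dag_run is part of the template context.
    "data": "{{ dag_run.conf | tojson }}",
    "headers": {"Content-Type": "application/json"},
}

# In a DAG file (assumption: Jinja2 >= 2.9 is available to Airflow):
# http_task = SimpleHttpOperator(dag=dag, **http_task_kwargs)
```

If the DAG is triggered without `--conf`, the rendered body would probably be `null`, so the API side may need to handle that case.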