How to access an XCom value in a Python function that is not run by an Airflow operator


I have a stored XCom value that I want to pass to another Python function that is not called through a PythonOperator.


def sql_file_template():
    <some code which uses xcom variable>

def call_stored_proc(**kwargs):
    # project = kwargs['row_id']
    print("INSIDE CALL STORE PROC ------------")
    query = """CALL `{0}.dataset_name.store_proc`(
                          '{1}' # source table
                        , ['{2}'] # row_ids
                        , '{3}' # pivot_col_name
                        , '{4}' # pivot_col_value
                        ,  100 # max_columns
                        , 'MAX' # aggregation
                );"""
    query = query.format(kwargs['project'], kwargs['source_tbl'], kwargs['row_id'],
                         kwargs['pivot_col'], kwargs['pivot_val'])
    # `client` is assumed to be a BigQuery client created elsewhere in the DAG file
    job = client.query(query, location="US")
    task_instance = kwargs['task_instance']
    for result in job.result():
        # Push the first result row to XCom, then return it
        task_instance.xcom_push(key='query_string', value=result)
        print(result)
        return result



bq_cmd = PythonOperator (
    task_id=                    'task1',
    provide_context=            True,
    python_callable=            call_stored_proc,
    op_kwargs=                  {'project'        : project,
                                 'source_tbl'     : source_tbl,
                                 'row_id'         : row_id,
                                 'pivot_col'      : pivot_col,
                                 'pivot_val'      : pivot_val
                                },
    dag=                        dag
)

dummy_operator >> bq_cmd
sql_file_template()

The output of the stored procedure is a string, which is captured using XCom.

Now I would like to pass this value to a Python function, sql_file_template, without using a PythonOperator.

As per the Airflow documentation, XComs can be accessed only between tasks.

Can anyone help with this?


There are 2 answers

3
y2k-shubham

So you want to access an XCom outside Airflow (probably from a different project or module, without creating any Airflow DAGs or tasks)?


Airflow uses SQLAlchemy to map all of its models (including XCom) to the corresponding tables in its backend metadata database (meta-db).

Therefore, this can be done in two ways:

  1. Leverage Airflow's SQLAlchemy model (without having to create a task or DAG)

    Here's an untested code snippet for reference:

from typing import List, Optional

from airflow.models import XCom
# In Airflow 2.x, provide_session lives in airflow.utils.session instead
from airflow.utils.db import provide_session
from pendulum import Pendulum
from sqlalchemy.orm import Session


@provide_session
def read_xcom_values(dag_id: str,
                     task_id: str,
                     execution_date: Pendulum,
                     session: Optional[Session] = None) -> List[str]:
    """
    Function that reads and returns 'values' of XCOMs with given filters
    :param dag_id: ID of the DAG that pushed the XCOM
    :param task_id: ID of the task that pushed the XCOM
    :param execution_date: datetime object
    :param session: Airflow's SQLAlchemy Session (this param must not be passed, it will be automatically supplied by
                    '@provide_session' decorator)
    :return: list of the 'value' fields of the matching XCOMs
    """
    # read XCOMs
    xcoms: List[XCom] = session.query(XCom).filter(
        XCom.dag_id == dag_id, XCom.task_id == task_id,
        XCom.execution_date == execution_date).all()
    # retrieve 'value' fields from XCOMs
    xcom_values: List[str] = [xcom.value for xcom in xcoms]
    return xcom_values

Do note that since it imports Airflow packages, it still requires a working Airflow installation on the Python path (as well as a connection to the backend database). However, no tasks or DAGs are created here, so this snippet can be run in a standalone Python file.

For this snippet, I referred to views.py, which is my favorite place to peek into Airflow's SQLAlchemy magic.


  2. Directly query Airflow's SQLAlchemy backend meta-db

    Connect to meta db and run this query

    SELECT value FROM xcom WHERE dag_id='' AND task_id='' AND ..
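
The direct-query route might look roughly like the sketch below. It is untested against a real Airflow database and rests on several assumptions: the meta-db is SQLite (for Postgres or MySQL you would swap in the matching driver), the xcom table has dag_id, task_id, key and value columns, and values were stored as JSON-encoded bytes rather than pickled. The helper make_demo_db is hypothetical; it only builds a simplified in-memory stand-in so the snippet can run without Airflow.

```python
import json
import sqlite3
from typing import Any, List


def read_xcom_values(conn: sqlite3.Connection,
                     dag_id: str,
                     task_id: str,
                     key: str) -> List[Any]:
    """Return the decoded XCom values matching the given filters."""
    rows = conn.execute(
        'SELECT value FROM xcom WHERE dag_id = ? AND task_id = ? AND "key" = ?',
        (dag_id, task_id, key),
    ).fetchall()
    # Assumption: values are JSON-encoded bytes (i.e. XCom pickling is off).
    return [json.loads(row[0]) for row in rows]


def make_demo_db() -> sqlite3.Connection:
    """Hypothetical stand-in for the meta-db, with a simplified xcom table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        'CREATE TABLE xcom (dag_id TEXT, task_id TEXT, "key" TEXT, value BLOB)'
    )
    conn.execute(
        "INSERT INTO xcom VALUES (?, ?, ?, ?)",
        ("the_dag_id", "task1", "query_string",
         json.dumps("SELECT 1").encode("utf-8")),
    )
    return conn


if __name__ == "__main__":
    demo = make_demo_db()
    print(read_xcom_values(demo, "the_dag_id", "task1", "query_string"))
```

Against a real installation you would pass a connection to the actual metadata database instead of the demo one, and probably add a filter on the execution date (or run ID, depending on the Airflow version) to pick a single DAG run.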

0
joebeeson

If you have access to the Airflow installation you'd like to query (configuration, database access, and code), you can use Airflow's airflow.models.XCom.get_one class method:

from datetime import datetime

from airflow.models import XCom


execution_date = datetime(2020, 8, 28)
xcom_value = XCom.get_one(execution_date=execution_date,
                          task_id="the_task_id",
                          dag_id="the_dag_id")
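
To tie this back to the question, here is one possible way to feed the fetched value into a plain function such as sql_file_template. This is only a sketch under assumptions: the body of sql_file_template is hypothetical (the question does not show it), the key, task ID and DAG ID are placeholders taken from the snippets above, and XCom.get_one is assumed to accept execution_date as in the call above. Note that calling sql_file_template() at module level in the DAG file, as in the question, runs when the file is parsed, before task1 has pushed anything, so the lookup has to happen after that task has finished for the given execution date.

```python
from datetime import datetime
from typing import Callable


def sql_file_template(query_string: str) -> str:
    """Plain function (no operator): takes the XCom value as an argument."""
    # Hypothetical template body; the real one is not shown in the question.
    return "-- generated from XCom\n{}".format(query_string)


def fetch_query_string(execution_date: datetime) -> str:
    """Read the value pushed by 'task1' from the metadata database."""
    # Imported lazily so this module can be loaded without Airflow installed.
    from airflow.models import XCom

    return XCom.get_one(execution_date=execution_date,
                        key="query_string",
                        task_id="task1",
                        dag_id="the_dag_id")


def render(execution_date: datetime,
           fetch: Callable[[datetime], str] = fetch_query_string) -> str:
    """Fetch the XCom value, then hand it to the plain function."""
    return sql_file_template(fetch(execution_date))
```

Passing the fetcher in as a parameter keeps sql_file_template free of any Airflow dependency, so it can be exercised with a stub such as render(datetime(2020, 8, 28), fetch=lambda d: "SELECT 1").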