TypeError: 'XComArg' object is not iterable on task.group and DAG level but not on external tasks?

151 views Asked by At

I'm trying to use external tasks to run on my venv and I was able to do 2 out of 3, with success. The problem seems to be passing params dict for the task.group I've created, which is always throwing the non iterable error, even if I try to iterate through the params dict I've returned with TaskFlowApi on task1. TypeError: 'XComArg' object is not iterable

happens on the for loop trying to iterate on task.group level, the params dict But, strangely or not, it works if I try to do it on task gdp_task with same returned dict (d_params from task1). I don't get it: I would hope for same behavior, but not this. Any insights would be appreciated. I don't mind not grouping, but even doing the same for loop within DAG, throws the same exact TypeError. What am I missing here?

Here's my code:

"""
Example DAG demonstrating the usage of the TaskFlow API to execute Python functions natively and within a
virtual environment.
"""
from __future__ import annotations
import logging
import sys
import os
import platform
import json
import time
from pprint import pprint
from datetime import datetime  # Added import statement
import pendulum
#from airflow import DAG
from airflow.decorators import dag, task, task_group
from airflow.models.param import ParamsDict
#from airflow.operators.empty import EmptyOperator
from airflow.operators.python import get_current_context

# Define your commands and other configurations
script_directory = os.path.dirname(os.path.abspath(__file__))
config_file_path = os.path.join(script_directory, "dags_config.json")
# Load JSON from the file to centralize paths or other params
with open(config_file_path, "r") as file:
    config = json.load(file)

PATH_TO_PYTHON_BINARY = config["venv_bin"]

logger = logging.getLogger(__name__)

@task(task_id="trying_params")
def get_data(params=None):
    print(f"params ({type(params)}) = {params}")
    return params

#not used for now
@task(task_id="get_context")
def get_context_params(params=None) -> dict:
    """Print the Airflow context and ds variable from the context."""
    # pprint(f"context= {context}")
    # params = context.get('params', {})
    context = get_current_context()
    params = context["params"]
    # gdp_args = params.get('GDP_ARGS', {})
    print(f"params ({type(params)}) = {params}")
    print("TASK0 - Contents of sys.path:", sys.path)
    return params


@task.external_python(task_id="ext_s3", python=PATH_TO_PYTHON_BINARY)
def s3_extract(dag_params:None, config, args_dict):  # Accept gdp_src as an argument

    sys.path.insert(0, config["oracle_src"])
    # sys.path.insert(0, config["ORACLE_LIB"])

    # Print sys.path to verify the directories included
    print("S3 Contents of sys.path:", sys.path)
    print("S3 Received arguments:", dag_params)
    import os
    import platform
    # Execute the external task with arguments
    import airflow_server_oracle_extractor as s3_upload
    # args_dict = {
    # '-d': dag_params['SOURCE_DB'],
    # '-s': dag_params['SOURCE_OBJ'][0],
    # '-t': dag_params['TARGET_OBJ'][0],
    # '-n': dag_params['NR_FILES'],
    # '-p': dag_params['AWS_PROFILE']
    # }
    print(args_dict)
    s3_upload.main(args_dict)
    return dag_params


@task.external_python(task_id="external_gdp", python=PATH_TO_PYTHON_BINARY)
def execute_gdp(args: dict = None, config: str = None):  # Accept gdp_src as an argument
    """
    Function to execute the GDP script with the specified parameters.
    Args:
        args: dict representing passed args as they were parsed by docopt.
        config: Config to get Path to gdp src dir
    """
    # Path to the GDP script
    sys.path.insert(0, config["src_dir"])

    # Print sys.path to verify the directories included
    print("Contents of sys.path:", sys.path)
    print(f"params ({type(args)}) = {args}")
    #print(f"Args vs Params :\n args={args} \n params({type(params)})= {params}")
    for (source_obj, target_obj) in zip(args['SOURCE_OBJ'], args['TARGET_OBJ']):
        print(f"src= {source_obj}, tgt= {target_obj}")
    
    # Execute the external task with arguments
    #import gdp_main as gdp
    #gdp.main(args['GDP_ARGS'])
    
    return args

@task_group(group_id='s3_group', tooltip="Tasks for s3 extraction for USI core tables")
def s3_extract_group(dag_params: dict = {}, config: str = None):
    print(f"dag_params({type(dag_params)})={dag_params}; config={config}")
    s3_tasks = []
    """This is not a bug. It is not possible to unpack an Xcom in parsing time because it does not exist at that moment. 
    Xcom is created at runtime after the first task is executed:"""
    for (source_obj, target_obj) in zip(dag_params['SOURCE_OBJ'], dag_params['TARGET_OBJ']):
        ext_task_id = f"s3_{target_obj}"  # Dynamically generate task_id
        args_dict = {
            '-d': dag_params['SOURCE_DB'],
            '-s': source_obj,
            '-t': target_obj,
            '-n': dag_params['NR_FILES'],
            '-p': dag_params['AWS_PROFILE']
        }
        s3_tasks.append(s3_extract(task_id=ext_task_id)(dag_params, config, args_dict))
    return s3_tasks    


@dag(
    dag_id="gdp_bus_trial",
    description='Trial with external operator+venv + passing context or context issue',
    default_args={

    },
    params={
        'SOURCE_DB': 'USI',
        'SOURCE_OBJ': ['table1','table2'],
        'TARGET_OBJ': ['tablex1','tablex2'],
        'GDP_ARGS': {'--t': None, '--d': None, '--g': '1'},
        'NR_FILES': 10,
        'AWS_PROFILE': 'default'
    },
    schedule=None,
    catchup=False,
    render_template_as_native_obj=True
)
def core_dag():
    d_params = get_data()
    print(f"d_params ({type(d_params)})= {d_params}")
    #d_params=json.loads(get_params) 

    #1) this throws not iterable xcomArg on dag level or task_group level only:
    s3_task_group = s3_extract_group(dag_params=d_params, config=config)
    gdp_task = execute_gdp(args=d_params, config=config)
    d_params >> s3_task_group >> gdp_task

    #2) but this works fine just passing the params dict for gdp_task and iterating
    #d_params >> gdp_task
    
# Instantiate the DAG
core_dag()
0

There are 0 answers