I'm trying to use external (`@task.external_python`) tasks that run in my venv, and I got 2 out of 3 working. The problem seems to be passing the params dict to the task_group I've created, which always throws a "not iterable" error, even though I'm iterating over the params dict returned by task1 (the TaskFlow `get_data` task):
TypeError: 'XComArg' object is not iterable
The error happens in the for loop that iterates over the params dict at the task_group level. Strangely or not, the same loop works inside the gdp_task when I pass it the same returned dict (d_params from task1). I don't get it: I would expect the same behavior in both places. Any insights would be appreciated. I don't mind dropping the grouping, but even doing the same for loop directly in the DAG body throws the exact same TypeError. What am I missing here?
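To make the two cases concrete, here is a stripped-down sketch of what I mean (simplified/placeholder names, the full DAG is below):

# Case 1: iterating the returned params dict in the task_group body fails at parse time
@task_group(group_id='s3_group')
def s3_group(dag_params: dict = None):
    # TypeError: 'XComArg' object is not iterable
    for source_obj, target_obj in zip(dag_params['SOURCE_OBJ'], dag_params['TARGET_OBJ']):
        ...

# Case 2: the same loop over the same returned dict works inside the gdp task
@task.external_python(task_id="external_gdp", python=PATH_TO_PYTHON_BINARY)
def execute_gdp(args: dict = None):
    for source_obj, target_obj in zip(args['SOURCE_OBJ'], args['TARGET_OBJ']):
        print(source_obj, target_obj)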
Here's my code:
"""
Example DAG demonstrating the usage of the TaskFlow API to execute Python functions natively and within a
virtual environment.
"""
from __future__ import annotations
import logging
import sys
import os
import platform
import json
import time
from pprint import pprint
from datetime import datetime # Added import statement
import pendulum
#from airflow import DAG
from airflow.decorators import dag, task, task_group
from airflow.models.param import ParamsDict
#from airflow.operators.empty import EmptyOperator
from airflow.operators.python import get_current_context
# Define your commands and other configurations
script_directory = os.path.dirname(os.path.abspath(__file__))
config_file_path = os.path.join(script_directory, "dags_config.json")
# Load JSON from the file to centralize paths or other params
with open(config_file_path, "r") as file:
    config = json.load(file)
PATH_TO_PYTHON_BINARY = config["venv_bin"]
logger = logging.getLogger(__name__)
@task(task_id="trying_params")
def get_data(params=None):
print(f"params ({type(params)}) = {params}")
return params
# not used for now
@task(task_id="get_context")
def get_context_params(params=None) -> dict:
    """Print the Airflow context and return the params dict from the context."""
    # pprint(f"context= {context}")
    # params = context.get('params', {})
    context = get_current_context()
    params = context["params"]
    # gdp_args = params.get('GDP_ARGS', {})
    print(f"params ({type(params)}) = {params}")
    print("TASK0 - Contents of sys.path:", sys.path)
    return params
@task.external_python(task_id="ext_s3", python=PATH_TO_PYTHON_BINARY)
def s3_extract(dag_params:None, config, args_dict): # Accept gdp_src as an argument
sys.path.insert(0, config["oracle_src"])
# sys.path.insert(0, config["ORACLE_LIB"])
# Print sys.path to verify the directories included
print("S3 Contents of sys.path:", sys.path)
print("S3 Received arguments:", dag_params)
import os
import platform
# Execute the external task with arguments
import airflow_server_oracle_extractor as s3_upload
# args_dict = {
# '-d': dag_params['SOURCE_DB'],
# '-s': dag_params['SOURCE_OBJ'][0],
# '-t': dag_params['TARGET_OBJ'][0],
# '-n': dag_params['NR_FILES'],
# '-p': dag_params['AWS_PROFILE']
# }
print(args_dict)
s3_upload.main(args_dict)
return dag_params
@task.external_python(task_id="external_gdp", python=PATH_TO_PYTHON_BINARY)
def execute_gdp(args: dict = None, config: str = None): # Accept gdp_src as an argument
"""
Function to execute the GDP script with the specified parameters.
Args:
args: dict representing passed args as they were parsed by docopt.
config: Config to get Path to gdp src dir
"""
# Path to the GDP script
sys.path.insert(0, config["src_dir"])
# Print sys.path to verify the directories included
print("Contents of sys.path:", sys.path)
print(f"params ({type(args)}) = {args}")
#print(f"Args vs Params :\n args={args} \n params({type(params)})= {params}")
for (source_obj, target_obj) in zip(args['SOURCE_OBJ'], args['TARGET_OBJ']):
print(f"src= {source_obj}, tgt= {target_obj}")
# Execute the external task with arguments
#import gdp_main as gdp
#gdp.main(args['GDP_ARGS'])
return args
@task_group(group_id='s3_group', tooltip="Tasks for s3 extraction for USI core tables")
def s3_extract_group(dag_params: dict = {}, config: dict = None):
    print(f"dag_params({type(dag_params)})={dag_params}; config={config}")
    s3_tasks = []
    # "This is not a bug. It is not possible to unpack an XCom at parse time because it does not
    # exist at that moment. XCom is created at runtime, after the first task is executed."
    # Case 1: this loop raises TypeError: 'XComArg' object is not iterable
    for source_obj, target_obj in zip(dag_params['SOURCE_OBJ'], dag_params['TARGET_OBJ']):
        ext_task_id = f"s3_{target_obj}"  # Dynamically generate task_id
        args_dict = {
            '-d': dag_params['SOURCE_DB'],
            '-s': source_obj,
            '-t': target_obj,
            '-n': dag_params['NR_FILES'],
            '-p': dag_params['AWS_PROFILE']
        }
        # .override() sets the dynamic task_id on the decorated task
        s3_tasks.append(s3_extract.override(task_id=ext_task_id)(dag_params, config, args_dict))
    return s3_tasks
@dag(
    dag_id="gdp_bus_trial",
    description='Trial with external operator+venv + passing context or context issue',
    default_args={},
    params={
        'SOURCE_DB': 'USI',
        'SOURCE_OBJ': ['table1', 'table2'],
        'TARGET_OBJ': ['tablex1', 'tablex2'],
        'GDP_ARGS': {'--t': None, '--d': None, '--g': '1'},
        'NR_FILES': 10,
        'AWS_PROFILE': 'default'
    },
    schedule=None,
    catchup=False,
    render_template_as_native_obj=True
)
def core_dag():
    d_params = get_data()
    print(f"d_params ({type(d_params)})= {d_params}")
    # d_params = json.loads(get_params)
    # 1) this throws the not-iterable XComArg error, at DAG level or task_group level only:
    s3_task_group = s3_extract_group(dag_params=d_params, config=config)
    gdp_task = execute_gdp(args=d_params, config=config)
    d_params >> s3_task_group >> gdp_task
    # 2) but this works fine, just passing the params dict to gdp_task and iterating there:
    # d_params >> gdp_task
# Instantiate the DAG
core_dag()