accessing Xcom value inside taskgroup astronomer

36 views Asked by At

I am trying to create file on 1st of every month. The taskgroup should check if todays date is 1 then run (copy_to_s3 >> download_file >> sftp_upload_file) tasks if not run dummy task

  1. this aod should be retrieved from get_run_param task defined above.

  2. I have commented out errors on each modification of code.

aod = datetime.strptime(ds, "%Y-%m-%d") pulls date from taskinstance {{ds}}

    @dag(
        "Out_File",
        start_date=pendulum.datetime(2023, 11, 1, tz='US/Eastern'),
        catchup=False,
        schedule = [ds_DQ1_DAG, ds_DQ2_DAG],
        #schedule_interval = '30 5 * * *', # DAG Time Zone set to EST 
        dagrun_timeout = timedelta(hours=4),
        doc_md=__doc__,
    
        default_args={
            "retries": 2,
            "retry_delay": timedelta(minutes=1),
            "execution_timeout": timedelta(hours=3),
            "on_failure_callback": ms_teams_callback_functions.failure_callback
        })
        
        
        @task(multiple_outputs=True)
        def get_run_param(query_name,ds=None,ti=None) -> dict:

        sf_hook = SnowflakeHook(snowflake_conn_id=snowflake)
        query_name=query_name
        aod = datetime.strptime(ds, "%Y-%m-%d") 
        current_date=aod.strftime("%Y%m%d")#today's date
        previous_date=aod - timedelta(days=1)
        previous_date_str = previous_date.strftime("%Y%m%d")#to point yesterday's date.
        year=aod.strftime("%Y")
        month=aod.strftime("%m")
        day=aod.strftime("%d")
        target_file = f"{query_name}_{current_date}.csv"
                # Push only 'asOfDate' to XCom
        
        #ti.xcom_push(key='as_Of_Date', value=aod)

        return {
               
                "asOfDate": aod,            # Asofdate
                "file_name":target_file,    # FileName
                "download_location": download_location, # Download Location from s3 to local
                "Sftp_loc": SFTP_Directory_Path ,# SFTP Location to Download to
                "query_name":query_name,
                "sql_file_path":sql_file_path,# Query to run/generate file
                "previous_date_str":previous_date_str,
                "current_date":current_date,
                "year":year,
                "month":month,
                "day":day

            }
            
               #dummy task to show that we skipped monthly file generation when its not 1st of the month
        @task
        def no_monthly_file(dummy:dict):
            logging.info("No Monthly file generated.")
            
        with TaskGroup(group_id=tg_id_3) as tg2:
            get_var = get_run_param('Test_FIle1')
            #aod_str = get_var['current_date'] #returns aod in str format
            #aod = datetime.strptime(aod_str, "%Y%m%d")
            ##aod =  datetime.strptime(get_var['asOfDate'], "%Y-%m-%d")-- TypeError: strptime() argument 1 must be str, not PlainXComArg using xcom generated value is the issue
            ##aod = get_var['asOfDate']#errors out
            #if  aod.day != 1:
            #        no_monthly_file_dummy=no_monthly_file(get_var)
            #        no_monthly_file_dummy 
            #else :
            #pull asoddate value from xcom  not dag /use task instance
            #as_Of_Date = tg2.xcom_pull(key='as_Of_Date', task_ids='get_run_param')   


            as_of_date_str = get_var['current_date']
            as_of_date = datetime.strptime(as_of_date_str, "%Y%m%d")  #E         as_of_date = datetime.strptime(as_of_date_str, "%Y%m%d")     TypeError: strptime() argument 1 must be str, not PlainXComArg
            as_of_date_str = get_var['current_date']
            if  as_of_date.day != 1:
                    no_monthly_file_dummy=no_monthly_file(get_var)
                    no_monthly_file_dummy 
            else :
                copy_to_s3 = copy_sf_to_s3(get_var)
                download_file = download_s3_file(copy_to_s3)
                sftp_upload_file = sftp_upload(download_file, sftp_conn_id=SFTP_Conn1)
                copy_to_s3 >> download_file >> sftp_upload_file```
0

There are 0 answers