My initial tasks stay queued for 30-40 seconds (very long in my case)


I am using Amazon MWAA (Managed Workflows for Apache Airflow). My first DAG refreshes the Snowflake external tables. The whole DAG takes about 2 minutes to complete (it has 9 tasks that run one after another, each refreshing one table), and about 20 seconds of that is spent queued. I now want the tasks to run concurrently, so I am running 3 tasks at a time, but the total time stays the same. Why are the tasks sitting in the queue, and how can I reduce the queue time? The code and a screenshot are below.

[screenshot]

As the screenshot shows, the queue time is much longer than the execution time.
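A rough way to see why queue time matters (a simplified model, not how the MWAA scheduler actually works): if every task waits a fixed time in the queue before it runs, those waits add up along a sequential chain, while tasks that run side by side pay them together, limited by how many slots are free. The per-task numbers below (2 s queued, 10 s running) and the function name `wall_time` are hypothetical, not taken from the screenshot.

```python
import math


def wall_time(stages, queue_s, run_s, slots):
    """Estimate end-to-end DAG time under a simplified model.

    stages: number of tasks in each stage; stages run one after another,
    and tasks inside a stage are independent of each other.
    queue_s, run_s: assumed (hypothetical) per-task queue and run seconds.
    slots: how many tasks may run at the same time.
    """
    total = 0.0
    for size in stages:
        # A stage with more tasks than free slots runs in several waves,
        # and every wave pays one queue delay plus one run.
        waves = math.ceil(size / slots)
        total += waves * (queue_s + run_s)
    return total


# 9 refresh tasks strictly one after another vs. all 9 independent, 3 slots.
print(wall_time([1] * 9, queue_s=2.0, run_s=10.0, slots=3))  # 108.0
print(wall_time([9], queue_s=2.0, run_s=10.0, slots=3))      # 36.0
```

Under this model the queue delay is paid 9 times in the sequential case but only 3 times in the parallel case; allowing more concurrent tasks only helps when the tasks are actually independent of each other.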

My environment settings:

[screenshot]

```python
from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from datetime import datetime, timedelta
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from commons.dag_failure_notification import send_failure_email

default_args = {
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "on_failure_callback": send_failure_email,
}

DAG_ID = "test_External_Table_refresh"

with DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    schedule_interval="30 20 * * *",
    catchup=False,
    start_date=datetime(2023, 10, 13),
    # Up to 3 task instances of this DAG may run at once
    # (this argument is named max_active_tasks in Airflow 2.2+)
    concurrency=3,
) as dag:

    # Define SnowflakeOperator task for refreshing FLIGHT_BOOKING table
    flight_booking_task = SnowflakeOperator(
        task_id="refresh_flight_booking_external_table",
        sql="ALTER EXTERNAL TABLE RAW_DYNAMODB.RAW.FLIGHT_BOOKING REFRESH;",
        snowflake_conn_id="Snowflake_dev",
        dag=dag,
    )

    # Define SnowflakeOperator task for refreshing USERS table
    users_task = SnowflakeOperator(
        task_id="refresh_users_external_table",
        sql="ALTER EXTERNAL TABLE RAW_DYNAMODB.RAW.USERS REFRESH;",
        snowflake_conn_id="Snowflake_dev",
        dag=dag,
    )

    # Define SnowflakeOperator task for refreshing PAYMENTS table
    payments_task = SnowflakeOperator(
        task_id="refresh_payments_external_table",
        sql="ALTER EXTERNAL TABLE RAW_DYNAMODB.RAW.PAYMENTS REFRESH;",
        snowflake_conn_id="Snowflake_dev",
        dag=dag,
    )

    # Define SnowflakeOperator task for refreshing CANCELLATIONS table
    cancellations_task = SnowflakeOperator(
        task_id="refresh_cancellations_external_table",
        sql="ALTER EXTERNAL TABLE RAW_DYNAMODB.RAW.CANCELLATIONS REFRESH;",
        snowflake_conn_id="Snowflake_dev",
        dag=dag,
    )

    # Define SnowflakeOperator task for refreshing REFUNDS table
    refunds_task = SnowflakeOperator(
        task_id="refresh_refunds_external_table",
        sql="ALTER EXTERNAL TABLE RAW_DYNAMODB.RAW.REFUNDS REFRESH;",
        snowflake_conn_id="Snowflake_dev",
        dag=dag,
    )

    # Define SnowflakeOperator task for refreshing WALLET table
    wallet_task = SnowflakeOperator(
        task_id="refresh_wallet_external_table",
        sql="ALTER EXTERNAL TABLE RAW_DYNAMODB.RAW.WALLET REFRESH;",
        snowflake_conn_id="Snowflake_dev",
        dag=dag,
    )

    # Define SnowflakeOperator task for refreshing WALLET_TRANSACTIONS table
    wallet_transactions_task = SnowflakeOperator(
        task_id="refresh_wallet_transactions_external_table",
        sql="ALTER EXTERNAL TABLE RAW_DYNAMODB.RAW.WALLET_TRANSACTIONS REFRESH;",
        snowflake_conn_id="Snowflake_dev",
        dag=dag,
    )

    # Trigger the downstream dbt DAG once all refreshes have finished
    trigger_dbt_deps = TriggerDagRunOperator(
        task_id="trigger_dbt_deps", trigger_dag_id="test_DBT_deps", dag=dag
    )

    # Set up dependencies: USERS and FLIGHT_BOOKING feed the trigger directly,
    # the other five refreshes run as one sequential chain.
    [users_task, flight_booking_task] >> trigger_dbt_deps
    (
        payments_task
        >> cancellations_task
        >> refunds_task
        >> wallet_task
        >> wallet_transactions_task
        >> trigger_dbt_deps
    )
```
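The effect of the posted dependencies can be checked with a small simulation. This is a sketch under assumptions: every task is taken to cost a fixed 12 s of queue plus run time (a hypothetical number), `slots` stands in for the DAG's `concurrency=3`, and worker and scheduler limits are ignored. The function `simulate` and the shortened task names are illustrative, derived from the `task_id`s above. Because the chain from `payments_task` to `wallet_transactions_task` is sequential, it sets a lower bound on the run time no matter how many tasks are allowed to run at once.

```python
import heapq


def simulate(deps, duration, slots):
    """Return the total run time of one DAG run under a simple model.

    deps: mapping task -> set of upstream tasks.
    duration: assumed seconds per task (queue + run), the same for every task.
    slots: maximum number of tasks running at once.
    """
    remaining = {t: set(up) for t, up in deps.items()}  # unfinished upstreams
    running = []  # min-heap of (finish_time, task)
    done = set()
    now = 0.0
    while len(done) < len(deps):
        # Start every ready task (no unfinished upstreams, not yet started)
        # that fits into the free slots.
        started = {t for _, t in running}
        ready = sorted(
            t for t, up in remaining.items()
            if not up and t not in done and t not in started
        )
        for task in ready[: slots - len(running)]:
            heapq.heappush(running, (now + duration, task))
        # Jump to the next task completion and release its downstreams.
        now, finished = heapq.heappop(running)
        done.add(finished)
        for up in remaining.values():
            up.discard(finished)
    return now


# Dependencies as posted: users and flight_booking feed the trigger directly,
# the other five refreshes form one sequential chain.
posted = {
    "users": set(),
    "flight_booking": set(),
    "payments": set(),
    "cancellations": {"payments"},
    "refunds": {"cancellations"},
    "wallet": {"refunds"},
    "wallet_transactions": {"wallet"},
    "trigger_dbt_deps": {"users", "flight_booking", "wallet_transactions"},
}

# Alternative layout: all seven refreshes independent, trigger waits for all.
refreshes = ["users", "flight_booking", "payments", "cancellations",
             "refunds", "wallet", "wallet_transactions"]
parallel = {t: set() for t in refreshes}
parallel["trigger_dbt_deps"] = set(refreshes)

for slots in (3, 8):
    print(slots, simulate(posted, 12.0, slots), simulate(parallel, 12.0, slots))
# 3 72.0 48.0
# 8 72.0 24.0
```

With these assumed durations, the posted layout takes 72 s whether 3 or 8 tasks may run at once, while making all seven refreshes independent brings it to 48 s with 3 slots and 24 s with 8. That alternative only applies if the refreshes have no ordering requirement between them, which the question does not state.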
  • I tried removing the dependencies.
  • I tried changing the code.