airflow-mwaa snowflake dbt run


I used the DAG below, along with the profile.yml and dbt_project.yml files shown after it. The other DAG tasks, which test the secret connection and print the secrets, succeed, and I can see the account name, username, password, and other values in the task run logs. However, the dbt run task fails with KeyError: 'Variable ACCOUNTNAME does not exist'. Has anyone configured dbt on Airflow MWAA successfully and run jobs this way? Any suggestions would be appreciated.

I pinned recent versions in the requirements.txt file.

requirements.txt:

jsonschema==4.21.1
dbt-core==1.7.6
dbt-snowflake==1.7.6

I created the env_var_plugin.py file inside plugins.zip.

env_var_plugin.py:

from airflow.plugins_manager import AirflowPlugin
import os

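# These values are read back via env_var() in dbt_project.yml so that dbt
# writes its logs, packages, and compiled output somewhere the MWAA worker can write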
os.environ["DBT_LOG_DIR"] = "/usr/local/airflow/tmp/logs"
os.environ["DBT_PACKAGE_DIR"] = "/usr/local/airflow/tmp/dbt_packages"
os.environ["DBT_TARGET_DIR"] = "/usr/local/airflow/tmp/target"

class EnvVarPlugin(AirflowPlugin):                
    name = 'env_var_plugin'
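
Since dbt resolves env_var() lookups in its own process, I am also considering having the plugin export the Snowflake credentials at worker startup. This is only a sketch, assuming boto3 is importable on the worker; "snowflake/secure" is a hypothetical secret name, and the secret keys mirror the ones in my DAG below:

# Sketch only: resolve the secret once at worker startup and export the
# values so dbt's env_var() lookups can see them. "snowflake/secure" is a
# hypothetical secret name standing in for my real one.
import json
import os

import boto3
from airflow.plugins_manager import AirflowPlugin

client = boto3.client("secretsmanager", region_name="ap-northeast-1")
secret = json.loads(
    client.get_secret_value(SecretId="snowflake/secure")["SecretString"]
)

os.environ["SNOWFLAKE_ACCOUNT"] = secret["ACCOUNTNAME"]
os.environ["SNOWFLAKE_USER"] = secret["USERNAME"]
os.environ["SNOWFLAKE_PASSWORD"] = secret["PASSWORD"]
os.environ["SNOWFLAKE_ROLE"] = secret["ROLE"]
os.environ["SNOWFLAKE_DATABASE"] = secret["DATABASE"]
os.environ["SNOWFLAKE_WAREHOUSE"] = secret["XSMALL_WAREHOUSE"]


class EnvVarPlugin(AirflowPlugin):
    name = 'env_var_plugin'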

I also updated dbt_project.yml with the path configs.

dbt_project.yml:

# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'sfdc_integration'
version: '1.0.0'
config-version: 2

# This setting configures which "profile" dbt uses for this project.
profile: 'sfdc_integration'

# Configuration for paths (these are top-level keys in dbt_project.yml)
log-path: "{{ env_var('DBT_LOG_DIR', 'logs') }}"  # directory which will store dbt logs
packages-install-path: "{{ env_var('DBT_PACKAGE_DIR', 'dbt_packages') }}"  # directory which will store dbt packages
target-path: "{{ env_var('DBT_TARGET_DIR', 'target') }}"  # directory which will store compiled SQL files

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets:         # directories to be removed by `dbt clean`
  - "target"


# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models

# In this example config, we tell dbt to build all models in the example/
# directory as views. These settings can be overridden in the individual model
# files using the `{{ config(...) }}` macro.
models:
  sfdc_integration:
    # The + prefix applies this config to all models under models/
    +materialized: view
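
To sanity-check path and profile resolution before a full run, I may also add a dbt debug step ahead of dbt run in the bash_command below; a sketch with my paths:

/usr/local/airflow/.local/bin/dbt debug --project-dir /tmp/sfdc_integration/ --profiles-dir /usr/local/airflow/dags/dbt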

DAG code:

import json
from airflow import DAG
from airflow.models.variable import Variable
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
import os

sm_secretId_name = Variable.get("SNOWFLAKE_SECURE_SECRETNAME")
region = Variable.get("REGION")


### set up Secrets Manager
hook = AwsBaseHook(client_type="secretsmanager", region_name="ap-northeast-1")
client = hook.get_conn()
response = client.get_secret_value(SecretId=sm_secretId_name)
database_parameter = json.loads(response["SecretString"])

def test_secrets_manager_connection():
    """
    Test connection to Secrets Manager and retrieve database parameters.
    """
    assert "ACCOUNTNAME" in database_parameter, "Missing ACCOUNTNAME in database parameters"
    assert "USERNAME" in database_parameter, "Missing USERNAME in database parameters"
    assert "PASSWORD" in database_parameter, "Missing PASSWORD in database parameters"
    assert "ROLE" in database_parameter, "Missing ROLE in database parameters"
    assert "DATABASE" in database_parameter, "Missing DATABASE in database parameters"    
    assert "XSMALL_WAREHOUSE" in database_parameter, "Missing XSMALL_WAREHOUSE in database parameters"

def print_secrets(**kwargs):
    ti = kwargs['ti']
    ti.xcom_push(key='secrets', value=database_parameter)
    print(f"Secrets retrieved: {database_parameter}")

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 8),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

DAG_ID = os.path.basename(__file__).replace(".py", "")
with DAG(
    dag_id=DAG_ID,
    schedule_interval=None,
    catchup=False,
    default_args=default_args,
) as dag:
    
    test_secrets_manager = PythonOperator(
        task_id="test_secrets_manager_connection",
        python_callable=test_secrets_manager_connection
    )

    print_secrets_task = PythonOperator(
        task_id="print_secrets",
        python_callable=print_secrets,
    )

    cli_command = BashOperator(
        task_id="sfdc_integration",
        bash_command="""
        cp -R /usr/local/airflow/dags/dbt/sfdc_integration /tmp
        cd /tmp/sfdc_integration
        export DBT_PROFILES_DIR='/usr/local/airflow/dags/dbt'
        export SNOWFLAKE_ACCOUNT={{ var.value.ACCOUNTNAME }}
        export SNOWFLAKE_USER={{ var.value.USERNAME }}
        export SNOWFLAKE_PASSWORD={{ var.value.PASSWORD }}
        export SNOWFLAKE_ROLE={{ var.value.ROLE }}
        export SNOWFLAKE_DATABASE={{ var.value.DATABASE }}
        export SNOWFLAKE_WAREHOUSE={{ var.value.XSMALL_WAREHOUSE }}
        export SNOWFLAKE_SCHEMA={{ var.value.SFDC_ANZ_ANALYTICS_SECURE }}
        export AWS_DEFAULT_REGION={{ var.value.REGION }}
        export DBT_LOG_DIR='/usr/local/airflow/tmp/logs'
        /usr/local/airflow/.local/bin/dbt run --project-dir /tmp/sfdc_integration/
        cat /tmp/sfdc_integration/logs/dbt.log
        """,
        env={
            'ACCOUNTNAME': "{{ secrets['ACCOUNTNAME'] }}",
            'USERNAME': "{{ secrets['USERNAME'] }}",
            'PASSWORD': "{{ secrets['PASSWORD'] }}",
            'ROLE': "{{ secrets['ROLE'] }}",
            'DATABASE': "{{ secrets['DATABASE'] }}",
            'XSMALL_WAREHOUSE': "{{ secrets['XSMALL_WAREHOUSE'] }}",
            'SCHEMA': "{{ var.value.SFDC_ANZ_ANALYTICS_SECURE }}",
            'REGION': "{{ var.value.REGION }}",
        }
    )

test_secrets_manager >> print_secrets_task >> cli_command
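
My suspicion: {{ var.value.ACCOUNTNAME }} is an Airflow Variable lookup, and the KeyError reads exactly like Airflow failing to find a Variable named ACCOUNTNAME, since the credentials live in Secrets Manager rather than Airflow Variables. A sketch of the task rewritten to pass the database_parameter values fetched at the top of the DAG through env instead (append_env needs Airflow 2.3+; unverified on my MWAA environment):

# Sketch: drop-in replacement for the sfdc_integration task inside the
# `with DAG` block. No {{ var.value.* }} templating is rendered; the values
# come from database_parameter, which was already fetched from Secrets Manager.
cli_command = BashOperator(
    task_id="sfdc_integration",
    bash_command=(
        "cp -R /usr/local/airflow/dags/dbt/sfdc_integration /tmp && "
        "cd /tmp/sfdc_integration && "
        "/usr/local/airflow/.local/bin/dbt run --project-dir /tmp/sfdc_integration/"
    ),
    env={
        "DBT_PROFILES_DIR": "/usr/local/airflow/dags/dbt",
        "SNOWFLAKE_ACCOUNT": database_parameter["ACCOUNTNAME"],
        "SNOWFLAKE_USER": database_parameter["USERNAME"],
        "SNOWFLAKE_PASSWORD": database_parameter["PASSWORD"],
        "SNOWFLAKE_ROLE": database_parameter["ROLE"],
        "SNOWFLAKE_DATABASE": database_parameter["DATABASE"],
        "SNOWFLAKE_WAREHOUSE": database_parameter["XSMALL_WAREHOUSE"],
        # Assumes this Airflow Variable exists, as the original task implies.
        "SNOWFLAKE_SCHEMA": Variable.get("SFDC_ANZ_ANALYTICS_SECURE"),
    },
    append_env=True,  # Airflow >= 2.3: keep the worker's PATH etc. alongside env
)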

profile.yml:

sfdc_integration:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: "{{ var.value.ACCOUNTNAME }}"
      user: "{{ var.value.USERNAME }}"
      password: "{{ var.value.PASSWORD }}"
      role: "{{ var.value.ROLE }}"
      database: "{{ var.value.DATABASE }}"
      warehouse: "{{ var.value.XSMALL_WAREHOUSE }}"
      schema: "{{ var.value.SFDC_ANZ_ANALYTICS_SECURE }}"
      threads: 1
      client_session_keep_alive: False
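
I also suspect part of the problem is that dbt renders profiles with its own Jinja context, where env_var() is available but Airflow's var.value is not, and that it looks for a file named profiles.yml under DBT_PROFILES_DIR. A sketch of the profile rewritten against the SNOWFLAKE_* variables exported in the bash_command above:

sfdc_integration:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      role: "{{ env_var('SNOWFLAKE_ROLE') }}"
      database: "{{ env_var('SNOWFLAKE_DATABASE') }}"
      warehouse: "{{ env_var('SNOWFLAKE_WAREHOUSE') }}"
      schema: "{{ env_var('SNOWFLAKE_SCHEMA') }}"
      threads: 1
      client_session_keep_alive: False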