I used the DAG below, together with the profiles.yml and dbt_project.yml files. The other DAG tasks, which test and print the secrets, run successfully, and I can see the account name, username, password, and other values in the task run logs. However, the dbt run task fails with KeyError: 'Variable ACCOUNTNAME does not exist'. Has anyone successfully configured dbt on MWAA and run jobs this way? Any suggestions would be appreciated.
I pinned recent versions in requirements.txt:
requirements.txt:
jsonschema==4.21.1
dbt-core==1.7.6
dbt-snowflake==1.7.6
I created an env_var_plugin.py file and packaged it into plugins.zip.
env_var_plugin.py:
from airflow.plugins_manager import AirflowPlugin
import os

# MWAA imports plugins into every Airflow process at startup, so these
# exports are visible to tasks at runtime. Point dbt's writable paths at
# a writable location on the worker.
os.environ["DBT_LOG_DIR"] = "/usr/local/airflow/tmp/logs"
os.environ["DBT_PACKAGE_DIR"] = "/usr/local/airflow/tmp/dbt_packages"
os.environ["DBT_TARGET_DIR"] = "/usr/local/airflow/tmp/target"

class EnvVarPlugin(AirflowPlugin):
    name = 'env_var_plugin'
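To sanity-check that the plugin's exports are actually visible at task runtime, a small task like this can be added inside the with DAG block (a minimal sketch; the task id and function name are illustrative, not part of my real DAG):

import os
from airflow.operators.python import PythonOperator

def show_dbt_env():
    # Print the paths exported by env_var_plugin.py. If the plugin loaded,
    # these should already be set in the worker's environment.
    for key in ("DBT_LOG_DIR", "DBT_PACKAGE_DIR", "DBT_TARGET_DIR"):
        print(key, "=", os.environ.get(key))

# Hypothetical verification task; wire it into the DAG below.
check_dbt_env = PythonOperator(
    task_id="check_dbt_env",
    python_callable=show_dbt_env,
)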
Also, I updated dbt_project.yml with the path configs.
dbt_project.yml:
# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models.
name: 'sfdc_integration'
version: '1.0.0'
config-version: 2

# This setting configures which "profile" dbt uses for this project.
profile: 'sfdc_integration'

# Path configuration. These are top-level keys in dbt_project.yml.
log-path: "{{ env_var('DBT_LOG_DIR', 'logs') }}" # directory which will store dbt logs
packages-install-path: "{{ env_var('DBT_PACKAGE_DIR', 'dbt_packages') }}" # directory which will store dbt packages
target-path: "{{ env_var('DBT_TARGET_DIR', 'target') }}" # directory which will store compiled SQL files

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets: # directories to be removed by `dbt clean`
  - "target"

# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models
# Configs prefixed with + apply to all files under the given path; they can be
# overridden in individual model files using the `{{ config(...) }}` macro.
models:
  sfdc_integration:
    # Applies to all files under models/
    +materialized: view
DAG code:
import json
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models.variable import Variable
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

sm_secretId_name = Variable.get("SNOWFLAKE_SECURE_SECRETNAME")
region = Variable.get("REGION")

### Set up Secrets Manager (note: this runs at DAG parse time)
hook = AwsBaseHook(client_type="secretsmanager")
client = hook.get_client_type(region_name=region)
response = client.get_secret_value(SecretId=sm_secretId_name)
database_parameter = json.loads(response["SecretString"])
def test_secrets_manager_connection():
    """Check that the secret retrieved from Secrets Manager contains the expected keys."""
    assert "ACCOUNTNAME" in database_parameter, "Missing ACCOUNTNAME in database parameters"
    assert "USERNAME" in database_parameter, "Missing USERNAME in database parameters"
    assert "PASSWORD" in database_parameter, "Missing PASSWORD in database parameters"
    assert "ROLE" in database_parameter, "Missing ROLE in database parameters"
    assert "DATABASE" in database_parameter, "Missing DATABASE in database parameters"
    assert "XSMALL_WAREHOUSE" in database_parameter, "Missing XSMALL_WAREHOUSE in database parameters"

def print_secrets(**kwargs):
    ti = kwargs['ti']
    ti.xcom_push(key='secrets', value=database_parameter)
    print(f"Secrets retrieved: {database_parameter}")
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 8),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
DAG_ID = os.path.basename(__file__).replace(".py", "")

with DAG(
    dag_id=DAG_ID,
    schedule_interval=None,
    catchup=False,
    default_args=default_args,
) as dag:
    test_secrets_manager = PythonOperator(
        task_id="test_secrets_manager_connection",
        python_callable=test_secrets_manager_connection,
    )
    print_secrets_task = PythonOperator(
        task_id="print_secrets",
        python_callable=print_secrets,
    )
    cli_command = BashOperator(
        task_id="sfdc_integration",
        bash_command="""
        cp -R /usr/local/airflow/dags/dbt/sfdc_integration /tmp
        cd /tmp/sfdc_integration
        export DBT_PROFILES_DIR='/usr/local/airflow/dags/dbt'
        export SNOWFLAKE_ACCOUNT={{ var.value.ACCOUNTNAME }}
        export SNOWFLAKE_USER={{ var.value.USERNAME }}
        export SNOWFLAKE_PASSWORD={{ var.value.PASSWORD }}
        export SNOWFLAKE_ROLE={{ var.value.ROLE }}
        export SNOWFLAKE_DATABASE={{ var.value.DATABASE }}
        export SNOWFLAKE_WAREHOUSE={{ var.value.XSMALL_WAREHOUSE }}
        export SNOWFLAKE_SCHEMA={{ var.value.SFDC_ANZ_ANALYTICS_SECURE }}
        export AWS_DEFAULT_REGION={{ var.value.REGION }}
        export DBT_LOG_DIR='/usr/local/airflow/tmp/logs'
        /usr/local/airflow/.local/bin/dbt run --project-dir /tmp/sfdc_integration/
        cat ${DBT_LOG_DIR}/dbt.log
        """,
        env={
            'ACCOUNTNAME': "{{ secrets['ACCOUNTNAME'] }}",
            'USERNAME': "{{ secrets['USERNAME'] }}",
            'PASSWORD': "{{ secrets['PASSWORD'] }}",
            'ROLE': "{{ secrets['ROLE'] }}",
            'DATABASE': "{{ secrets['DATABASE'] }}",
            'XSMALL_WAREHOUSE': "{{ secrets['XSMALL_WAREHOUSE'] }}",
            'SCHEMA': "{{ var.value.SFDC_ANZ_ANALYTICS_SECURE }}",
            'REGION': "{{ var.value.REGION }}",
        },
    )

    test_secrets_manager >> print_secrets_task >> cli_command
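My working theory on the KeyError: {{ var.value.ACCOUNTNAME }} in bash_command is an Airflow Variable lookup, and ACCOUNTNAME only exists inside the Secrets Manager secret, not as an Airflow Variable, so the template fails to render before the bash script even starts. A minimal sketch of what I believe that lookup boils down to:

from airflow.models.variable import Variable

# {{ var.value.ACCOUNTNAME }} resolves through Variable.get("ACCOUNTNAME"),
# which raises KeyError('Variable ACCOUNTNAME does not exist') when no
# Airflow Variable with that key exists in the metadata database.
try:
    Variable.get("ACCOUNTNAME")
except KeyError as err:
    print(err)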
profiles.yml (referenced via DBT_PROFILES_DIR above):
sfdc_integration:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: "{{ var.value.ACCOUNTNAME }}"
      user: "{{ var.value.USERNAME }}"
      password: "{{ var.value.PASSWORD }}"
      role: "{{ var.value.ROLE }}"
      database: "{{ var.value.DATABASE }}"
      warehouse: "{{ var.value.XSMALL_WAREHOUSE }}"
      schema: "{{ var.value.SFDC_ANZ_ANALYTICS_SECURE }}"
      threads: 1
      client_session_keep_alive: False
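If that theory is right, I assume the fix is to stop templating Airflow Variables and instead pass the parsed secret values straight into the task environment, roughly like this (a sketch only, not tested; it would replace the cli_command task inside the with DAG block):

# Sketch: pass the secrets loaded at parse time directly into the task
# environment, so the bash script and dbt can read them as env vars.
cli_command = BashOperator(
    task_id="sfdc_integration",
    bash_command="""
    cp -R /usr/local/airflow/dags/dbt/sfdc_integration /tmp
    cd /tmp/sfdc_integration
    export DBT_PROFILES_DIR='/usr/local/airflow/dags/dbt'
    /usr/local/airflow/.local/bin/dbt run --project-dir /tmp/sfdc_integration/
    """,
    env={
        **os.environ,  # keep PATH etc.; env= otherwise replaces the whole environment
        "SNOWFLAKE_ACCOUNT": database_parameter["ACCOUNTNAME"],
        "SNOWFLAKE_USER": database_parameter["USERNAME"],
        "SNOWFLAKE_PASSWORD": database_parameter["PASSWORD"],
        "SNOWFLAKE_ROLE": database_parameter["ROLE"],
        "SNOWFLAKE_DATABASE": database_parameter["DATABASE"],
        "SNOWFLAKE_WAREHOUSE": database_parameter["XSMALL_WAREHOUSE"],
    },
)

In that case I assume profiles.yml would also have to switch from {{ var.value.ACCOUNTNAME }} to dbt's own {{ env_var('SNOWFLAKE_ACCOUNT') }} syntax, since dbt does not render Airflow templates. Is that the right direction, or is there a cleaner pattern for running dbt on MWAA?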