Creating and accessing Dagster resource for AWS Secrets Manager

186 views Asked by At

I am having some difficulty understanding how to use Dagster's library to access AWS Secrets Manager secrets as a resource and accessing the resource in assets

I have a file structure as follows

-proj
    -assets
        __init__.py
        asset_script.py
    -resources 
        __init__.py
        secrets_manager.py
    __init__.py

in secrets_manager.py I have the following

from dagster import ConfigurableResource
from dagster_aws.secretsmanager import SecretsManagerSecretsResource


class SecretsConnection(ConfigurableResource):
    sm = SecretsManagerSecretsResource(
        region_name="us-east-1",
        add_to_environment=False,
    )

    def get_secrets(self):
        return self.sm.fetch_secrets()

In my top level init (proj/init.py) i have the following:

from .assets import (
    asset_script
)
from .resources import (
    secrets_manager
)

assets = load_assets_from_package_module(
    package_module=asset_script,
    group_name="group_name",
)

defs = Definitions(
    assets=assets,
    resources={
        "secrets_manager": secrets_manager.SecretsConnection(),
    },
)

In my assets file I attempt to access these in the following way:

@asset(required_resource_keys={"secrets_manager"})
def fn_name(context: AssetExecutionContext):
    secrets = context.resources.secrets_manager.get_secrets()

I get an error that I can't make much sense of (related to the ConfigurableResource SecretsConnection):

Traceback (most recent call last):
  File "venv/lib/python3.8/site-packages/dagster/_grpc/server.py", line 295, in __init__
    self._loaded_repositories: Optional[LoadedRepositories] = LoadedRepositories(
  File "venv/lib/python3.8/site-packages/dagster/_grpc/server.py", line 139, in __init__
    loadable_targets = get_loadable_targets(
  File "venv/lib/python3.8/site-packages/dagster/_grpc/utils.py", line 50, in get_loadable_targets
    else loadable_targets_from_python_module(module_name, working_directory)
  File "/venv/lib/python3.8/site-packages/dagster/_core/workspace/autodiscovery.py", line 35, in loadable_targets_from_python_module
    module = load_python_module(
  File "venv/lib/python3.8/site-packages/dagster/_core/code_pointer.py", line 135, in load_python_module
    return importlib.import_module(module_name)
  File "/usr/local/Cellar/[email protected]/3.8.17_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 843, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "proj/__init__.py", line 10, in <module>
    from .resources import (
  File "proj/resources/secrets_manager.py", line 5, in <module>
    class SecretsConnection(ConfigurableResource):
  File "/venv/lib/python3.8/site-packages/dagster/_config/pythonic_config/typing_utils.py", line 134, in __new__
    return super().__new__(cls, name, bases, namespaces, **kwargs)
  File "/venv/lib/python3.8/site-packages/dagster/_config/pythonic_config/typing_utils.py", line 78, in __new__
    return super().__new__(cls, name, bases, namespaces, **kwargs)
  File "pydantic/main.py", line 221, in pydantic.main.ModelMetaclass.__new__
  File "pydantic/fields.py", line 506, in pydantic.fields.ModelField.infer
  File "pydantic/fields.py", line 436, in pydantic.fields.ModelField.__init__
  File "pydantic/fields.py", line 546, in pydantic.fields.ModelField.prepare
  File "pydantic/fields.py", line 570, in pydantic.fields.ModelField._set_default_and_type
  File "pydantic/fields.py", line 439, in pydantic.fields.ModelField.get_default
  File "pydantic/utils.py", line 693, in pydantic.utils.smart_deepcopy
  File "/usr/local/Cellar/[email protected]/3.8.17_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/Cellar/[email protected]/3.8.17_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/copy.py", line 270, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/Cellar/[email protected]/3.8.17_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/Cellar/[email protected]/3.8.17_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/copy.py", line 230, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/Cellar/[email protected]/3.8.17_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/Cellar/[email protected]/3.8.17_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/copy.py", line 230, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/Cellar/[email protected]/3.8.17_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/Cellar/[email protected]/3.8.17_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/copy.py", line 264, in _reconstruct
    y = func(*args)
  File "/usr/local/Cellar/[email protected]/3.8.17_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/copy.py", line 263, in <genexpr>
    args = (deepcopy(arg, memo) for arg in args)
  File "/usr/local/Cellar/[email protected]/3.8.17_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/Cellar/[email protected]/3.8.17_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/copy.py", line 270, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/Cellar/[email protected]/3.8.17_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/Cellar/[email protected]/3.8.17_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/copy.py", line 230, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/Cellar/[email protected]/3.8.17_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/Cellar/[email protected]/3.8.17_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/copy.py", line 270, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/Cellar/[email protected]/3.8.17_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/Cellar/[email protected]/3.8.17_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/copy.py", line 230, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/Cellar/[email protected]/3.8.17_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/Cellar/[email protected]/3.8.17_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/copy.py", line 264, in _reconstruct
    y = func(*args)
  File "/usr/local/Cellar/[email protected]/3.8.17_1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/copyreg.py", line 91, in __newobj__
    return cls.__new__(cls, *args)
TypeError: __new__() missing 1 required positional argument: 'fields'
/venv/lib/python3.8/site-packages/dagster/_core/workspace/context.py:616: UserWarning: Error loading repository location testing:TypeError: __new__() missing 1 required positional argument: 'fields'


This is followed by the warning:

warnings.warn(f"Error loading repository location {location_name}:{error.to_string()}")
1

There are 1 answers

0
zyd On

I think the issue is that Dagster / Pydantic expects all the fields in your ConfigurableResource to be Pydantic Fields or Model subclasses. I think by instantiating the SecretsManagerResource at definition time like you are it breaks the initialization of SecretsConnection. Try something like this:

class SecretsConnection(ConfigurableResource):
    _sm = PrivateAttr()
    def setup_for_execution(self):
        _sm = SecretsManagerSecretsResource(
        region_name="us-east-1",
        add_to_environment=False,
    )
    def get_secrets(self):
        return self._sm.fetch_secrets()