Filter Dagster assets based on group_name


In my assets file, I have 3 assets, which are differentiated by their group_name:

assets/my_assets.py:

from dagster import AssetExecutionContext, asset


@asset(
    group_name="group1"
)
def group1_data(context: AssetExecutionContext):
    x = 1 + 3


@asset(
    group_name="group1"
)
def group1_full_data(context: AssetExecutionContext):
    x = 1 + 6


@asset(
    group_name="group2"
)
def group2_data(context: AssetExecutionContext):
    x = 1 + 1

assets/__init__.py:

all_assets = load_assets_from_modules([my_assets])

Now when I load them using load_assets_from_modules, I always end up loading all assets together. Is it not possible to load only those with a specific group name?

I want to run 2 different jobs for 2 different groups:

from dagster import define_asset_job, load_assets_from_modules
from ..assets import my_assets

my_group1_job = define_asset_job(
    name="group1_job",
    selection=load_assets_from_modules([my_assets]),
    description="Loads only group1 data",
)

There is 1 answer

JackColo_Ben4

Hi, I had the same need today for my project. I am fairly sure that none of the functions in dagster\_core\definitions\load_assets_from_modules.py support filtering by group directly.

Therefore, I came up with this simple solution.

from dagster import AssetsDefinition, define_asset_job, load_assets_from_modules
from ..assets import my_assets, your_assets, his_assets

all_assets = load_assets_from_modules([my_assets, your_assets, his_assets])

# Create a new reduced list of AssetsDefinition objects.
# group_names_by_key maps each asset key of a definition to its group name;
# a definition is kept if at least one of its keys belongs to the desired group.
just_group1_assets = [
    one_asset
    for one_asset in all_assets
    # Skip anything that is not an AssetsDefinition (e.g. source assets)
    if isinstance(one_asset, AssetsDefinition)
    and "group1" in one_asset.group_names_by_key.values()
]

my_group1_job = define_asset_job(
    name="group1_job",
    selection=just_group1_assets,
    description="Loads only group1 data",
)