How to create a list of catalog entries and pass them in as inputs in a Kedro pipeline


I am trying to get a list of datasets from a catalog file I have created and pass them in as the inputs of a single node that combines them, and ultimately run the pipeline on Airflow using the kedro-airflow plugin.

This works on the CLI with kedro run but seems to fail in Airflow, and I am not sure why:

# my_pipeline/pipeline.py
from kedro.config import ConfigLoader
from kedro.pipeline import Pipeline, node

from .nodes import combine_data  # assuming combine_data lives in nodes.py

def create_pipeline(**kwargs):
    conf_loader = ConfigLoader(['conf/base'])
    conf_catalog = conf_loader.get('catalog-a*')

    # One node input per dataset defined in the matched catalog files
    datasets = list(conf_catalog.keys())
    return Pipeline([
        node(
            func=combine_data,
            inputs=datasets,
            outputs="combined_data",
            name="combined_data",
        ),
        ...  # other nodes
    ])

The error I am getting on Airflow looks something like this: Broken dag: Given configuration path either does not exist or is not a valid directory: 'conf/base'

This is definitely a Kedro config loader error, but I can't figure out why it only occurs when running the pipeline via Airflow. From what I have been reading, mixing in the Kedro code API like this is not advised. Is this the right way to pass in a list of datasets?
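For illustration, one way to make the lookup independent of the current working directory is to build an absolute config path from the pipeline file itself. A minimal sketch; the parents[2] hop count assumes a src/my_pipeline/pipeline.py layout and would need adjusting for another structure:

    from pathlib import Path
    from kedro.config import ConfigLoader

    # Hypothetical layout: <project_root>/src/my_pipeline/pipeline.py,
    # so two .parent hops up from this file reach the project root
    PROJECT_ROOT = Path(__file__).resolve().parents[2]
    conf_loader = ConfigLoader([str(PROJECT_ROOT / 'conf' / 'base')])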

Edit

My catalog is basically a list of SQL query datasets:

dataset_1:
  type: pandas.SQLQueryDataSet
  sql: select * from my_table where created_at >= '2018-12-21 16:00:00' and partner_id=1
  credentials: staging_sql

dataset_2:
  type: pandas.SQLQueryDataSet
  sql: select * from my_table where created_at >= '2019-08-15 11:55:00' and partner_id=2
  credentials: staging_sql
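
Since the node receives one positional argument per catalog entry, combine_data needs a matching signature. A minimal sketch (combine_data is not shown in the question, so this assumes each dataset loads as a pandas DataFrame):

    import pandas as pd

    def combine_data(*dataframes: pd.DataFrame) -> pd.DataFrame:
        # Each input dataset arrives as one positional DataFrame argument;
        # stack the per-partner extracts into a single frame
        return pd.concat(dataframes, ignore_index=True)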

1 Answer

mayurc:

I think it might fail because kedro run runs this from the project root directory, where it can find conf/base, but the create_pipeline function is under the my_pipeline directory, so the Kedro ConfigLoader cannot find that relative path. Another way I've done this in the past is to pass in catalog: DataCatalog, like this:

def create_pipeline(catalog: DataCatalog = None, **kwargs) -> Pipeline:

Then you can iterate over the catalog's entries, or do:

datasets = catalog.datasets
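
Putting that together, a sketch of the suggested approach (not tested against the asker's project; catalog.list() returns the names of all registered datasets, and the dataset_ prefix filter is a hypothetical way to select only the SQL extracts):

    from kedro.io import DataCatalog
    from kedro.pipeline import Pipeline, node

    from .nodes import combine_data  # hypothetical location of the asker's function

    def create_pipeline(catalog: DataCatalog = None, **kwargs) -> Pipeline:
        # catalog.list() returns the names of every registered dataset;
        # the startswith filter is a hypothetical way to keep only the extracts
        datasets = [name for name in catalog.list() if name.startswith("dataset_")]
        return Pipeline([
            node(
                func=combine_data,
                inputs=datasets,
                outputs="combined_data",
                name="combined_data",
            ),
        ])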