Ordering steps in Azure ML pipelines with SDK v2 (Python)


I have an ML pipeline with two steps, i.e. Preprocessing and Monitoring.

In SDK v1, I used a StepSequence to order the steps:

step_sequence = StepSequence(steps=[StepPreprocessing, StepMonitoring])
pipeline = Pipeline(workspace=ws, steps=step_sequence)

but I am not sure how to do this with SDK v2. The steps in my pipeline don't share any inputs/outputs. Is it possible in v2 to specify an order for steps that don't share inputs/outputs?

Here is pseudocode for the pipeline:

@pipeline()
def sweden_backbook_monitoring(...arguments...):
        preprocess = command(
            name="sweden-backbook-preprocess",
            display_name="Preprocessing",
            description="Performs preprocessing",
            inputs={
                "input_data_contract": Input(type='string'),
                "task": Input(type='string'),
                "input_data_path": Input(type='string'),
                "input_file": Input(type='string'),
                "register_data_name": Input(type='string'),
                "register_config_name": Input(type='string'),
                "deploy_flag": Input(type='string'),
            },
            code=e.preprocess_sources_directory,
            command="python -m preprocess.main",
            environment=pipeline_env_preprocess,
        ).component(
            input_data_contract=input_data_contract,
            task=task,
            input_data_path=input_data_path,
            input_file=input_file,
            register_data_name=register_data_name,
            register_config_name=register_config_name,
            deploy_flag=deploy_flag
        )

        # Creating command component
        monitoring = command(
            name="sweden-backbook-monitoring",
            display_name="Monitoring",
            description="Performs passive monitoring",
            inputs={
                "project_name": Input(type='string'),
                "snapshot": Input(type='string'),
                "model_name": Input(type='string'),
                "ml_datafile": Input(type='string'),
                "preprocessing_config_file": Input(type='string'),
                "datastore": Input(type='string'),
                "param_time_index_column": Input(type='string'),
                "p_value": Input(type='integer'),
                "data_drift_dataset": Input(type='string'),
                "test_label": Input(type='string')
            },
            code=e.model_monitoring_sources_directory,
            command="python -m model_monitoring.monitoring_service",
            environment=pipeline_env_monitoring,
        ).component(
            project_name=project_name,
            snapshot=snapshot,
            model_name=model_name,
            ml_datafile=ml_datafile,
            preprocessing_config_file=preprocessing_config_file,
            datastore=datastore,
            param_time_index_column=param_time_index_column,
            p_value=p_value,
            data_drift_dataset=data_drift_dataset,
            test_label=test_label
        )
        monitoring.environment_variables = {
            'cluster_identity_name':e.cluster_identity_name,
            'data_drift_event_topic_name':e.data_drift_monitoring_event_topic_name,
        }


pipeline_job = sweden_backbook_monitoring(
            input_data_contract='data_contract.json',
            task='train',
            input_data_path='somePath',
            input_file='train_data.parquet',
            register_data_name='d.parquet',
            register_config_name='p.json',
            deploy_flag=args.deploy_flag,
            project_name=args.project_name,
            snapshot=args.snapshot,
            model_name=args.model_name,
            ml_datafile='t.parquet',
            preprocessing_config_file='p.' + args.snapshot + '.json',
            datastore=e.default_datastore,
            param_time_index_column='snapshot_date',
            p_value=0.01,
            data_drift_dataset='swe.csv',
            test_label=args.test_label
        )

In the above pipeline, both steps start running in parallel, which is not desired. Some of my pipelines have 3-4 steps. Is there a way to specify the execution order for these steps?
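The only workaround I have found so far is to fake a data dependency: give the first component a dummy output and wire it into the second component as an input, so the pipeline graph gets an edge between them. A rough sketch in the same pseudocode style as above (untested; the `order_dep` output/input names are my own invention):

```python
from azure.ai.ml import Input, Output

# Give preprocessing a dummy output it never actually writes to...
preprocess = command(
    ...,
    outputs={"order_dep": Output(type="uri_folder")},  # dummy output
)

# ...and give monitoring a matching dummy input.
monitoring = command(
    ...,
    inputs={..., "order_dep": Input(type="uri_folder")},
)

@pipeline()
def sweden_backbook_monitoring(...arguments...):
    pre = preprocess(...)
    # Binding the dummy output to the dummy input creates an edge in the
    # pipeline graph, so monitoring should only start after preprocessing
    # finishes.
    mon = monitoring(..., order_dep=pre.outputs.order_dep)
```

If v2 has a cleaner equivalent of StepSequence, I'd prefer that over this artificial dependency.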

Thanks!

