Pass a partitioned TabularDataset into ParallelRunStep with azureml sdkv1


I'm trying to pass a partitioned TabularDataset into a ParallelRunStep as input, but the step fails with the error below, and I can't figure out why ParallelRunStep doesn't recognize the dataset as partitioned:

UserInputNotPartitionedByGivenKeys: The input dataset 'partitioned_combined_scored_dataset_input' is not partitioned by 'model_name'.

Traceback (most recent call last):
  File "/tmp/48a0ec47-b89c-41ff-89f8-3482d2823d20/prs_prod/lib/python3.8/site-packages/azureml_sys/parallel_run/master_role_process.py", line 111, in run
    loop.run_until_complete(self.master_role.start())
  File "/tmp/48a0ec47-b89c-41ff-89f8-3482d2823d20/prs_prod/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/tmp/48a0ec47-b89c-41ff-89f8-3482d2823d20/prs_prod/lib/python3.8/site-packages/azureml_sys/parallel_run/master_role.py", line 303, in start
    await self.wait_for_first_task()
  File "/tmp/48a0ec47-b89c-41ff-89f8-3482d2823d20/prs_prod/lib/python3.8/site-packages/azureml_sys/parallel_run/master_role.py", line 288, in wait_for_first_task
    await self.wait_for_input_init()
  File "/tmp/48a0ec47-b89c-41ff-89f8-3482d2823d20/prs_prod/lib/python3.8/site-packages/azureml_sys/parallel_run/master_role.py", line 126, in wait_for_input_init
    self.future_create_tasks.result()
  File "/tmp/48a0ec47-b89c-41ff-89f8-3482d2823d20/prs_prod/lib/python3.8/site-packages/azureml_sys/parallel_run/task_producer.py", line 199, in create_tasks
    raise exc
  File "/tmp/48a0ec47-b89c-41ff-89f8-3482d2823d20/prs_prod/lib/python3.8/site-packages/azureml_sys/parallel_run/task_producer.py", line 190, in create_tasks
    for task_group in self.get_task_groups(provider.get_tasks()):
  File "/tmp/48a0ec47-b89c-41ff-89f8-3482d2823d20/prs_prod/lib/python3.8/site-packages/azureml_sys/parallel_run/task_producer.py", line 169, in get_task_groups
    for index, task in enumerate(tasks):
  File "/tmp/48a0ec47-b89c-41ff-89f8-3482d2823d20/prs_prod/lib/python3.8/site-packages/azureml_sys/parallel_run/partition_by_keys_provider.py", line 77, in get_tasks
    raise UserInputNotPartitionedByGivenKeys(message=message, compliant_message=compliant_message)
UserInputNotPartitionedByGivenKeys: The input dataset 'partitioned_combined_scored_dataset_input' is not partitioned by 'model_name'.

ParallelRunConfig & ParallelRunStep

    parallel_run_config = ParallelRunConfig(
        source_directory=source_dir_for_snapshot,
        entry_script="src/steps/script.py",
        partition_keys=["model_name"], 
        error_threshold=10,        
        allowed_failed_count=15,
        allowed_failed_percent=10,
        run_max_try=3,
        output_action="append_row",
        append_row_file_name="output_file.csv",
        environment=aml_run_config.environment,
        compute_target=aml_run_config.target,
        node_count=2
    )

    parallelrun_step = ParallelRunStep(
        name="Do Some Parallel Stuff on Each model_name",
        parallel_run_config=parallel_run_config,
        inputs=[partitioned_combined_scored_dataset],
        output=OutputFileDatasetConfig(name='output_dataset'),
        arguments=["--score-id", score_id_pipeline_param,
                   "--partitioned-combined-dataset", partitioned_combined_scored_dataset],
        allow_reuse=True
    )
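
For context, here is a minimal sketch of what the entry script could look like (the real src/steps/script.py is not shown in the question, so the body is hypothetical). With a TabularDataset input and partition_keys, PRS calls run() once per mini-batch, and each mini-batch holds the rows of a single model_name partition as a pandas DataFrame:

    # Entry-script sketch; init()/run() are the ParallelRunStep contract.
    import argparse
    import pandas as pd

    def init():
        global args
        parser = argparse.ArgumentParser()
        parser.add_argument("--score-id", dest="score_id")
        parser.add_argument("--partitioned-combined-dataset", dest="dataset_id")
        # PRS appends its own arguments, so ignore unknown ones.
        args, _ = parser.parse_known_args()

    def run(mini_batch: pd.DataFrame):
        # Every row in this mini-batch shares one model_name value.
        model_name = mini_batch["model_name"].iloc[0]
        # ... per-model work would go here ...
        return [f"{model_name},{len(mini_batch)}"]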

partitioned_combined_scored_dataset

    partitioned_combined_scored_dataset = DatasetConsumptionConfig(
        name="partitioned_combined_scored_dataset_input",
        dataset=PipelineParameter(
            name="partitioned_combined_dataset",
            default_value=future_partitioned_dataset)
    )
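
For reference, when the dataset does not need to be swappable at submission time, the same named input can be produced directly from the dataset object; a minimal sketch, assuming future_partitioned_dataset is a TabularDataset:

    # Equivalent named input without a PipelineParameter (sketch):
    partitioned_combined_scored_dataset = future_partitioned_dataset.as_named_input(
        "partitioned_combined_scored_dataset_input"
    )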

The underlying partitioned dataset was created and uploaded earlier with:

    partitioned_dataset = TabularDatasetFactory.from_parquet_files(
            path=(Datastore.get(ws, ), f"{partitioned_combined_datasets_dir}/*.parquet")
        )\
        .partition_by(
            partition_keys=['model_name'],
            target=DataPath(Datastore(), 'some/path/to/partitioned')
        )

I know that TabularDataset.partition_by() uploads to a GUID folder generated by AML, so that some/path/to/partitioned actually becomes some/path/to/partitioned/XXXXXXXX/{model_name}/part0.parquet for each model_name partition, according to the documentation. We accounted for that extra folder level when defining the TabularDataset passed into the PipelineParameter for partitioned_combined_scored_dataset at runtime, using:

    TabularDatasetFactory.from_parquet_files(
        path=(Datastore(), f"{partitioned_combined_dataset_dir}/*/*/*.parquet")
    )
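
A quick way to see what PRS will be handed is to inspect the dataset's partition_keys property (a sketch; the traceback above comes from a partition-keys provider, which suggests PRS validates this metadata rather than the folder layout):

    # Sanity check (sketch): a dataset rebuilt from the files with a plain
    # glob carries no partition metadata, even though the folder layout is
    # partitioned, so partition_keys can come back empty.
    reconstructed = TabularDatasetFactory.from_parquet_files(
        path=(Datastore(), f"{partitioned_combined_dataset_dir}/*/*/*.parquet")
    )
    print(reconstructed.partition_keys)  # [] would explain the PRS error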

1 Answer

Answered by Ramprasad:

It seems that you need to make sure your input dataset is itself partitioned by the same key that you specify in the partition_keys argument of ParallelRunConfig.

https://github.com/Azure/MachineLearningNotebooks/issues/1648
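
A minimal sketch of one way to satisfy that, assuming a workspace ws and a hypothetical registration name: register the dataset object returned by partition_by() (which carries the partition metadata) and retrieve it by name when defining the pipeline, rather than rebuilding it from the files with from_parquet_files():

    from azureml.core import Dataset

    # Register the partitioned dataset object itself so the 'model_name'
    # partition metadata travels with it. "combined_scored_partitioned" is
    # a hypothetical name used for illustration.
    partitioned_dataset.register(
        workspace=ws,
        name="combined_scored_partitioned",
        create_new_version=True,
    )

    # Later, at pipeline-definition time:
    future_partitioned_dataset = Dataset.get_by_name(ws, "combined_scored_partitioned")
    print(future_partitioned_dataset.partition_keys)  # expected: ['model_name']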