I'm trying to pass a partitioned TabularDataset into a ParallelRunStep as input, but the step fails with the error below and I can't figure out why ParallelRunStep doesn't recognize the dataset as partitioned:
UserInputNotPartitionedByGivenKeys: The input dataset 'partitioned_combined_scored_dataset_input' is not partitioned by 'model_name'.
Traceback (most recent call last):
  File "/tmp/48a0ec47-b89c-41ff-89f8-3482d2823d20/prs_prod/lib/python3.8/site-packages/azureml_sys/parallel_run/master_role_process.py", line 111, in run
    loop.run_until_complete(self.master_role.start())
  File "/tmp/48a0ec47-b89c-41ff-89f8-3482d2823d20/prs_prod/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/tmp/48a0ec47-b89c-41ff-89f8-3482d2823d20/prs_prod/lib/python3.8/site-packages/azureml_sys/parallel_run/master_role.py", line 303, in start
    await self.wait_for_first_task()
  File "/tmp/48a0ec47-b89c-41ff-89f8-3482d2823d20/prs_prod/lib/python3.8/site-packages/azureml_sys/parallel_run/master_role.py", line 288, in wait_for_first_task
    await self.wait_for_input_init()
  File "/tmp/48a0ec47-b89c-41ff-89f8-3482d2823d20/prs_prod/lib/python3.8/site-packages/azureml_sys/parallel_run/master_role.py", line 126, in wait_for_input_init
    self.future_create_tasks.result()
  File "/tmp/48a0ec47-b89c-41ff-89f8-3482d2823d20/prs_prod/lib/python3.8/site-packages/azureml_sys/parallel_run/task_producer.py", line 199, in create_tasks
    raise exc
  File "/tmp/48a0ec47-b89c-41ff-89f8-3482d2823d20/prs_prod/lib/python3.8/site-packages/azureml_sys/parallel_run/task_producer.py", line 190, in create_tasks
    for task_group in self.get_task_groups(provider.get_tasks()):
  File "/tmp/48a0ec47-b89c-41ff-89f8-3482d2823d20/prs_prod/lib/python3.8/site-packages/azureml_sys/parallel_run/task_producer.py", line 169, in get_task_groups
    for index, task in enumerate(tasks):
  File "/tmp/48a0ec47-b89c-41ff-89f8-3482d2823d20/prs_prod/lib/python3.8/site-packages/azureml_sys/parallel_run/partition_by_keys_provider.py", line 77, in get_tasks
    raise UserInputNotPartitionedByGivenKeys(message=message, compliant_message=compliant_message)
UserInputNotPartitionedByGivenKeys: The input dataset 'partitioned_combined_scored_dataset_input' is not partitioned by 'model_name'.
ParallelRunConfig & ParallelRunStep
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep

parallel_run_config = ParallelRunConfig(
    source_directory=source_dir_for_snapshot,
    entry_script="src/steps/script.py",
    partition_keys=["model_name"],
    error_threshold=10,
    allowed_failed_count=15,
    allowed_failed_percent=10,
    run_max_try=3,
    output_action="append_row",
    append_row_file_name="output_file.csv",
    environment=aml_run_config.environment,
    compute_target=aml_run_config.target,
    node_count=2
)

parallelrun_step = ParallelRunStep(
    name="Do Some Parallel Stuff on Each model_name",
    parallel_run_config=parallel_run_config,
    inputs=[partitioned_combined_scored_dataset],
    output=OutputFileDatasetConfig(name='output_dataset'),
    arguments=["--score-id", score_id_pipeline_param,
               "--partitioned-combined-dataset", partitioned_combined_scored_dataset],
    allow_reuse=True
)
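For reference, with partition_keys set, PRS hands the entry script one partition per mini-batch. A minimal sketch of what src/steps/script.py could look like (the init()/run() signatures are the PRS contract; the argument handling and scoring logic are hypothetical):

# src/steps/script.py -- minimal PRS entry script sketch.
# With partition_keys=["model_name"], each mini_batch is a pandas
# DataFrame holding exactly one model_name partition.
import argparse
import pandas as pd

def init():
    global args
    parser = argparse.ArgumentParser()
    parser.add_argument("--score-id", dest="score_id")
    parser.add_argument("--partitioned-combined-dataset", dest="dataset")
    # parse_known_args tolerates the extra arguments PRS injects itself.
    args, _ = parser.parse_known_args()

def run(mini_batch: pd.DataFrame) -> pd.DataFrame:
    # All rows in this mini_batch share the same model_name.
    model_name = mini_batch["model_name"].iloc[0]
    # ... do some parallel stuff for this model_name ...
    # With output_action="append_row", returned rows are appended to output_file.csv.
    return mini_batch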
partitioned_combined_scored_dataset
from azureml.data.dataset_consumption_config import DatasetConsumptionConfig
from azureml.pipeline.core import PipelineParameter

partitioned_combined_scored_dataset = DatasetConsumptionConfig(
    name="partitioned_combined_scored_dataset_input",
    dataset=PipelineParameter(
        name="partitioned_combined_dataset",
        default_value=future_partitioned_dataset)
)
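At submission time the PipelineParameter can then be overridden with the dataset built for that run. A sketch, where pipeline and runtime_partitioned_dataset are hypothetical names for the published pipeline object and the freshly built dataset:

from azureml.core import Experiment

experiment = Experiment(ws, "parallel-scoring")
run = experiment.submit(
    pipeline,
    pipeline_parameters={"partitioned_combined_dataset": runtime_partitioned_dataset},
)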
The partitioned dataset behind that PipelineParameter was previously created and uploaded using:
from azureml.core import Datastore
from azureml.data.datapath import DataPath
from azureml.data.dataset_factory import TabularDatasetFactory

partitioned_dataset = TabularDatasetFactory.from_parquet_files(
    path=(Datastore.get(ws, ), f"{partitioned_combined_datasets_dir}/*.parquet")
).partition_by(
    partition_keys=['model_name'],
    target=DataPath(Datastore(), 'some/path/to/partitioned')
)
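A quick sanity check at this point: the TabularDataset returned by partition_by() exposes its partition metadata via the partition_keys property, which should show the key PRS will look for:

# The object returned by partition_by() carries the partition metadata.
print(partitioned_dataset.partition_keys)  # expected: ['model_name']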
I know TabularDataset.partition_by() uploads to a GUID folder generated by AML, so the target some/path/to/partitioned actually ends up as some/path/to/partitioned/XXXXXXXX/{model_name}/part0.parquet, with one folder per model_name partition, according to the documentation. We accounted for this extra level when defining the TabularDataset that gets passed into the PipelineParameter for partitioned_combined_scored_dataset at runtime, using:

TabularDatasetFactory.from_parquet_files(path=(Datastore(), f"{partitioned_combined_dataset_dir}/*/*/*.parquet"))
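If the behavior described in the issue linked below is right, this is exactly where things go wrong: a TabularDataset rebuilt from the partitioned files with a plain glob carries no partition metadata, which you can confirm by inspecting it (sketch; datastore and path placeholders as above):

rebuilt = TabularDatasetFactory.from_parquet_files(
    path=(Datastore(), f"{partitioned_combined_dataset_dir}/*/*/*.parquet")
)
# The files are laid out by model_name on disk, but the dataset object
# doesn't know that, so the PRS partition check raises.
print(rebuilt.partition_keys)  # []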
It seems that ParallelRunStep checks the input dataset's own partition metadata, not just the folder layout: the dataset object you pass in must itself be partitioned by the same key you specify in the partition_keys argument of ParallelRunConfig, and a dataset re-created from the partitioned files with from_parquet_files doesn't carry that metadata. See:
https://github.com/Azure/MachineLearningNotebooks/issues/1648
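Based on that issue, a workaround that appears to work is to register the dataset returned by partition_by() and fetch that registered dataset when building the pipeline, so the partition metadata survives. A sketch, assuming a hypothetical registered name:

from azureml.core import Dataset

# Register the partition_by() result itself; its definition keeps the partition keys.
partitioned_dataset.register(ws, name="partitioned_combined_scored", create_new_version=True)

# Later, at pipeline-build time, fetch the registered dataset by name and
# use it as the PipelineParameter default instead of a rebuilt dataset.
future_partitioned_dataset = Dataset.get_by_name(ws, name="partitioned_combined_scored")
print(future_partitioned_dataset.partition_keys)  # ['model_name']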