How do I define SM_CHANNEL_TRAIN for a training step in AWS SageMaker Pipelines?

I am using AWS SageMaker Pipelines, and I cannot seem to define the value of SM_CHANNEL_TRAIN correctly. Here is my pipeline definition.

import boto3
import sagemaker
import sagemaker.session
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.model import Model
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.inputs import CreateModelInput, TrainingInput

# set up session
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = 'DemoGroupName'

# set up parameters
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
input_data = ParameterString(
    name="InputData",
    default_value='s3://sagemaker-us-east-1-something/diabetes/Xy.csv',
)

# set data split step
split_step = ProcessingStep(
    name='SplitData',
    processor=SKLearnProcessor(
        framework_version='0.23-1',
        instance_type='ml.m5.xlarge',
        instance_count=processing_instance_count,
        base_job_name='sklearn-diabetes-split',
        role=role
    ),
    inputs=[
      ProcessingInput(
          source=input_data,
          destination='/opt/ml/processing/input'),  
    ],
    outputs=[
        ProcessingOutput(output_name='tr', source='/opt/ml/processing/tr'),
        ProcessingOutput(output_name='tu', source='/opt/ml/processing/tu'),
        ProcessingOutput(output_name='te', source='/opt/ml/processing/te')
    ],
    code='split.py'
)

# set up training step
training_step = TrainingStep(
    name="TrainRandomForest",
    estimator=sagemaker.sklearn.estimator.SKLearn(
        entry_point="train.py",
        role=role,
        instance_type="ml.m4.xlarge",
        framework_version="0.23-1",
    ),
    inputs={
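        # each key here becomes a named channel; my understanding is that the
        # training container exposes it as SM_CHANNEL_<KEY>, e.g. SM_CHANNEL_TRAIN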
        'train': TrainingInput(
            s3_data=split_step.properties.ProcessingOutputConfig.Outputs['tr'].S3Output.S3Uri,
            content_type='text/csv'
        ),
        'validation': TrainingInput(
            s3_data=split_step.properties.ProcessingOutputConfig.Outputs['tu'].S3Output.S3Uri,
            content_type='text/csv'
        )
    },
)

# set up modeling step
model_step = CreateModelStep(
    name="DiabetesCreateModel",
    model=Model(
        name="RandomForestModel",
        image_uri=training_step.properties.AlgorithmSpecification.TrainingImage,
        model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
        sagemaker_session=sagemaker_session,
        role=role
    ),
    inputs=CreateModelInput(
        instance_type="ml.m5.large",
        accelerator_type="ml.eia1.medium",
    )
)

# set up pipeline
pipeline = Pipeline(
    name="RandomForestPipeline",
    parameters=[
        processing_instance_count,
        input_data
    ],
    steps=[split_step, training_step, model_step]
)

# upload pipeline
pipeline.upsert(role_arn=role)

# execute pipeline
execution = pipeline.start()
execution.wait()
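
From what I understand, each key in the TrainingStep inputs dict becomes a named channel, and the training container exposes each channel's local mount point as an SM_CHANNEL_<NAME> environment variable. A minimal way to check that mapping inside the container would be something like this (illustrative snippet, not part of my pipeline):

import os

# print where each channel should be mounted inside the training container
for name in ('TRAIN', 'VALIDATION'):
    print(name, '->', os.environ.get('SM_CHANNEL_' + name))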

The Python script train.py looks like the following.

import argparse
import os
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
from joblib import dump

# Parse input arguments
parser = argparse.ArgumentParser()
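# SM_CHANNEL_TRAIN should hold the local path of the 'train' channel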
parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
args = parser.parse_args()

# Load the training data
print(f'args.train={args.train}')
train_data = pd.read_csv(args.train)

# Separate features and target variable
X = train_data.drop(columns=["y"])
y = train_data["y"]

# Create and train the Random Forest Regressor
rf_regressor = RandomForestRegressor(n_estimators=100, random_state=42)
rf_regressor.fit(X, y)

# Evaluate the model on the training data
y_pred = rf_regressor.predict(X)
mse = mean_squared_error(y, y_pred)
r2 = r2_score(y, y_pred)

# Print evaluation metrics
print(f"Mean Squared Error: {mse}")
print(f"R-squared: {r2}")

# Save the trained model
model_dir = os.environ.get("SM_MODEL_DIR")
dump(rf_regressor, os.path.join(model_dir, "model.joblib"))

When I execute the pipeline, I get the following error message.

Traceback (most recent call last):
  File "train.py", line 16, in 
    train_data = pd.read_csv(args.train)
  File "/miniconda3/lib/python3.7/site-packages/pandas/io/parsers.py", line 686, in read_csv
    return _read(filepath_or_buffer, kwds)
  File "/miniconda3/lib/python3.7/site-packages/pandas/io/parsers.py", line 452, in _read
    parser = TextFileReader(fp_or_buf, **kwds)
  File "/miniconda3/lib/python3.7/site-packages/pandas/io/parsers.py", line 946, in __init__
    self._make_engine(self.engine)
  File "/miniconda3/lib/python3.7/site-packages/pandas/io/parsers.py", line 1178, in _make_engine
    self._engine = CParserWrapper(self.f, **self.options)
  File "/miniconda3/lib/python3.7/site-packages/pandas/io/parsers.py", line 2008, in __init__
    self._reader = parsers.TextReader(src, **kwds)
  File "pandas/_libs/parsers.pyx", line 537, in pandas._libs.parsers.TextReader.__cinit__
  File "pandas/_libs/parsers.pyx", line 711, in pandas._libs.parsers.TextReader._get_header
  File "pandas/_libs/parsers.pyx", line 905, in pandas._libs.parsers.TextReader._tokenize_rows
  File "pandas/_libs/parsers.pyx", line 2042, in pandas._libs.parsers.raise_parser_error

The logs show these values for SM_TRAINING_ENV.

SM_TRAINING_ENV=
{
    "additional_framework_parameters": {},
    "channel_input_dirs": {
        "train": "/opt/ml/input/data/train",
        "validation": "/opt/ml/input/data/validation"
    },
    "current_host": "algo-1",
    "framework_module": "sagemaker_sklearn_container.training:main",
    "hosts": [
        "algo-1"
    ],
    "hyperparameters": {},
    "input_config_dir": "/opt/ml/input/config",
    "input_data_config": {
        "train": {
            "ContentType": "text/csv",
            "RecordWrapperType": "None",
            "S3DistributionType": "FullyReplicated",
            "TrainingInputMode": "File"
        },
        "validation": {
            "ContentType": "text/csv",
            "RecordWrapperType": "None",
            "S3DistributionType": "FullyReplicated",
            "TrainingInputMode": "File"
        }
    },
    "input_dir": "/opt/ml/input",
    "is_master": true,
    "job_name": "pipelines-2iuoezcd5ijo-TrainRandomForest-pzldTGMU6Q",
    "log_level": 20,
    "master_hostname": "algo-1",
    "model_dir": "/opt/ml/model",
    "module_dir": "s3://sagemaker-us-east-1-something/TrainRandomForest-827329dc760464c1ab24b8f994d9bf48/source/sourcedir.tar.gz",
    "module_name": "train",
    "network_interface_name": "eth0",
    "num_cpus": 4,
    "num_gpus": 0,
    "output_data_dir": "/opt/ml/output/data",
    "output_dir": "/opt/ml/output",
    "output_intermediate_dir": "/opt/ml/output/intermediate",
    "resource_config": {
        "current_group_name": "homogeneousCluster",
        "current_host": "algo-1",
        "current_instance_type": "ml.m4.xlarge",
        "hosts": [
            "algo-1"
        ],
        "instance_groups": [
            {
                "hosts": [
                    "algo-1"
                ],
                "instance_group_name": "homogeneousCluster",
                "instance_type": "ml.m4.xlarge"
            }
        ],
        "network_interface_name": "eth0"
    },
    "user_entry_point": "train.py"
}

The output from the following snippet does not show the CSV file that I expect. It prints args.train=/opt/ml/input/data/train, which matches the train entry in channel_input_dirs above, i.e. a directory rather than a file.

parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
print(f'args.train={args.train}')
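
Since that value is a directory rather than a file path, I assume listing its contents would show what SageMaker actually downloaded into the channel, e.g. with a debugging snippet like this:

import os

channel_dir = os.environ.get('SM_CHANNEL_TRAIN', '/opt/ml/input/data/train')
# show whatever files SageMaker copied into the train channel
print(os.listdir(channel_dir))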

From what I understand, setting inputs as follows should configure both the train and validation channels.

inputs={
        'train': TrainingInput(
            s3_data=split_step.properties.ProcessingOutputConfig.Outputs['tr'].S3Output.S3Uri,
            content_type='text/csv'
        ),
        'validation': TrainingInput(
            s3_data=split_step.properties.ProcessingOutputConfig.Outputs['tu'].S3Output.S3Uri,
            content_type='text/csv'
        )
    }
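
If the channel variable is indeed the mount directory and not the CSV file itself, I suppose train.py would have to read the file(s) inside that directory instead, roughly like this (a sketch, assuming the CSVs written by split.py land there):

import glob
import os

import pandas as pd

# read every CSV that was downloaded into the train channel directory
csv_files = glob.glob(os.path.join(args.train, '*.csv'))
train_data = pd.concat((pd.read_csv(f) for f in csv_files), ignore_index=True)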

Any idea what I am doing wrong?
