In GCP I run a Kubeflow ML pipeline with TFX components, using a custom service account. The pipeline reads data from BigQuery and has the following components: components = [example_gen, statistics_gen, schema_gen, transform, trainer, pusher]
The main problem is that the pipeline fails at the last step, the "trainer", even though I tested each component in the interactive context and they all ran fine. The secondary problem, which complicates debugging, is that I cannot display the log messages produced by my trainer module code in the main GCP pipeline dashboard (in the logs area). I can only view logs in Logs Explorer, and even there only the framework messages appear, not the messages from my Python trainer module. Among those messages I see only one type of error (below). I identified that this operation runs under the default service account (not the custom one), which may be missing some required permissions. I tried to set the trainer component to use the custom SA, but it does not use it. How can I set the custom SA properly? (A sketch of how I submit the compiled pipeline, where I assume the SA should be passed, is at the end of the post.)
com.google.cloud.ai.platform.common.errors.AiPlatformException: code=ALREADY_EXISTS, message=Schema with name projects/2385..../locations/..../metadataStores/default/metadataSchemas/f14caf7e-4234-473d-ac31-.... and version 0.0.1 already exists., cause=null
The code for the trainer module file:
%%writefile {_trainer_module_file}
import tensorflow as tf
import tensorflow_model_analysis as tfma
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils
from tfx_bsl.tfxio import dataset_options
from tfx.components.trainer.executor import Executor
from tfx.dsl.components.base import executor_spec
from typing import List
from absl import logging
# from tensorflow import keras
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2
logging.set_verbosity(logging.INFO)  # absl.logging has no basicConfig; set verbosity instead
logging.info("This is an informational log message.")
logging.warning("This is a warning log message.")
logging.error("This is an error log message.")
# Categorical features are assumed to each have a maximum value in the dataset.
_MAX_CATEGORICAL_FEATURE_VALUES = [3]
_CATEGORICAL_FEATURE_KEYS = [
    'F_1'
]
_DENSE_FLOAT_FEATURE_KEYS = ['F_2', 'F_3']
# Number of buckets used by tf.transform for encoding each feature.
_FEATURE_BUCKET_COUNT = 5
_BUCKET_FEATURE_KEYS = [
    'F_2'
]
# Number of vocabulary terms used for encoding VOCAB_FEATURES by tf.transform
_VOCAB_SIZE = 1000
# Count of out-of-vocab buckets in which unrecognized VOCAB_FEATURES are hashed.
_OOV_SIZE = 10
_VOCAB_FEATURE_KEYS = [
    'F_1',
]
# Keys
_LABEL_KEY = 'F_LABEL'
print('_LABEL_KEY',_LABEL_KEY)
###########
# Tf.Transform considers these features as "raw"
def _get_raw_feature_spec(schema):
  return schema_utils.schema_as_feature_spec(schema).feature_spec
def _build_estimator(config, hidden_units=None, warm_start_from=None):
  """Build an estimator for the classification task.

  Args:
    config: tf.estimator.RunConfig defining the runtime environment for the
      estimator (including model_dir).
    hidden_units: [int], the layer sizes of the DNN (input layer first).
    warm_start_from: Optional directory to warm start from.

  Returns:
    The estimator that will be used for training and eval.
  """
  real_valued_columns = [
      tf.feature_column.numeric_column(key, shape=())
      for key in _DENSE_FLOAT_FEATURE_KEYS
  ]
  categorical_columns = [
      tf.feature_column.categorical_column_with_identity(  # pylint: disable=g-complex-comprehension
          key,
          num_buckets=num_buckets,
          default_value=0) for key, num_buckets in zip(
              _CATEGORICAL_FEATURE_KEYS, _MAX_CATEGORICAL_FEATURE_VALUES)
  ]
  return tf.estimator.DNNLinearCombinedClassifier(
      config=config,
      linear_feature_columns=categorical_columns,
      dnn_feature_columns=real_valued_columns,
      dnn_hidden_units=hidden_units or [100, 70, 50, 25],
      warm_start_from=warm_start_from)
def _example_serving_receiver_fn(tf_transform_graph, schema):
  """Build the serving inputs.

  Args:
    tf_transform_graph: A TFTransformOutput.
    schema: the schema of the input data.

  Returns:
    Tensorflow graph which parses examples, applying tf-transform to them.
  """
  raw_feature_spec = _get_raw_feature_spec(schema)
  raw_feature_spec.pop(_LABEL_KEY)

  raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
      raw_feature_spec, default_batch_size=None)
  serving_input_receiver = raw_input_fn()

  transformed_features = tf_transform_graph.transform_raw_features(
      serving_input_receiver.features)

  return tf.estimator.export.ServingInputReceiver(
      transformed_features, serving_input_receiver.receiver_tensors)
def _eval_input_receiver_fn(tf_transform_graph, schema):
  """Build everything needed for the tf-model-analysis to run the model.

  Args:
    tf_transform_graph: A TFTransformOutput.
    schema: the schema of the input data.

  Returns:
    EvalInputReceiver function, which contains:
      - Tensorflow graph which parses raw untransformed features, applies the
        tf-transform preprocessing operators.
      - Set of raw, untransformed features.
      - Label against which predictions will be compared.
  """
  # Notice that the inputs are raw features, not transformed features here.
  raw_feature_spec = _get_raw_feature_spec(schema)

  serialized_tf_example = tf.compat.v1.placeholder(
      dtype=tf.string, shape=[None], name='input_example_tensor')

  # Add a parse_example operator to the tensorflow graph, which will parse
  # raw, untransformed, tf examples.
  features = tf.io.parse_example(serialized_tf_example, raw_feature_spec)

  # Now that we have our raw examples, process them through the tf-transform
  # function computed during the preprocessing step.
  transformed_features = tf_transform_graph.transform_raw_features(
      features)

  # The key name MUST be 'examples'.
  receiver_tensors = {'examples': serialized_tf_example}

  # NOTE: Model is driven by transformed features (since training works on the
  # materialized output of TFT), but slicing will happen on raw features.
  features.update(transformed_features)

  return tfma.export.EvalInputReceiver(
      features=features,
      receiver_tensors=receiver_tensors,
      labels=transformed_features[_LABEL_KEY])
def _input_fn(file_pattern, data_accessor, tf_transform_output, batch_size=200):
  """Generates features and label for tuning/training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    tf_transform_output: A TFTransformOutput.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch.

  Returns:
    A dataset that contains (features, indices) tuple where features is a
    dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      dataset_options.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      tf_transform_output.transformed_metadata.schema)
# TFX will call this function
def trainer_fn(trainer_fn_args, schema):
  """Build the estimator using the high level API.

  Args:
    trainer_fn_args: Holds args used to train the model as name/value pairs.
    schema: Holds the schema of the training examples.

  Returns:
    A dict of the following:
      - estimator: The estimator that will be used for training and eval.
      - train_spec: Spec for training.
      - eval_spec: Spec for eval.
      - eval_input_receiver_fn: Input function for eval.
  """
  # Number of nodes in the first layer of the DNN.
  first_dnn_layer_size = 100
  num_dnn_layers = 4
  dnn_decay_factor = 0.7

  train_batch_size = 40
  eval_batch_size = 40

  tf_transform_graph = tft.TFTransformOutput(trainer_fn_args.transform_output)

  train_input_fn = lambda: _input_fn(  # pylint: disable=g-long-lambda
      trainer_fn_args.train_files,
      trainer_fn_args.data_accessor,
      tf_transform_graph,
      batch_size=train_batch_size)

  eval_input_fn = lambda: _input_fn(  # pylint: disable=g-long-lambda
      trainer_fn_args.eval_files,
      trainer_fn_args.data_accessor,
      tf_transform_graph,
      batch_size=eval_batch_size)

  train_spec = tf.estimator.TrainSpec(
      train_input_fn,
      max_steps=trainer_fn_args.train_steps)

  serving_receiver_fn = lambda: _example_serving_receiver_fn(  # pylint: disable=g-long-lambda
      tf_transform_graph, schema)

  exporter = tf.estimator.FinalExporter('etl-trainer', serving_receiver_fn)
  eval_spec = tf.estimator.EvalSpec(
      eval_input_fn,
      steps=trainer_fn_args.eval_steps,
      exporters=[exporter],
      name='etl-eval')

  run_config = tf.estimator.RunConfig(
      save_checkpoints_steps=999, keep_checkpoint_max=1)
  run_config = run_config.replace(model_dir=trainer_fn_args.serving_model_dir)

  estimator = _build_estimator(
      # Construct layer sizes with exponential decay.
      hidden_units=[
          max(2, int(first_dnn_layer_size * dnn_decay_factor**i))
          for i in range(num_dnn_layers)
      ],
      config=run_config,
      warm_start_from=trainer_fn_args.base_model)

  # Create an input receiver for TFMA processing.
  receiver_fn = lambda: _eval_input_receiver_fn(  # pylint: disable=g-long-lambda
      tf_transform_graph, schema)

  return {
      'estimator': estimator,
      'train_spec': train_spec,
      'eval_spec': eval_spec,
      'eval_input_receiver_fn': receiver_fn
  }
The TFX trainer component:
# Uses user-provided Python function that trains a model.
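# Note: Executor here is the estimator-based trainer executor, so TFX calls the
# module's trainer_fn (defined above) rather than run_fn.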
trainer = tfx.components.Trainer(
    module_file=trainer_module_file,
    custom_executor_spec=executor_spec.ExecutorClassSpec(Executor),
    examples=transform.outputs['transformed_examples'],
    schema=schema_gen.outputs['schema'],
    transform_graph=transform.outputs['transform_graph'],
    train_args=tfx.proto.TrainArgs(num_steps=100),
    eval_args=tfx.proto.EvalArgs(num_steps=10))
The pipeline:
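# KubeflowV2DagRunner.run() only compiles the pipeline into PIPELINE_DEFINITION_FILE
# (a JSON pipeline spec for Vertex AI Pipelines); the job itself is created in a
# separate submission step.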
runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)

_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        query=QUERY,
        trainer_module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        transform_module_file=os.path.join(MODULE_ROOT, _etl_transform_module_file),
        serving_model_dir=SERVING_MODEL_DIR,
        beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS))
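For reference, I submit the compiled definition with the standard google-cloud-aiplatform client, roughly as in the sketch below (CUSTOM_SA, PROJECT_ID and REGION are placeholders for my real values). As far as I understand, the service_account argument of submit() is the only place where the custom SA gets attached, and I am not sure whether it actually reaches the trainer step:

from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION)

job = aiplatform.PipelineJob(
    display_name=PIPELINE_NAME,
    template_path=PIPELINE_DEFINITION_FILE,
    pipeline_root=PIPELINE_ROOT,
    enable_caching=False)

# service_account is where I expect the custom SA to be picked up;
# without it the tasks seem to fall back to the default compute SA.
job.submit(service_account=CUSTOM_SA)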