I updated the TFX from 0.22.0 to 1.8.0 and now I'm getting this error:
TypeError: Argument input_params should be a Channel of type <class 'tfx.types.standard_artifacts.ExternalArtifact'> (got test_string)
With 0.22.0 I used external_input function like this and worked:
from tfx.utils.dsl_utils import external_input
my_componet = MyCustomComponent(input_params=external_input("test_string"))
But after upgrade tfx not work anymore. My code is:
import os
import absl
from typing import Any, Dict, List, Optional, Text
from tfx.components.base.base_component import BaseComponent
from tfx.components.base.executor_spec import ExecutorClassSpec
from tfx.components.example_gen import utils
from tfx.types import Channel, channel_utils, standard_artifacts, Artifact
from tfx.types.component_spec import ComponentSpec, ExecutionParameter, ChannelParameter
from tfx.proto import example_gen_pb2
from tfx.components.base.base_executor import BaseExecutor
from tfx.orchestration import metadata, pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner
_pipeline_name = 'debug_custom_pipeline'
_taxi_root = os.path.join(os.environ['HOME'], 'debug')
_data_root = os.path.join(os.path.dirname(__file__), 'data', 'simple')
_tfx_root = os.path.join(_taxi_root, 'tfx')
_pipeline_root = os.path.join(_tfx_root, 'pipelines', _pipeline_name)
_metadata_path = os.path.join(_tfx_root, 'metadata', _pipeline_name, 'metadata.db')
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
metadata_path: str) -> pipeline.Pipeline:
print("creating pipeline")
my_componet = MyCustomComponent(input_params="test_string")
return pipeline.Pipeline(
pipeline_name=pipeline_name,
pipeline_root=pipeline_root,
components=[my_componet],
enable_cache=True,
metadata_connection_config=metadata.sqlite_metadata_connection_config(metadata_path))
class MyCustomExecutor(BaseExecutor):
"""Executor for ModelExporter component."""
def Do(self, input_dict: Dict[Text, List[Artifact]],
output_dict: Dict[Text, List[Artifact]],
exec_properties: Dict[Text, Any]) -> None:
print("Starting executor")
print(input_dict)
print(output_dict)
print(exec_properties)
print("Executor ran")
class MyCustomComponentSpec(ComponentSpec):
PARAMETERS = {
'input_config': ExecutionParameter(type=example_gen_pb2.Input),
'output_config': ExecutionParameter(type=example_gen_pb2.Output)
}
INPUTS = {
'input_params': ChannelParameter(type=standard_artifacts.ExternalArtifact)
}
OUTPUTS = {
'output': ChannelParameter(type=standard_artifacts.String, optional=True)
}
class MyCustomComponent(BaseComponent):
SPEC_CLASS = MyCustomComponentSpec
EXECUTOR_SPEC = ExecutorClassSpec(MyCustomExecutor)
def __init__(
self,
input_params: Optional[Channel] = None
):
input_config = utils.make_default_input_config()
output_config = example_gen_pb2.Output()
output = None
spec = MyCustomComponentSpec(
input_config=input_config,
output_config=output_config,
input_params=input_params,
output=output
)
super(MyCustomComponent, self).__init__(spec=spec)
if __name__ == '__main__':
absl.logging.set_verbosity(absl.logging.DEBUG)
BeamDagRunner().run(
_create_pipeline(
pipeline_name=_pipeline_name,
pipeline_root=_pipeline_root,
data_root=_data_root,
metadata_path=_metadata_path))
To reproduce the code, just run with python my_file_name.py
I'm using these versions:
tfx=1.8.0
tensorflow=2.8.0
apache-beam=2.50.0
python=3.9
I would like to set up my custom component with one string. Can anybody help me?