I updated the TFX from 0.22.0 to 1.8.0 and now I'm getting this error:
TypeError: Argument input_params should be a Channel of type <class 'tfx.types.standard_artifacts.ExternalArtifact'> (got test_string)
With 0.22.0 I used external_input function like this and worked:
from tfx.utils.dsl_utils import external_input
my_componet = MyCustomComponent(input_params=external_input("test_string"))
But after upgrade tfx not work anymore. My code is:
import os
import absl
from typing import Any, Dict, List, Optional, Text
from tfx.components.base.base_component import BaseComponent
from tfx.components.base.executor_spec import ExecutorClassSpec
from tfx.components.example_gen import utils
from tfx.types import Channel, channel_utils, standard_artifacts, Artifact
from tfx.types.component_spec import ComponentSpec, ExecutionParameter, ChannelParameter
from tfx.proto import example_gen_pb2
from tfx.components.base.base_executor import BaseExecutor
from tfx.orchestration import metadata, pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner
_pipeline_name = 'debug_custom_pipeline'
_taxi_root = os.path.join(os.environ['HOME'], 'debug')
_data_root = os.path.join(os.path.dirname(__file__), 'data', 'simple')
_tfx_root = os.path.join(_taxi_root, 'tfx')
_pipeline_root = os.path.join(_tfx_root, 'pipelines', _pipeline_name)
_metadata_path = os.path.join(_tfx_root, 'metadata', _pipeline_name, 'metadata.db')
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     metadata_path: str) -> pipeline.Pipeline:
  print("creating pipeline")
  my_componet = MyCustomComponent(input_params="test_string")
  return pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=[my_componet],
      enable_cache=True,
      metadata_connection_config=metadata.sqlite_metadata_connection_config(metadata_path))
class MyCustomExecutor(BaseExecutor):
    """Executor for ModelExporter component."""
    def Do(self, input_dict: Dict[Text, List[Artifact]],
         output_dict: Dict[Text, List[Artifact]],
         exec_properties: Dict[Text, Any]) -> None:
        
        print("Starting executor")
        print(input_dict)
        print(output_dict)
        print(exec_properties)
        print("Executor ran")
class MyCustomComponentSpec(ComponentSpec):
    PARAMETERS = {
        'input_config': ExecutionParameter(type=example_gen_pb2.Input),
        'output_config': ExecutionParameter(type=example_gen_pb2.Output)
    }
    INPUTS = {
        'input_params': ChannelParameter(type=standard_artifacts.ExternalArtifact)
    }
    OUTPUTS = {
        'output': ChannelParameter(type=standard_artifacts.String, optional=True)
    }
class MyCustomComponent(BaseComponent):
    SPEC_CLASS = MyCustomComponentSpec
    EXECUTOR_SPEC = ExecutorClassSpec(MyCustomExecutor)
    def __init__(
        self,
        input_params: Optional[Channel] = None
    ):
        input_config = utils.make_default_input_config()
        output_config = example_gen_pb2.Output()
        output = None
        spec = MyCustomComponentSpec(
            input_config=input_config,
            output_config=output_config,
            input_params=input_params,
            output=output
        )
        super(MyCustomComponent, self).__init__(spec=spec)
if __name__ == '__main__':
  absl.logging.set_verbosity(absl.logging.DEBUG)
  BeamDagRunner().run(
      _create_pipeline(
          pipeline_name=_pipeline_name,
          pipeline_root=_pipeline_root,
          data_root=_data_root,
          metadata_path=_metadata_path))
To reproduce the code, just run with python my_file_name.py
I'm using these versions:
tfx=1.8.0
tensorflow=2.8.0
apache-beam=2.50.0
python=3.9
I would like to set up my custom component with one string. Can anybody help me?