I updated the TFX from 0.22.0 to 1.8.0 and now I'm getting this error:

TypeError: Argument input_params should be a Channel of type <class 'tfx.types.standard_artifacts.ExternalArtifact'> (got test_string)

With 0.22.0 I used external_input function like this and worked:

from tfx.utils.dsl_utils import external_input
my_componet = MyCustomComponent(input_params=external_input("test_string"))

But after upgrade tfx not work anymore. My code is:

import os
import absl
from typing import Any, Dict, List, Optional, Text
from tfx.components.base.base_component import BaseComponent
from tfx.components.base.executor_spec import ExecutorClassSpec
from tfx.components.example_gen import utils
from tfx.types import Channel, channel_utils, standard_artifacts, Artifact
from tfx.types.component_spec import ComponentSpec, ExecutionParameter, ChannelParameter
from tfx.proto import example_gen_pb2
from tfx.components.base.base_executor import BaseExecutor
from tfx.orchestration import metadata, pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

_pipeline_name = 'debug_custom_pipeline'
_taxi_root = os.path.join(os.environ['HOME'], 'debug')
_data_root = os.path.join(os.path.dirname(__file__), 'data', 'simple')
_tfx_root = os.path.join(_taxi_root, 'tfx')
_pipeline_root = os.path.join(_tfx_root, 'pipelines', _pipeline_name)
_metadata_path = os.path.join(_tfx_root, 'metadata', _pipeline_name, 'metadata.db')


def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     metadata_path: str) -> pipeline.Pipeline:
  print("creating pipeline")
  my_componet = MyCustomComponent(input_params="test_string")

  return pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=[my_componet],
      enable_cache=True,
      metadata_connection_config=metadata.sqlite_metadata_connection_config(metadata_path))


class MyCustomExecutor(BaseExecutor):
    """Executor for ModelExporter component."""

    def Do(self, input_dict: Dict[Text, List[Artifact]],
         output_dict: Dict[Text, List[Artifact]],
         exec_properties: Dict[Text, Any]) -> None:
        
        print("Starting executor")
        print(input_dict)
        print(output_dict)
        print(exec_properties)
        print("Executor ran")


class MyCustomComponentSpec(ComponentSpec):
    PARAMETERS = {
        'input_config': ExecutionParameter(type=example_gen_pb2.Input),
        'output_config': ExecutionParameter(type=example_gen_pb2.Output)
    }
    INPUTS = {
        'input_params': ChannelParameter(type=standard_artifacts.ExternalArtifact)
    }
    OUTPUTS = {
        'output': ChannelParameter(type=standard_artifacts.String, optional=True)
    }


class MyCustomComponent(BaseComponent):
    SPEC_CLASS = MyCustomComponentSpec
    EXECUTOR_SPEC = ExecutorClassSpec(MyCustomExecutor)

    def __init__(
        self,
        input_params: Optional[Channel] = None
    ):
        input_config = utils.make_default_input_config()
        output_config = example_gen_pb2.Output()
        output = None

        spec = MyCustomComponentSpec(
            input_config=input_config,
            output_config=output_config,
            input_params=input_params,
            output=output
        )

        super(MyCustomComponent, self).__init__(spec=spec)

if __name__ == '__main__':
  absl.logging.set_verbosity(absl.logging.DEBUG)
  BeamDagRunner().run(
      _create_pipeline(
          pipeline_name=_pipeline_name,
          pipeline_root=_pipeline_root,
          data_root=_data_root,
          metadata_path=_metadata_path))

To reproduce the code, just run with python my_file_name.py

I'm using these versions:

tfx=1.8.0
tensorflow=2.8.0
apache-beam=2.50.0
python=3.9

I would like to set up my custom component with one string. Can anybody help me?

0

There are 0 answers