using Java custom operator within java API in infosphere streams

231 views Asked by At

I have been searching for awhile now on how to use a customized Java operator withing infosphere streams Java API

What I need is after writing a customized operator like below ...

public class Test extends AbstractOperator {
private int i;
private int num;
@Override
public synchronized void initialize(OperatorContext context) throws Exception {
super.initialize(context);
i = 0; ....

I want to use it like the below ....

        Topology topology = new Topology("toplogy_test");
        TStream<String> inDataFileName = ...
//call the "Test" operator here
1

There are 1 answers

2
Samantha Chan On BEST ANSWER

You can call a Java operator / C++ operator from the topology API by doing the following:

  1. Add the toolkit of the Java operator:

    SPL.addToolkit(topology, new File("/home/streamsadmin/myTk"));

  2. Convert the incoming stream to SPL stream:

    StreamSchema rstringSchema = Type.Factory.getStreamSchema("tuple<rstring rstring_attr_name>");
    
    SPLStream splInputStream = SPLStreams.convertStream(inDataFileName, new BiFunction<String, OutputTuple, OutputTuple>(){
    
      @Override
      public OutputTuple apply(String input_string, OutputTuple output_rstring) {
        output_rstring.setString("rstring_attr_name", input_string);
        return output_rstring;
      }}, rstringSchema);
    
  3. Invoke the operator:

    SPLStream splOutputStream = SPL.invokeOperator("OperatorNamespace::YourOperatorName", splInputStream, rstringSchema, new HashMap());

You can find more information about this here:

http://ibmstreams.github.io/streamsx.documentation/docs/4.2/java/java-appapi-devguide/#integrating-spl-operators-with-the-java-application-api

On a side note, if you are thinking about using the Topology API to write the Streams topology, then it's easier to write a regular Java class, and invoke it directly from the Topology API.

For example:

MyJavaCode someObj = new MyJavaCode();

Topology topology = new Topology("MyTopology");
TStream<String> inDataFileName = ...
inDataFileName.transform(new Function<String, String>(){
        @Override
        public String apply(String word) {
            return someObj.someFunction(word);
        }
    });

The only requirement here is that your Java class needs to implement Serializable.