Multiple invokable constructors in Storm Flux


I'm using Storm Flux (0.10.0) DSL to deploy the following topology (simplified to keep only the relevant parts of it):

---
name: "my-topology"

components:
  - id: "esConfig"
    className: "java.util.HashMap"
    configMethods:
      - name: "put"
        args:
          - "es.nodes"
          - "${es.nodes}"

bolts:
  - id: "es-bolt"
    className: "org.elasticsearch.storm.EsBolt"
    constructorArgs:
      - "myindex/docs"
      - ref: "esConfig"
    parallelism: 1

# ... other bolts, spouts and streams here

As you can see, one of the bolts I use is org.elasticsearch.storm.EsBolt, which has the following constructors:

public EsBolt(String target) { ... }
public EsBolt(String target, boolean writeAck) { ... }
public EsBolt(String target, Map configuration) { ... }

The last one should be called, because I pass a String and a Map in constructorArgs. However, when I deploy the topology I get the following exception, as if Flux could not infer the right constructor from the argument types (String, Map):

storm jar mytopology-1.0.0.jar org.apache.storm.flux.Flux --local mytopology.yml --filter mytopology.properties

...
Version: 0.10.0
Parsing file: mytopology.yml
958  [main] INFO  o.a.s.f.p.FluxParser - loading YAML from input stream...
965  [main] INFO  o.a.s.f.p.FluxParser - Performing property substitution.
969  [main] INFO  o.a.s.f.p.FluxParser - Not performing environment variable substitution.
1252 [main] INFO  o.a.s.f.FluxBuilder - Detected DSL topology...
1431 [main] WARN  o.a.s.f.FluxBuilder - Found multiple invokable constructors for class class org.elasticsearch.storm.EsBolt, given arguments [myindex/docs, {es.nodes=localhost}]. Using the last one found.
Exception in thread "main" java.lang.IllegalArgumentException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
    at org.apache.storm.flux.FluxBuilder.buildObject(FluxBuilder.java:291)
    at org.apache.storm.flux.FluxBuilder.buildBolts(FluxBuilder.java:372)
    at org.apache.storm.flux.FluxBuilder.buildTopology(FluxBuilder.java:88)
    at org.apache.storm.flux.Flux.runCli(Flux.java:153)
    at org.apache.storm.flux.Flux.main(Flux.java:98)

Any idea what could be happening? Storm Flux decides whether a constructor is compatible with the given arguments in its canInvokeWithArgs method.

These are the Flux debug logs, which show how FluxBuilder picks a constructor. Note that each two-argument constructor is reported as invokable after only its first parameter (String) has been compared:

Version: 0.10.0
Parsing file: mytopology.yml
559  [main] INFO  o.a.s.f.p.FluxParser - loading YAML from input stream...
566  [main] INFO  o.a.s.f.p.FluxParser - Performing property substitution.
569  [main] INFO  o.a.s.f.p.FluxParser - Not performing environment variable substitution.
804  [main] INFO  o.a.s.f.FluxBuilder - Detected DSL topology...
org.apache.logging.slf4j.Log4jLogger@3b69e7d1
1006 [main] DEBUG o.a.s.f.FluxBuilder - Found constructor arguments in definition: java.util.ArrayList
1006 [main] DEBUG o.a.s.f.FluxBuilder - Checking arguments for references.
1010 [main] DEBUG o.a.s.f.FluxBuilder - Target class: org.elasticsearch.storm.EsBolt
1011 [main] DEBUG o.a.s.f.FluxBuilder - found constructor with same number of args..
1011 [main] DEBUG o.a.s.f.FluxBuilder - Comparing parameter class class java.lang.String to object class class java.lang.String to see if assignment is possible.
1011 [main] DEBUG o.a.s.f.FluxBuilder - Yes, they are the same class.
1012 [main] DEBUG o.a.s.f.FluxBuilder - ** invokable --> true
1012 [main] DEBUG o.a.s.f.FluxBuilder - found constructor with same number of args..
1012 [main] DEBUG o.a.s.f.FluxBuilder - Comparing parameter class class java.lang.String to object class class java.lang.String to see if assignment is possible.
1012 [main] DEBUG o.a.s.f.FluxBuilder - Yes, they are the same class.
1012 [main] DEBUG o.a.s.f.FluxBuilder - ** invokable --> true
1012 [main] DEBUG o.a.s.f.FluxBuilder - Skipping constructor with wrong number of arguments.
1012 [main] WARN  o.a.s.f.FluxBuilder - Found multiple invokable constructors for class class org.elasticsearch.storm.EsBolt, given arguments [myindex/docs, {es.nodes=localhost}]. Using the last one found.
1014 [main] DEBUG o.a.s.f.FluxBuilder - Found something seemingly compatible, attempting invocation...
1044 [main] DEBUG o.a.s.f.FluxBuilder - Comparing parameter class class java.lang.String to object class class java.lang.String to see if assignment is possible.
1044 [main] DEBUG o.a.s.f.FluxBuilder - They are the same class.
1044 [main] DEBUG o.a.s.f.FluxBuilder - Comparing parameter class boolean to object class class java.util.HashMap to see if assignment is possible.
Exception in thread "main" java.lang.IllegalArgumentException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
    at org.apache.storm.flux.FluxBuilder.buildObject(FluxBuilder.java:291)
...
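
The last debug line before the exception (parameter class boolean compared with a java.util.HashMap) suggests that the (String, boolean) constructor was the one actually invoked. The following is a minimal sketch of that situation, not Flux code: FakeBolt is a hypothetical stand-in that only mimics the three EsBolt constructor signatures listed above, and the class and method names are made up for illustration. It shows that plain Java reflection throws IllegalArgumentException when a constructor is called with an argument that does not fit its parameter type.

```java
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;

public class WrongConstructorDemo {

    // Hypothetical stand-in with the same constructor shapes as EsBolt.
    public static class FakeBolt {
        public FakeBolt(String target) { }
        public FakeBolt(String target, boolean writeAck) { }
        public FakeBolt(String target, Map configuration) { }
    }

    // Calls the (String, boolean) constructor with (String, HashMap) arguments.
    // Returns "ok" if the call succeeds, or the name of the exception otherwise.
    public static String invokeBooleanConstructorWithMap() {
        Map<String, String> config = new HashMap<>();
        config.put("es.nodes", "localhost");
        try {
            Constructor<FakeBolt> ctor =
                    FakeBolt.class.getConstructor(String.class, boolean.class);
            ctor.newInstance("myindex/docs", config);
            return "ok";
        } catch (IllegalArgumentException e) {
            // A HashMap cannot be converted to a boolean parameter.
            return "IllegalArgumentException";
        } catch (ReflectiveOperationException e) {
            // Not expected here: the constructor exists and is public.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(invokeBooleanConstructorWithMap());
    }
}
```

This matches the stack trace above, where the exception comes out of Constructor.newInstance rather than from the bolt itself.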

There are 2 answers

Answer from Guido:

As a crappy workaround, I ended up extending EsBolt to expose only the constructors I need, so that Flux sees a single two-argument constructor and the collision goes away.

package com.mypackage;

import java.util.Map;
import org.elasticsearch.storm.EsBolt;

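// Thin subclass that re-exposes only the EsBolt constructors the topology needs.
// The (String, boolean) constructor is deliberately left out so that Flux finds
// a single two-argument constructor and cannot pick the wrong one.
public class EsBoltWrapper extends EsBolt {

    public EsBoltWrapper(String target) {
        super(target);
    }

    // Raw Map, matching the EsBolt(String, Map) signature.
    public EsBoltWrapper(String target, Map configuration) {
        super(target, configuration);
    }
}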

Now my topology looks like this:

bolts:
  - id: "es-bolt"
    className: "com.mypackage.EsBoltWrapper" # THE NEW CLASS
    constructorArgs:
      - "myindex/docs"
      - ref: "esConfig"
    parallelism: 1

It seems to be a bug in Storm Flux.
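
To illustrate what the debug log in the question appears to show, here is a rough sketch (not the actual Flux source) that compares two ways of matching constructors against an argument list. The lenient check accepts a constructor as soon as its first parameter fits, which is my reading of the "invokable --> true" lines in the log; the strict check requires every argument to fit. FakeBolt and all method names are hypothetical, and primitive/wrapper conversions are ignored for brevity.

```java
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConstructorMatchSketch {

    // Hypothetical stand-in with the same constructor shapes as EsBolt.
    public static class FakeBolt {
        public FakeBolt(String target) { }
        public FakeBolt(String target, boolean writeAck) { }
        public FakeBolt(String target, Map configuration) { }
    }

    // Lenient check (assumed behavior): same arity, and only the first argument is inspected.
    static boolean firstArgMatches(Class<?>[] paramTypes, List<Object> args) {
        return paramTypes.length == args.size()
                && paramTypes.length > 0
                && paramTypes[0].isInstance(args.get(0));
    }

    // Strict check: same arity, and every argument is an instance of its parameter type.
    // Class.isInstance is always false for primitive types, so this sketch never
    // matches a primitive parameter; a real check would also handle wrapper types.
    static boolean allArgsMatch(Class<?>[] paramTypes, List<Object> args) {
        if (paramTypes.length != args.size()) {
            return false;
        }
        for (int i = 0; i < paramTypes.length; i++) {
            if (!paramTypes[i].isInstance(args.get(i))) {
                return false;
            }
        }
        return true;
    }

    // Counts the public FakeBolt constructors accepted by the chosen check.
    public static int countMatches(List<Object> args, boolean strict) {
        int count = 0;
        for (Constructor<?> c : FakeBolt.class.getConstructors()) {
            Class<?>[] params = c.getParameterTypes();
            if (strict ? allArgsMatch(params, args) : firstArgMatches(params, args)) {
                count++;
            }
        }
        return count;
    }

    // The same argument list as in the question: a String and a HashMap.
    public static List<Object> sampleArgs() {
        Map<String, String> config = new HashMap<>();
        config.put("es.nodes", "localhost");
        List<Object> args = new ArrayList<>();
        args.add("myindex/docs");
        args.add(config);
        return args;
    }

    public static void main(String[] args) {
        System.out.println("lenient matches: " + countMatches(sampleArgs(), false));
        System.out.println("strict matches:  " + countMatches(sampleArgs(), true));
    }
}
```

With the lenient check both two-argument constructors are accepted, so which one gets used depends on iteration order; with the strict check only the (String, Map) constructor remains. The wrapper class above sidesteps the problem by leaving just one two-argument constructor to choose from.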

Answer from user2250246:

This issue was fixed on Nov 18, 2015.

See this commit: https://github.com/apache/storm/commit/69b9cf581fd977f6c28b3a78a116deddadc44014

The next version of Storm, which should include this fix, is expected to be released within a month.