Dependency Injection in Apache Storm topology


A little background: I am building a topology with Apache Storm and wanted to use dependency injection in it, but I was not sure how DI would behave once the topology is deployed to a cluster. While researching whether DI is a good fit for Storm topologies, I came across threads about Apache Spark saying that serialization would be a problem, and I saw some responses about Apache Storm along the same lines. So I decided to write a sample topology with Google Guice to see what happens.

I wrote a sample topology with two bolts and used Google Guice to inject the dependencies. The first bolt is configured to receive a tick tuple every 10 seconds. On each tick it creates a message, prints it to the log, and calls an injected class that does the same. The message is then emitted to the second bolt, which repeats the same printing logic.

First Bolt

public class FirstBolt extends BaseRichBolt {
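    // SLF4J logger, assumed (not declared in the original snippet)
    private static final Logger log = LoggerFactory.getLogger(FirstBolt.class);
    private OutputCollector collector;
    // static: shared by every FirstBolt task in the same worker JVM
    private static int count = 0;
    private FirstInjectClass firstInjectClass;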

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        collector = outputCollector;
        Injector injector = Guice.createInjector(new Module());
        firstInjectClass = injector.getInstance(FirstInjectClass.class);
    }

    @Override
    public void execute(Tuple tuple) {
        count++;
        String message = "Message count "+count;
        firstInjectClass.printMessage(message);
        log.error(message);

        collector.emit("TO_SECOND_BOLT", new Values(message));

        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("TO_SECOND_BOLT", new Fields("MESSAGE"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }
}

Second Bolt

public class SecondBolt extends BaseRichBolt {
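    // SLF4J logger, assumed (not declared in the original snippet)
    private static final Logger log = LoggerFactory.getLogger(SecondBolt.class);
    private OutputCollector collector;
    private SecondInjectClass secondInjectClass;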

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        collector = outputCollector;
        Injector injector = Guice.createInjector(new Module());
        secondInjectClass = injector.getInstance(SecondInjectClass.class);
    }

    @Override
    public void execute(Tuple tuple) {
        String message = (String) tuple.getValue(0);
        secondInjectClass.printMessage(message);
        log.error("SecondBolt {}",message);

        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}

Class in which dependencies are injected

public class FirstInjectClass {
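    // SLF4J logger, assumed (not declared in the original snippet)
    private static final Logger log = LoggerFactory.getLogger(FirstInjectClass.class);
    private final FirstInterface firstInterface;

    private final String prepend = "FirstInjectClass";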

    @Inject
    public FirstInjectClass(FirstInterface firstInterface) {
        this.firstInterface = firstInterface;
    }

    public void printMessage(String message){
        log.error("{} {}", prepend, message);
        firstInterface.printMethod(message);
    }
}

Interface used for binding

public interface FirstInterface {
    void printMethod(String message);
}

Implementation of interface

public class FirstInterfaceImpl implements FirstInterface{
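    // SLF4J logger, assumed (not declared in the original snippet)
    private static final Logger log = LoggerFactory.getLogger(FirstInterfaceImpl.class);
    private final String prepend = "FirstInterfaceImpl";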
    public void printMethod(String message){
        log.error("{} {}", prepend, message);
    }
}

A second class that receives its dependency via DI

public class SecondInjectClass {
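    // SLF4J logger, assumed (not declared in the original snippet)
    private static final Logger log = LoggerFactory.getLogger(SecondInjectClass.class);
    private final SecondInterface secondInterface;
    private final String prepend = "SecondInjectClass";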

    @Inject
    public SecondInjectClass(SecondInterface secondInterface) {
        this.secondInterface = secondInterface;
    }

    public void printMessage(String message){
        log.error("{} {}", prepend, message);
        secondInterface.printMethod(message);
    }
}

Another interface used for binding

public interface SecondInterface {
    void printMethod(String message);
}

Implementation of the second interface

public class SecondInterfaceImpl implements SecondInterface{
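    // SLF4J logger, assumed (not declared in the original snippet)
    private static final Logger log = LoggerFactory.getLogger(SecondInterfaceImpl.class);
    private final String prepend = "SecondInterfaceImpl";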
    public void printMethod(String message){
        log.error("{} {}", prepend, message);
    }
}

Module Class

public class Module extends AbstractModule {
    @Override
    protected void configure() {
        bind(FirstInterface.class).to(FirstInterfaceImpl.class);
        bind(SecondInterface.class).to(SecondInterfaceImpl.class);
    }
}

Nothing fancy here, just two bolts and a couple of classes for DI. I deployed it on a server and it works fine. The catch is that I have to initialize the Injector in each bolt, which makes me wonder what the side effects of that will be.

This implementation is simple, with just two bolts. What if I have more? What impact does it have on the topology if I have to initialize an Injector in every bolt?
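
For context, prepare() runs once per bolt task when a worker starts, so the injector is built once per task, not once per tuple. If that startup cost ever matters, one option is to share a single injector per worker JVM through a static holder. The following is a minimal JDK-only sketch of that idea, not something taken from Storm or Guice: `InjectorHolder` and `ExpensiveContainer` are hypothetical names, and `ExpensiveContainer` merely stands in for the Guice Injector.

```java
import java.util.concurrent.atomic.AtomicInteger;

class SharedInjectorDemo {
    public static void main(String[] args) {
        // Two "bolts" asking for the container get the same instance.
        Object a = InjectorHolder.get();
        Object b = InjectorHolder.get();
        System.out.println("same instance: " + (a == b));
        System.out.println("times created: " + InjectorHolder.creations());
    }
}

// Hypothetical stand-in for an expensive, non-serializable object such as a Guice Injector.
class ExpensiveContainer { }

final class InjectorHolder {
    // Counts how often the container is built, only to make the sharing visible.
    private static final AtomicInteger CREATED = new AtomicInteger();

    private InjectorHolder() { }

    // Initialization-on-demand holder: the JVM runs this initializer once,
    // thread-safely, the first time get() is called.
    private static final class Lazy {
        static final ExpensiveContainer INSTANCE = create();
    }

    private static ExpensiveContainer create() {
        CREATED.incrementAndGet();
        // In a real topology this would be: Guice.createInjector(new Module())
        return new ExpensiveContainer();
    }

    static ExpensiveContainer get() {
        return Lazy.INSTANCE;
    }

    static int creations() {
        return CREATED.get();
    }
}
```

With a real injector behind the holder, each bolt's prepare() would fetch its dependencies from `InjectorHolder.get()` instead of calling `Guice.createInjector` itself. Static state lives per JVM and is not serialized with the bolt, so every worker process would still build its own copy on first use.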

If I try to initialize Injector outside prepare method I get error for serialization.
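
That error fits how Storm ships components: the bolt object is serialized with Java serialization when the topology is submitted and deserialized on the workers, so every non-transient field set before submission must be serializable, and a Guice Injector is not. The sketch below reproduces the effect with the JDK alone. `FakeInjector`, `EagerBolt` and `LazyBolt` are hypothetical stand-ins, not Storm or Guice classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializationDemo {
    // Serialize then deserialize, roughly what happens between submit and worker start.
    static Object roundTrip(Object o) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(o);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // True when serialization fails because some field is not Serializable.
    static boolean failsToSerialize(Object o) {
        try {
            roundTrip(o);
            return false;
        } catch (UncheckedIOException e) {
            return e.getCause() instanceof NotSerializableException;
        }
    }

    public static void main(String[] args) {
        System.out.println("EagerBolt fails to serialize: " + failsToSerialize(new EagerBolt()));
        LazyBolt copy = (LazyBolt) roundTrip(new LazyBolt());
        copy.prepare();
        System.out.println("LazyBolt usable after prepare(): " + copy.isPrepared());
    }
}

// Hypothetical stand-in for a Guice Injector: deliberately NOT Serializable.
class FakeInjector { }

// Mirrors creating the injector outside prepare(): the field is set at construction time.
class EagerBolt implements Serializable {
    private static final long serialVersionUID = 1L;
    private final FakeInjector injector = new FakeInjector();
}

// Mirrors the working topology: the field is transient and only set in prepare().
class LazyBolt implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient FakeInjector injector;

    void prepare() {
        injector = new FakeInjector();
    }

    boolean isPrepared() {
        return injector != null;
    }
}
```

In the bolts above, the injected fields are only assigned in prepare(), which is why they work. Marking such fields `transient` as well makes that intent explicit and guards against someone later initializing them in a constructor.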
