Some background: I am working on a topology using Apache Storm and wondered whether I could use dependency injection (DI) in it, but I was not sure how it would behave once the topology is deployed to a cluster. While looking into whether DI is a good option for Storm topologies, I came across some threads about Apache Spark that mentioned serialization would be a problem, and I saw a few responses along the same lines for Apache Storm. So I decided to write a sample topology with Google Guice to see what happens.
I wrote a sample topology with two bolts and used Google Guice to inject the dependencies. The first bolt is configured to receive a tick tuple every 10 seconds. On each tick it creates a message, prints it to the log, and calls some injected classes that do the same. The message is then emitted to the second bolt, which applies the same printing logic.
First Bolt
public class FirstBolt extends BaseRichBolt {
    // Assumed SLF4J logger (the declaration of `log` was not shown in the original).
    private static final Logger log = LoggerFactory.getLogger(FirstBolt.class);

    private OutputCollector collector;
    // Static, so shared by every FirstBolt instance in the same worker JVM.
    private static int count = 0;
    private FirstInjectClass firstInjectClass;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        collector = outputCollector;
        // The injector is built here, on the worker, after the bolt has been deserialized.
        Injector injector = Guice.createInjector(new Module());
        firstInjectClass = injector.getInstance(FirstInjectClass.class);
    }

    @Override
    public void execute(Tuple tuple) {
        // Every incoming tuple is a tick tuple, since no other stream is subscribed.
        count++;
        String message = "Message count " + count;
        firstInjectClass.printMessage(message);
        log.error(message);
        collector.emit("TO_SECOND_BOLT", new Values(message));
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("TO_SECOND_BOLT", new Fields("MESSAGE"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // Ask Storm to send this bolt a tick tuple every 10 seconds.
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }
}
Second Bolt
public class SecondBolt extends BaseRichBolt {
    // Assumed SLF4J logger (the declaration of `log` was not shown in the original).
    private static final Logger log = LoggerFactory.getLogger(SecondBolt.class);

    private OutputCollector collector;
    private SecondInjectClass secondInjectClass;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        collector = outputCollector;
        // A second, independent injector is created for this bolt.
        Injector injector = Guice.createInjector(new Module());
        secondInjectClass = injector.getInstance(SecondInjectClass.class);
    }

    @Override
    public void execute(Tuple tuple) {
        String message = (String) tuple.getValue(0);
        secondInjectClass.printMessage(message);
        log.error("SecondBolt {}", message);
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // This bolt is the end of the topology and emits nothing.
    }
}
Class in which dependencies are injected
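public class FirstInjectClass {
    // Assumed SLF4J logger (the declaration of `log` was not shown in the original).
    private static final Logger log = LoggerFactory.getLogger(FirstInjectClass.class);

    private final FirstInterface firstInterface;
    private final String prepend = "FirstInjectClass";

    @Inject
    public FirstInjectClass(FirstInterface firstInterface) {
        this.firstInterface = firstInterface;
    }

    public void printMessage(String message) {
        log.error("{} {}", prepend, message);
        firstInterface.printMethod(message);
    }
}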
Interface used for binding
public interface FirstInterface {
    void printMethod(String message);
}
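Implementation of the interface
public class FirstInterfaceImpl implements FirstInterface {
    // Assumed SLF4J logger (the declaration of `log` was not shown in the original).
    private static final Logger log = LoggerFactory.getLogger(FirstInterfaceImpl.class);

    private final String prepend = "FirstInterfaceImpl";

    @Override
    public void printMethod(String message) {
        log.error("{} {}", prepend, message);
    }
}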
Similarly, another class that receives its dependency via DI
public class SecondInjectClass {
    // Assumed SLF4J logger (the declaration of `log` was not shown in the original).
    private static final Logger log = LoggerFactory.getLogger(SecondInjectClass.class);

    private final SecondInterface secondInterface;
    private final String prepend = "SecondInjectClass";

    @Inject
    public SecondInjectClass(SecondInterface secondInterface) {
        this.secondInterface = secondInterface;
    }

    public void printMessage(String message) {
        log.error("{} {}", prepend, message);
        secondInterface.printMethod(message);
    }
}
Another interface for binding
public interface SecondInterface {
    void printMethod(String message);
}
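Implementation of the second interface
public class SecondInterfaceImpl implements SecondInterface {
    // Assumed SLF4J logger (the declaration of `log` was not shown in the original).
    private static final Logger log = LoggerFactory.getLogger(SecondInterfaceImpl.class);

    private final String prepend = "SecondInterfaceImpl";

    @Override
    public void printMethod(String message) {
        log.error("{} {}", prepend, message);
    }
}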
Module Class
public class Module extends AbstractModule {
    @Override
    protected void configure() {
        bind(FirstInterface.class).to(FirstInterfaceImpl.class);
        bind(SecondInterface.class).to(SecondInterfaceImpl.class);
    }
}
Nothing fancy here, just two bolts and a couple of classes for DI. I deployed it on a server and it works just fine. The catch, though, is that I have to initialize an Injector in each bolt, which makes me wonder what side effects that will have.
This implementation is simple, with just two bolts. What if I have more bolts? What impact would it have on the topology if I have to initialize an Injector in every bolt?
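One option that might avoid building an injector per bolt is to keep a single injector per worker JVM behind a lazily initialized static holder. The sketch below is only an illustration under assumptions: `InjectorHolder` and `FakeInjector` are hypothetical names, and `FakeInjector` is a plain-Java stand-in for the real Guice `Injector` so that the example runs without Storm or Guice on the classpath.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an expensive-to-build object such as a Guice Injector.
final class FakeInjector {
    // Counts how many instances were constructed in this JVM.
    static final AtomicInteger CREATED = new AtomicInteger();

    FakeInjector() {
        CREATED.incrementAndGet();
    }
}

// Hypothetical holder: at most one FakeInjector per JVM (per Storm worker process).
final class InjectorHolder {
    private InjectorHolder() {
    }

    // Initialization-on-demand holder idiom: the JVM initializes Lazy exactly once,
    // in a thread-safe way, the first time get() is called.
    private static final class Lazy {
        static final FakeInjector INSTANCE = new FakeInjector();
    }

    static FakeInjector get() {
        return Lazy.INSTANCE;
    }
}

final class InjectorHolderDemo {
    public static void main(String[] args) {
        // Two "bolts" asking for the injector receive the same instance.
        FakeInjector first = InjectorHolder.get();
        FakeInjector second = InjectorHolder.get();
        System.out.println("same instance: " + (first == second));
        System.out.println("instances created: " + FakeInjector.CREATED.get());
    }
}
```

In the topology above, `Lazy.INSTANCE` would presumably hold `Guice.createInjector(new Module())`, and each `prepare` would call `InjectorHolder.get().getInstance(...)`. Static fields are not part of a serialized bolt, so each worker process would build its own injector on first use.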
If I try to initialize the Injector outside the prepare method, I get a serialization error.
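The serialization error is most likely explained by how Storm ships bolts: bolt instances are serialized with Java serialization when the topology is submitted, so any non-transient field that is already set at that point must be serializable, and Guice's `Injector` is not. The following sketch reproduces the effect with plain Java only. `Engine`, `EagerComponent`, `LazyComponent` and `SerializationDemo` are hypothetical names, with `Engine` standing in for the injector and `prepare()` standing in for the bolt's `prepare` method.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a non-serializable dependency such as a Guice Injector.
final class Engine {
}

// Mirrors a bolt that builds its dependency in a field initializer or constructor.
final class EagerComponent implements Serializable {
    private static final long serialVersionUID = 1L;
    // Non-transient and already set when the object is serialized.
    private final Engine engine = new Engine();
}

// Mirrors a bolt that builds its dependency in prepare(), after deserialization.
final class LazyComponent implements Serializable {
    private static final long serialVersionUID = 1L;
    // Transient fields are skipped by Java serialization.
    private transient Engine engine;

    void prepare() {
        engine = new Engine();
    }

    boolean isPrepared() {
        return engine != null;
    }
}

final class SerializationDemo {
    // Serializes the value and reads back a copy, roughly what happens
    // between topology submission and worker startup.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns true when Java serialization rejects the object as not serializable.
    static boolean failsToSerialize(Serializable value) {
        try {
            roundTrip(value);
            return false;
        } catch (UncheckedIOException e) {
            return e.getCause() instanceof NotSerializableException;
        }
    }

    public static void main(String[] args) {
        System.out.println("eager fails: " + failsToSerialize(new EagerComponent()));
        LazyComponent copy = roundTrip(new LazyComponent());
        copy.prepare();
        System.out.println("lazy prepared after round trip: " + copy.isPrepared());
    }
}
```

This matches the behavior seen in the topology: creating the dependency inside `prepare` works because it happens after deserialization on the worker, while creating it earlier puts a non-serializable object into the bolt's serialized state.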