I want to use Drools in a bolt. It works normally in the LocalCluster, but when I deploy it to the production cluster, it fails with an error. The bolt is:
public class DealLostBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private static final Logger LOGGER = LoggerFactory.getLogger("DEAL_LOST_BOLT");
    private OutputCollector collector;
    private KieSession kieSession;
    private FactHandle factHandle;

    @Override
    public void execute(Tuple input) {
        // Get the data
        String sentence = (String) input.getValue(0);
        LOGGER.info("Data received by DealLostBolt: " + sentence);
        // Convert the data
        PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, sentence);
        KieServices ks = KieServices.Factory.get();
        KieContainer kieContainer = ks.getKieClasspathContainer();
        kieSession = kieContainer.newKieSession("all-rule");
        kieSession.getAgenda().getAgendaGroup("deal-lost").setFocus();
        factHandle = kieSession.insert(dataPoint);
        kieSession.fireAllRules();
        kieSession.delete(factHandle);
        collector.emit(new Values(sentence));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("value"));
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }
}
I followed the official documentation to create the KieSession. The error is:
java.lang.RuntimeException: java.lang.NullPointerException
at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:495) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:460) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.daemon.executor$fn__5030$fn__5043$fn__5096.invoke(executor.clj:848) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.1.jar:1.1.1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.lang.NullPointerException
at org.kie.internal.io.ResourceFactory.newByteArrayResource(ResourceFactory.java:66) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.AbstractKieModule.getResource(AbstractKieModule.java:299) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:264) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:259) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.AbstractKieProject.buildKnowledgePackages(AbstractKieProject.java:228) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.AbstractKieModule.createKieBase(AbstractKieModule.java:206) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.KieContainerImpl.createKieBase(KieContainerImpl.java:584) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.KieContainerImpl.getKieBase(KieContainerImpl.java:552) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:680) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:648) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at cn.ennwifi.storm.bolt.DealLostBolt.execute(DealLostBolt.java:52) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.storm.daemon.executor$fn__5030$tuple_action_fn__5032.invoke(executor.clj:729) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.daemon.executor$mk_task_receiver$fn__4951.invoke(executor.clj:461) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.disruptor$clojure_handler$reify__4465.onEvent(disruptor.clj:40) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:482) ~[storm-core-1.1.1.jar:1.1.1]
... 6 more
Perhaps something is not initialized, but I create a new KieServices instance every time the bolt executes. Could somebody help me?
Thanks!
I had a similar problem using Drools together with JMH as a shaded jar. Drools uses a ServiceRegistry approach. This means the Drools libraries (drools-compiler, kie-ci, drools-decisiontables, ...) each contain a property file with the same name, which indicates which implementations of an interface they offer.
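A rough way to check this on a given classpath is to list every copy of a same-named resource. The sketch below is only a diagnostic, not part of Drools; the class name `KieConfCheck` and the method `findAll` are made up for illustration. For Drools the resource to look for is `META-INF/kie.conf`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (not a Drools class): lists every classpath
// location that provides a resource with the given name.
public class KieConfCheck {

    public static List<URL> findAll(String resourceName) {
        // Prefer the context class loader, fall back to this class's loader.
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        if (cl == null) {
            cl = KieConfCheck.class.getClassLoader();
        }
        try {
            return Collections.list(cl.getResources(resourceName));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<URL> hits = findAll("META-INF/kie.conf");
        System.out.println("kie.conf copies found: " + hits.size());
        for (URL url : hits) {
            System.out.println("  " + url);
        }
    }
}
```

Run from the IDE, I would expect this to print one entry per Drools jar; run against a flattened jar, it can only ever print one entry, and what matters then is whether that single file contains the entries of all the original files.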
The shade plugin usually flattens the (transitive) dependencies into one jar. For files that exist multiple times, this usually means that one of them is selected unless specified otherwise. For ServiceRegistry properties, we need to combine all of the files. Usually this is done via the ServicesResourceTransformer. This transformer handles files in META-INF/services/, but the relevant file for Drools is META-INF/kie.conf. My problem with JMH could be solved with the AppendingTransformer, configured to append META-INF/kie.conf.
I'm not an expert on Storm, but the Starter suggests that it also makes use of the shade plugin. I assume you run your local cluster from the IDE, which does not use the shaded jar.
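For reference, an AppendingTransformer setup for the maven-shade-plugin along the lines described above could look roughly like this. Treat it as a sketch: the phase and the surrounding plugin layout are assumptions about your pom.xml, and the plugin version is left out on purpose. Also note that the `jar-with-dependencies` suffix in your stack trace is commonly produced by the maven-assembly-plugin, so this only applies if your build uses (or is switched to) the shade plugin.

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <!-- Assumed binding: build the shaded jar during "package" -->
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- Concatenate every META-INF/kie.conf instead of keeping only one copy -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
            <resource>META-INF/kie.conf</resource>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```

After rebuilding, the META-INF/kie.conf inside the resulting jar should contain the entries of all Drools modules rather than those of a single one.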