how to use drools in storm topology

481 views Asked by At

Now I want to use Drools in a blot,it works normal in the LocalCluster, but when I put it to the production cluster,it has error. The blot is:

public class DealLostBolt extends BaseRichBolt {

      private static final long serialVersionUID = 1L;

      private static final Logger LOGGER = LoggerFactory.getLogger("DEAL_LOST_BOLT");

      private OutputCollector collector;

      private KieSession kieSession;

      private FactHandle factHandle;

      @Override
      public void execute(Tuple input) {
        // 获取数据
        String sentence = (String) input.getValue(0);
        LOGGER.info("DealLostBolt获取到的数据:" + sentence);

        // 数据转换
        PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, sentence);

        KieServices ks = KieServices.Factory.get();
        KieContainer kieContainer = ks.getKieClasspathContainer();
        kieSession = kieContainer.newKieSession("all-rule");
        kieSession.getAgenda().getAgendaGroup("deal-lost").setFocus();

        factHandle = kieSession.insert(dataPoint);
        kieSession.fireAllRules();
        kieSession.delete(factHandle);

        collector.emit(new Values(sentence));
      }

      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("value"));

      }

      @Override
      public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
      }

    }

I used official documents to create the kiesession. The erros is:

java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:495) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:460) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.daemon.executor$fn__5030$fn__5043$fn__5096.invoke(executor.clj:848) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.1.jar:1.1.1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
    Caused by: java.lang.NullPointerException
    at org.kie.internal.io.ResourceFactory.newByteArrayResource(ResourceFactory.java:66) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.AbstractKieModule.getResource(AbstractKieModule.java:299) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:264) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:259) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.AbstractKieProject.buildKnowledgePackages(AbstractKieProject.java:228) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.AbstractKieModule.createKieBase(AbstractKieModule.java:206) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.KieContainerImpl.createKieBase(KieContainerImpl.java:584) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.KieContainerImpl.getKieBase(KieContainerImpl.java:552) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:680) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:648) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at cn.ennwifi.storm.bolt.DealLostBolt.execute(DealLostBolt.java:52) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.daemon.executor$fn__5030$tuple_action_fn__5032.invoke(executor.clj:729) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__4951.invoke(executor.clj:461) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.disruptor$clojure_handler$reify__4465.onEvent(disruptor.clj:40) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:482) ~[storm-core-1.1.1.jar:1.1.1]
    ... 6 more

Perhapse something do not initialization. But I create a new kieservice when blot execute. Could somebody help me?

Thanks!

1

There are 1 answers

1
Dennis Rippinger On BEST ANSWER

I had a similar problem using Drools together with JMH as shaded jar. Drools uses a ServiceRegistry approach. This means the Drools libraries (drools-compiler, kie-ci, drools-decisiontables, ...) contain a same named property file, that indicates which implementation for an interface they offer.

The shaded jar plugin usually flattens the (transitive) dependencies into one jar. For files that exists multiple times, this usually means that one of them is selected if not otherwise specified. For ServiceRegistry properties, we need to combine all files. Usually this is done via the ServicesResourceTransformer. This Transformer handles files in META-INF/services/, but the relevant file for Drools is META-INF/kie.conf. My problem with JMH could be solved with the AppendingTransformer:

<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    <resource>META-INF/kie.conf</resource>
</transformer>

I'm not an expert of Storm, but the Starter suggests that it also makes use of the shade plugin. I assume you run your local cluster from the IDE - which does not use the shaded jar.