DRPC Server error in storm


I am trying to execute the code below and getting an error. Not sure if I am missing something here. Also, where would I see the output?

Error

java.lang.RuntimeException: No DRPC servers configured for topology
    at backtype.storm.drpc.DRPCSpout.open(DRPCSpout.java:79)
    at storm.trident.spout.RichSpoutBatchTriggerer.open(RichSpoutBatchTriggerer.java:58)
    at backtype.storm.daemon.executor$fn__5802$fn__5817.invoke(executor.clj:519)
    at backtype.storm.util$async_loop$fn__442.invoke(util.clj:434)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:744)

Code:
----
package com.**.trident.storm;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import storm.kafka.*;
import storm.trident.*;
import storm.trident.operation.builtin.*;     // Count, Sum, MapGet, FilterNull
import storm.trident.testing.MemoryMapState;

import backtype.storm.*;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.DRPCClient;

public class EventTridentDrpcTopology
{
private static final String KAFKA_SPOUT_ID = "kafkaSpout";  

private static final Logger log = LoggerFactory.getLogger(EventTridentDrpcTopology.class);

public static StormTopology buildTopology(OpaqueTridentKafkaSpout spout) throws Exception
{
    TridentTopology tridentTopology = new TridentTopology();
    TridentState ts = tridentTopology.newStream("event_spout",spout)
    .name(KAFKA_SPOUT_ID)
    .each(new Fields("mac_address"), new SplitMac(), new Fields("mac"))
    .groupBy(new Fields("mac"))
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("maccount"))
    .parallelismHint(4)
    ;

    tridentTopology
    .newDRPCStream("mac_count")
    .each(new Fields("args"), new SplitMac(), new Fields("mac"))
    .stateQuery(ts,new Fields("mac"),new MapGet(), new Fields("maccount"))
    .each(new Fields("maccount"), new FilterNull())
    .aggregate(new Fields("maccount"), new Sum(), new Fields("sum"))
     ;

return tridentTopology.build();

}

public static void main(String[] str) throws Exception
{
    Config conf = new Config();
    BrokerHosts hosts = new ZkHosts("xxxx:2181,xxxx:2181,xxxx:2181");
    String topic = "event";
    //String zkRoot = topologyConfig.getProperty("kafka.zkRoot");
    String consumerGroupId = "StormSpout";

    DRPCClient drpc = new DRPCClient("xxxx",3772);


    TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(hosts, topic, consumerGroupId);
    tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new XScheme()); 
    OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(tridentKafkaConfig);


    StormSubmitter.submitTopology("event_trident", conf, buildTopology(opaqueTridentKafkaSpout));

}

}
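To read a result back, the DRPC client created in main could be used to invoke the function registered by newDRPCStream. A minimal sketch, assuming the topology above is running; the host "xxxx" and the sample MAC address are placeholders:

```java
import backtype.storm.utils.DRPCClient;

public class MacCountQuery {
    public static void main(String[] args) throws Exception {
        // Connect to the DRPC server (placeholder host, default DRPC port 3772)
        DRPCClient drpc = new DRPCClient("xxxx", 3772);

        // Invoke the "mac_count" function declared via newDRPCStream;
        // the argument arrives in the stream as the "args" field, and the
        // returned string is the serialized result tuples
        String result = drpc.execute("mac_count", "aa:bb:cc:dd:ee:ff");
        System.out.println(result);
    }
}
```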

1 Answer

Answered by Branky:

You have to configure the locations of the DRPC servers and launch them. See the "Remote mode DRPC" section of http://storm.apache.org/releases/0.10.0/Distributed-RPC.html

Using DRPC in remote mode involves three steps:

1. Launch DRPC server(s)
2. Configure the locations of the DRPC servers
3. Submit DRPC topologies to the Storm cluster

Launching a DRPC server can be done with the storm script and is just like launching Nimbus or the UI:

bin/storm drpc

Next, you need to configure your Storm cluster to know the locations of the DRPC server(s). This is how DRPCSpout knows from where to read function invocations. This can be done through the storm.yaml file or the topology configurations. Configuring this through the storm.yaml looks something like this:

drpc.servers:
  - "drpc1.foo.com"
  - "drpc2.foo.com"
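Since the setting can also go through the topology configuration, an alternative is to put it on the Config object passed to StormSubmitter. A minimal sketch; the hostnames are placeholders:

```java
import java.util.Arrays;

import backtype.storm.Config;

public class DrpcConf {
    public static void main(String[] args) {
        Config conf = new Config();

        // Equivalent of the storm.yaml entry above; replace with real DRPC hosts
        conf.put(Config.DRPC_SERVERS, Arrays.asList("drpc1.foo.com", "drpc2.foo.com"));
        conf.put(Config.DRPC_PORT, 3772); // default DRPC port

        // Pass conf to StormSubmitter.submitTopology(...) when submitting
        System.out.println(conf.get(Config.DRPC_SERVERS));
    }
}
```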