I have written a simple Storm topology in which the spout reads data in JSON format. My first bolt parses the JSON and sends a particular value to the word-counter bolt, which writes the data into Cassandra. When I run it, the word-counter bolt fails with the error shown below.
Please suggest what changes I should make. Thanks in advance!
Main Topology:
/**
 * Launcher that assembles the reader -> normalizer -> counter pipeline and
 * submits it to an in-process {@code LocalCluster}.
 */
public class TopologyMain {

    /**
     * Builds the topology and its configuration, submits both to a local
     * cluster and then sleeps for ten seconds.
     *
     * @param args command-line arguments (not used)
     * @throws InterruptedException if the sleep after submission is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        TopologyBuilder topology = wireComponents();
        Config settings = buildSettings();

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("Getting-Started-Toplogie", settings, topology.createTopology());
        Thread.sleep(10000);
        // NOTE(review): cluster.shutdown() is disabled, so the local cluster is
        // never stopped explicitly once the sleep has finished.
    }

    /**
     * Declares the spout and the two bolts together with their groupings.
     * The counter bolt subscribes only to the normalizer's "stream1" stream.
     */
    private static TopologyBuilder wireComponents() {
        TopologyBuilder topology = new TopologyBuilder();
        topology.setSpout("word-reader", new WordReader());
        topology
            .setBolt("word-normalizer", new WordNormalizer())
            .shuffleGrouping("word-reader");
        topology
            .setBolt("word-counter1", new WordCounter(), 1)
            .shuffleGrouping("word-normalizer", "stream1");
        return topology;
    }

    /**
     * Creates the topology configuration: the input file location, debug
     * logging switched off and at most one pending spout tuple.
     */
    private static Config buildSettings() {
        Config settings = new Config();
        settings.put("wordsFile", "/home/raremile/git/infer-search-realtime/InferlyticsConsumerStorm/src/main/resources/log_tenlaz_10Jun.json");
        settings.setDebug(false);
        settings.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        return settings;
    }
}
WordNormalizerBolt:
public class WordNormalizer extends BaseBasicBolt {
/**
*
*/
private static final long serialVersionUID = 1L;
public void cleanup() {}
/**
* The bolt will receive the line from the
* words file and process it to Normalize this line
*
* The normalize will be put the words in lower case
* and split the line to get all words in this
*/
public void execute(Tuple input, BasicOutputCollector collector) {
String sentence = input.getString(0);
String sentence1 = null;
// String sentence2 = null;
JSONObject obj = null;
try {
obj = new JSONObject(sentence);
} catch (JSONException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
/*try {
sentence2 = obj.getJSONObject("envelope").getString("sessionId");
} catch (JSONException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}*/
try {
sentence1 = obj.getJSONObject("body").getString("searchTerm");
} catch (JSONException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
sentence1 = sentence1.trim();
// sentence2 = sentence2.trim();
if(!sentence1.isEmpty()){
sentence1 = sentence1.toLowerCase();
//collector.emit(new Values(sentence1));
collector.emit("stream1", new Values(sentence1));
System.out.println("In Normalizer : "+sentence1);
}
/* if(!sentence2.isEmpty()){
// sentence1 = sentence1.toLowerCase();
//collector.emit(new Values(sentence1));
collector.emit("stream2", new Values(sentence2));
System.out.println("In Normalizer : "+sentence2);
}*/
}
/**
* The bolt will only emit the field "word"
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//declarer.declare(new Fields("sentence1"));
declarer.declareStream("stream1", new Fields("sentence1"));
//declarer.declareStream("stream2", new Fields("sentence2"));
}
}
Word CounterBolt:
public class WordCounter extends BaseBasicBolt {
Integer id;
String name;
Map<String, Integer> counters;
/**
* At the end of the spout (when the cluster is shutdown
* We will show the word counters
*/
@Override
public void cleanup() {
}
/**
* On create
*/
@Override
public void prepare(Map stormConf, TopologyContext context) {
this.counters = new HashMap<String, Integer>();
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
Cluster cluster = Cluster.builder().addContactPoint("192.168.1.229").build();
Session session = cluster.connect("query");
String query = "CREATE TABLE queryNewUpdateCounterTerm(item text PRIMARY KEY, "
+ "count counter );";
session.execute(query);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
String str = input.getString(0);
/**
* If the word dosn't exist in the map we will create
* this, if not We will add 1
*/
Cluster cluster = Cluster.builder().addContactPoint("192.168.1.229").build();
Session session = cluster.connect("query");
String update = "UPDATE queryNewUpdateCounterTerm SET count = count + 1 " +
"where item = \'"+str+"\';";
session.execute(update);
/*try{
Mongo mongo = new Mongo("localhost", 27017);
DB db = mongo.getDB("storm");
final DBCollection table = db.getCollection("words");
BasicDBObject dbObject = new BasicDBObject();
if(!counters.containsKey(str)){
counters.put(str, 1);
dbObject.put("searchItem",str);
dbObject.put("count",1);
System.out.println(dbObject);
table.insert(dbObject);
}else{
Integer c = counters.get(str) + 1;
counters.put(str, c);
BasicDBObject query = new BasicDBObject();
query.put("searchItem", str);
BasicDBObject documentDetail = new BasicDBObject();
documentDetail.put("count", c);
BasicDBObject updateObj = new BasicDBObject();
updateObj.put("$set", documentDetail);
System.out.println(updateObj);
table.update(query, updateObj);
}
}catch (Exception e) {
e.printStackTrace(); }
System.out.println("In Counter : ");*/
System.out.println("In Counter : ");
}
}
Error:
ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: com.datastax.driver.core.exceptions.SyntaxError: line 0:-1 no viable alternative at input '<EOF>'
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$fn__3439$fn__3451$fn__3498.invoke(executor.clj:748) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 0:-1 no viable alternative at input '<EOF>'
at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:35) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:205) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36) ~[cassandra-driver-core-2.1.5.jar:na]
at bolts.WordCounter.execute(WordCounter.java:69) ~[classes/:na]
at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$fn__3439$tuple_action_fn__3441.invoke(executor.clj:633) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$mk_task_receiver$fn__3362.invoke(executor.clj:401) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.disruptor$clojure_handler$reify__1445.onEvent(disruptor.clj:58) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120) ~[storm-core-0.9.4.jar:0.9.4]
... 6 common frames omitted
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 0:-1 no viable alternative at input '<EOF>'
at com.datastax.driver.core.Responses$Error.asException(Responses.java:101) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:140) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:293) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:455) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:734) ~[cassandra-driver-core-2.1.5.jar:na]
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler.handleUpstream(IdleStateAwareChannelUpstreamHandler.java:36) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.handler.timeout.IdleStateHandler.messageReceived(IdleStateHandler.java:294) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) ~[netty-3.9.0.Final.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) ~[na:1.7.0_79]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) ~[na:1.7.0_79]
... 1 common frames omitted
16262 [Thread-9-word-counter1] ERROR backtype.storm.daemon.executor -
java.lang.RuntimeException: com.datastax.driver.core.exceptions.SyntaxError: line 0:-1 no viable alternative at input '<EOF>'
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$fn__3439$fn__3451$fn__3498.invoke(executor.clj:748) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 0:-1 no viable alternative at input '<EOF>'
at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:35) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:205) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36) ~[cassandra-driver-core-2.1.5.jar:na]
at bolts.WordCounter.execute(WordCounter.java:69) ~[classes/:na]
at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$fn__3439$tuple_action_fn__3441.invoke(executor.clj:633) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$mk_task_receiver$fn__3362.invoke(executor.clj:401) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.disruptor$clojure_handler$reify__1445.onEvent(disruptor.clj:58) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120) ~[storm-core-0.9.4.jar:0.9.4]
... 6 common frames omitted
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 0:-1 no viable alternative at input '<EOF>'
at com.datastax.driver.core.Responses$Error.asException(Responses.java:101) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:140) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:293) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:455) ~[cassandra-driver-core-2.1.5.jar:na]
at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:734) ~[cassandra-driver-core-2.1.5.jar:na]
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler.handleUpstream(IdleStateAwareChannelUpstreamHandler.java:36) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.handler.timeout.IdleStateHandler.messageReceived(IdleStateHandler.java:294) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) ~[netty-3.9.0.Final.jar:na]
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) ~[netty-3.9.0.Final.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) ~[na:1.7.0_79]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) ~[na:1.7.0_79]
... 1 common frames omitted
16334 [Thread-9-word-counter1] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.4.jar:0.9.4]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__4693$fn__4694.invoke(worker.clj:491) [storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$mk_executor_data$fn__3272$fn__3273.invoke(executor.clj:240) [storm-core-0.9.4.jar:0.9.4]
at backtype.storm.util$async_loop$fn__460.invoke(util.clj:473) [storm-core-0.9.4.jar:0.9.4]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]