Error While Inserting Data into Cassandra

I am learning Apache Kafka-Storm-Cassandra integration. I am reading a JSON string from the Kafka cluster using a Kafka spout, then passing it to a bolt which parses the JSON and emits the needed values to a second bolt, which writes them to the Cassandra DB.

But I am getting these errors.

java.lang.RuntimeException: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 mismatched character '<EOF>' expecting '''
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
    at backtype.storm.daemon.executor$fn__4722$fn__4734$fn__4781.invoke(executor.clj:748)
    at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 mismatched character '<EOF>' expecting '''
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:35)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:205)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36)
    at bolts.WordCounter.execute(WordCounter.java:103)
    at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50)
    at backtype.storm.daemon.executor$fn__4722$tuple_action_fn__4724.invoke(executor.clj:633)
    at backtype.storm.daemon.executor$mk_task_receiver$fn__4645.invoke(executor.clj:401)
    at backtype.storm.disruptor$clojure_handler$reify__1446.onEvent(disruptor.clj:58)
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120)
    ... 6 more
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 mismatched character '<EOF>' expecting '''
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:101)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:140)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:293)
    at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:455)
    at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:734)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler.handleUpstream(IdleStateAwareChannelUpstreamHandler.java:36)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.handler.timeout.IdleStateHandler.messageReceived(IdleStateHandler.java:294)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    ... 1 more

TableAlreadyExists error:

com.datastax.driver.core.exceptions.AlreadyExistsException: Table query.productcount already exists
    at com.datastax.driver.core.exceptions.AlreadyExistsException.copy(AlreadyExistsException.java:85)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:205)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36)
    at bolts.WordCounter.prepare(WordCounter.java:77)
    at backtype.storm.topology.BasicBoltExecutor.prepare(BasicBoltExecutor.java:43)
    at backtype.storm.daemon.executor$fn__4722$fn__4734.invoke(executor.clj:692)
    at backtype.storm.util$async_loop$fn__458.invoke(util.clj:461)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.AlreadyExistsException: Table query.productcount already exists
    at com.datastax.driver.core.exceptions.AlreadyExistsException.copy(AlreadyExistsException.java:85)
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:105)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:140)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:293)
    at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:455)
    at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:734)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler.handleUpstream(IdleStateAwareChannelUpstreamHandler.java:36)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.handler.timeout.IdleStateHandler.messageReceived(IdleStateHandler.java:294)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    ... 1 more
Caused by: com.datastax.driver.core.exceptions.AlreadyExistsException: Table query.productcount already exists
    at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:70)
    at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:38)
    at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:168)
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:66)
    ... 21 more

My main topology:

public class TopologyQueryCounterMain {

    static final Logger logger = Logger.getLogger(TopologyQueryCounterMain.class);

    private static final String SPOUT_ID = "QueryCounter";

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        int numSpoutExecutors = 1;
        logger.debug("This is SpoutConfig");
        KafkaSpout kspout = QueryCounter();
        TopologyBuilder builder = new TopologyBuilder();
        logger.debug("This is Set Spout");
        builder.setSpout(SPOUT_ID, kspout, numSpoutExecutors);
        logger.debug("This is Set bolt");
        builder.setBolt("word-normalizer", new WordNormalizer())
            .shuffleGrouping(SPOUT_ID);
        builder.setBolt("word-counter", new WordCounter(), 1)
            .shuffleGrouping("word-normalizer", "stream1");

        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        logger.debug("This is Submit cluster");
        conf.put(Config.NIMBUS_HOST, "192.168.1.229");
        conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
        System.setProperty("storm.jar", "/home/ubuntu/workspace/QueryCounter/target/QueryCounter-0.0.1-SNAPSHOT.jar");
        conf.setNumWorkers(20);
        conf.setMaxSpoutPending(5000);

        if (args != null && args.length > 0) {
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            cluster.submitTopology("QueryCounter", conf, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("QueryCounter");
            logger.debug("This is ShutDown cluster");
            cluster.shutdown();
        }
    }

    private static KafkaSpout QueryCounter() {
        String zkHostPort = "localhost:2181";
        String topic = "RandomQuery";

        String zkRoot = "/QueryCounter";
        String zkSpoutId = "QueryCounter-spout";
        ZkHosts zkHosts = new ZkHosts(zkHostPort);

        logger.debug("This is Inside kafka spout cluster");
        SpoutConfig spoutCfg = new SpoutConfig(zkHosts, topic, zkRoot, zkSpoutId);
        spoutCfg.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutCfg);
        return kafkaSpout;
    }

}

Normalizer bolt:

public class WordNormalizer extends BaseBasicBolt {
    static final Logger logger = Logger.getLogger(WordNormalizer.class);

    public void cleanup() {}

    /**
     * The bolt receives the JSON feed from the spout, parses out the
     * fields it needs, and normalizes the search term to lower case.
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String feed = input.getString(0);

        String searchTerm = null;
        String pageNo = null;
        boolean sortOrder = true;
        boolean category = true;
        boolean field = true;
        boolean filter = true;
        String pc = null;
        int ProductCount = 0;
        String timestamp = null;

        JSONObject obj = null;
        try {
            obj = new JSONObject(feed);
        } catch (JSONException e1) {
            // TODO Auto-generated catch block
            //e1.printStackTrace();
        }

        try {
            searchTerm = obj.getJSONObject("body").getString("correctedWord");
            pageNo = obj.getJSONObject("body").getString("pageNo");
            sortOrder = obj.getJSONObject("body").isNull("sortOrder");
            category = obj.getJSONObject("body").isNull("category");
            field = obj.getJSONObject("body").isNull("field");
            filter = obj.getJSONObject("body").getJSONObject("filter").isNull("filters");
            pc = obj.getJSONObject("body").getString("ProductCount").replaceAll("[^\\d]", "");
            ProductCount = Integer.parseInt(pc);
            timestamp = (obj.getJSONObject("envelope").get("timestamp")).toString().replaceAll("[^\\d]", "");
        } catch (JSONException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        searchTerm = searchTerm.trim();

        // Condition to eliminate pagination
        if (!searchTerm.isEmpty()) {
            if ((pageNo.equals("1")) && (sortOrder == true) && (category == true) && (field == true) && (filter == true)) {
                searchTerm = searchTerm.toLowerCase();

                System.out.println("In Normalizer term : " + searchTerm + "," + timestamp + "," + ProductCount);
                System.out.println("Entire Json : " + feed);

                collector.emit("stream1", new Values(searchTerm, timestamp, ProductCount));
            }
        }
    }

    /**
     * The bolt emits the fields "searchTerm", "timestamp" and
     * "ProductCount" on "stream1".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("stream1", new Fields("searchTerm", "timestamp", "ProductCount"));
    }
}

CassandraWriter bolt:

public class WordCounter extends BaseBasicBolt {
    static final Logger logger = Logger.getLogger(WordCounter.class);
    Integer id;
    String name;
    Map<String, Integer> counters;
    Cluster cluster;
    Session session;

    /**
     * Called when the bolt is shut down.
     */
    @Override
    public void cleanup() {
    }

    public static Session getSessionWithRetry(Cluster cluster, String keyspace) {
        while (true) {
            try {
                return cluster.connect(keyspace);
            } catch (NoHostAvailableException e) {
                Utils.sleep(1000);
            }
        }
    }

    public static Cluster setupCassandraClient() {
        return Cluster.builder().addContactPoint("192.168.1.229").build();
    }

    /**
     * On create: connect to Cassandra and make sure the table exists.
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
        cluster = setupCassandraClient();
        session = WordCounter.getSessionWithRetry(cluster, "query");

        String query = "CREATE TABLE IF NOT EXISTS ProductCount(uid uuid PRIMARY KEY, "
                + "term text, "
                + "ProductCount varint, "
                + "timestamp text);";

        session.executeAsync(query);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String term = input.getString(0);
        String timestamp = input.getString(1);
        int ProductCount = input.getInteger(2);

        System.out.println("In Counter : " + term + "," + ProductCount + "," + timestamp);

        // Build the INSERT by concatenating the tuple values into the query string.
        String insertIntoTable = "INSERT INTO ProductCount (uid, term, ProductCount, timestamp)"
                + " VALUES(" + UUID.randomUUID() + "," + "'" + term + "'" + "," + ProductCount + "," + "'" + timestamp + "'" + ");";
        session.executeAsync(insertIntoTable);
    }
}

Please suggest the changes I need to make.

Thanks in advance!

1 Answer

Alex Popescu (best answer):

The error points to a missing ' in one of your queries. This is usually the problem when you build queries by concatenating strings. To avoid this sort of problem you should use prepared statements, or at least Session.execute(query, params) (javadoc).
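
For illustration, here is a minimal sketch of what the write path could look like with a prepared statement, assuming the table from the question and the DataStax Java driver 2.x API. ProductCountWriter and its write() method are hypothetical names introduced for the example, not part of the original code:

import java.math.BigInteger;
import java.util.UUID;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;

// Hypothetical helper, shown only to illustrate the prepared-statement approach.
public class ProductCountWriter {

    private final Session session;
    private final PreparedStatement insertStmt;

    public ProductCountWriter(Session session) {
        this.session = session;
        // Prepare the statement once (e.g. in the bolt's prepare() method).
        // The driver escapes and quotes the bound values itself, so a term
        // containing a ' character can no longer break the CQL syntax.
        this.insertStmt = session.prepare(
                "INSERT INTO ProductCount (uid, term, ProductCount, timestamp) "
                + "VALUES (?, ?, ?, ?)");
    }

    public void write(String term, String timestamp, int productCount) {
        // Bind the values instead of concatenating them into the query string.
        // ProductCount is declared varint, which the Java driver maps to BigInteger.
        BoundStatement bound = insertStmt.bind(
                UUID.randomUUID(),
                term,
                BigInteger.valueOf(productCount),
                timestamp);
        session.executeAsync(bound);
    }
}

If you prefer not to prepare the statement, Session.execute(query, params) takes the same positional values (this form needs Cassandra 2.0+ / native protocol v2), but preparing once and reusing the statement is the cheaper option for a bolt that writes on every tuple.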