I am integrating Kafka and Spark, using spark-streaming. I have created a topic as a kafka producer:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 
I am publishing messages in kafka and trying to read them using spark-streaming java code and displaying them on screen.
The daemons are all up: Spark-master,worker; zookeeper; kafka.
I am writing a java code for doing it, using KafkaUtils.createStream
code is below:  
public class SparkStream {
    public static void main(String args[])
    {
        if(args.length != 3)
        {
            System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
            System.exit(1);
        }
        Map<String,Integer> topicMap = new HashMap<String,Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            topicMap.put(t, new Integer(1));
        }
        JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap );
        System.out.println("Connection done++++++++++++++");
        JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() 
                                                {
                                                    public String call(Tuple2<String, String> message)
                                                    {
                                                        System.out.println("NewMessage: "+message._2()+"++++++++++++++++++");
                                                        return message._2();
                                                    }
                                                }
                                                );
        data.print();
        jssc.start();
        jssc.awaitTermination();
    }
}
I am running the job, and at other terminal I am running kafka-producer to publish messages:
Hi kafka
second message
another message
But the output logs at the spark-streaming console doesn't show the messages, but shows zero blocks received:
-------------------------------------------
Time: 1417438988000 ms
-------------------------------------------
2014-12-01 08:03:08,008 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 1417438988000 ms.0 from job set of time 1417438988000 ms
2014-12-01 08:03:08,008 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417438988000 ms.0 from job set of time 1417438988000 ms
2014-12-01 08:03:08,009 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.008 s for time 1417438988000 ms (execution: 0.000 s)
2014-12-01 08:03:08,010 INFO  [sparkDriver-akka.actor.default-dispatcher-15] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 1417438988000 ms
2014-12-01 08:03:08,015 INFO  [sparkDriver-akka.actor.default-dispatcher-15] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 39 from persistence list
2014-12-01 08:03:08,024 INFO  [sparkDriver-akka.actor.default-dispatcher-4] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 39
2014-12-01 08:03:08,027 INFO  [sparkDriver-akka.actor.default-dispatcher-15] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 38 from persistence list
2014-12-01 08:03:08,031 INFO  [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 38
2014-12-01 08:03:08,033 INFO  [sparkDriver-akka.actor.default-dispatcher-15] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[38] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417438988000 ms
2014-12-01 08:03:09,002 INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks
Why isn't the data block getting received? i have tried using kafka producer-consumer on console bin/kafka-console-producer....  and bin/kafka-console-consumer... its working perfect, but why not my code... any idea ?  
 
                        
Issue solved.
the code above is correct. We will just add two more lines to supress the [INFO] and [WARN] generated. So the final code is:
Also we need to add dependency in the POM.xml:
This dependency is used for making use of the
scala.Tuple2The error of
Stream 0 received 0 blockwas due to the spark-worker not available and and the spark-worker-core was set to 1. For spark-streaming we need the core to be >=2. So we need to make changes in the spark-config file. Refer the installation manual. to add the lineexport SPARK_WORKER_CORE=5Also change theSPARK_MASTER='hostname'toSPARK_MASTER=<your local IP>. This local ip is what you see in BOLD when you go to your Spark UI web console...something like:spark://192.168..:<port>. We dont need the port here. only the IP is required.Now restart your spark-master and spark-worker and start streaming :)
output: