Storm Word Count Topology - Concept issue with number of executions


Good afternoon. I am following the Storm-starter WordCountTopology. For reference, here are the Java files.

This is the main file:

public class WordCountTopology {

    public static class SplitSentence extends ShellBolt implements IRichBolt {

        public SplitSentence() {
            super("python", "splitsentence.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new TextFileSpout(), 5);
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
        else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
}

Instead of reading from a random String[], I would like it to read just one sentence:

public class TextFileSpout extends BaseRichSpout {
    SpoutOutputCollector _collector;
    String sentence = "";
    String line = "";
    String splitBy = ",";
    BufferedReader br = null;

    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        _collector = collector;

    }

    @Override
    public void nextTuple() {
        Utils.sleep(100);
        sentence = "wordOne wordTwo";
        _collector.emit(new Values(sentence));
        System.out.println(sentence);
    }

    @Override
    public void ack(Object id) {
    }

    @Override
    public void fail(Object id) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

}

This code runs, and the output is a lot of threads/emits. The problem is that the program repeatedly reads that one sentence, emitting it 85 times instead of just once. I'm guessing this is because the original code was meant to emit new random sentences over and over.

What is causing nextTuple to be called so many times?
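In short: Storm invokes nextTuple() in an endless loop on every spout executor for as long as the topology runs; it is not a one-shot callback. A minimal, Storm-free sketch of that contract (all names here are hypothetical, chosen for illustration) shows why an unguarded emit fires on every call, and how a completion flag limits it to one:

```java
import java.util.ArrayList;
import java.util.List;

public class NextTupleLoopDemo {

    // Stand-in for a spout: the framework would call nextTuple() repeatedly.
    static class SingleSentenceSpout {
        private final List<String> emitted = new ArrayList<>();
        private boolean completed = false; // guard: emit the sentence only once

        void nextTuple() {
            if (completed) {
                return; // nothing left to emit; the loop will just call us again
            }
            emitted.add("wordOne wordTwo");
            completed = true;
        }

        List<String> emitted() {
            return emitted;
        }
    }

    public static void main(String[] args) {
        SingleSentenceSpout spout = new SingleSentenceSpout();
        // Storm's worker loop, condensed: nextTuple() is called over and over.
        for (int i = 0; i < 85; i++) {
            spout.nextTuple();
        }
        // With the guard, the sentence was emitted exactly once.
        System.out.println(spout.emitted().size());
    }
}
```

Without the completed flag, every iteration of the loop adds another copy of the sentence, which is exactly the repeated-emit behavior described above.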

1 Answer

user2720864:

You should move the file-initialization code into the open method; otherwise your file handle will be re-initialized every single time nextTuple is called.

EDIT:

Inside the open method, do something like:

    br = new BufferedReader(new FileReader(csvFileToRead));

and then the logic to read the file should be inside the nextTuple method:

     while ((line = br.readLine()) != null) {
         // your logic
     }
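Putting the answer's advice together: create the reader once (in open), then consume at most one line per nextTuple call, doing nothing once the input is exhausted. Below is a minimal, Storm-free sketch of that lifecycle; the class and method names are hypothetical, and an in-memory StringReader stands in for the file so the example runs standalone:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

public class SpoutLifecycleDemo {
    private BufferedReader br;

    // Corresponds to open(): initialize the reader exactly once.
    void open(String contents) {
        br = new BufferedReader(new StringReader(contents));
    }

    // Corresponds to nextTuple(): consume at most one line per call.
    // Returns null once the input is exhausted, i.e. nothing left to emit.
    String readOneLine() throws IOException {
        return br.readLine();
    }

    public static void main(String[] args) throws IOException {
        SpoutLifecycleDemo spout = new SpoutLifecycleDemo();
        spout.open("first line\nsecond line");
        String line;
        // Each iteration mirrors one nextTuple() call from the framework.
        while ((line = spout.readOneLine()) != null) {
            System.out.println(line);
        }
    }
}
```

Because the reader lives across calls, each "nextTuple" picks up where the previous one left off; re-creating the reader on every call would restart from the first line each time, which is the bug the answer points out.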