
Executing the topology from Command Prompt

Once the UI is visible and all the daemons have started, the topology can be submitted to Nimbus with the following command:

storm jar storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.starter.WordCountTopology WordCount -c nimbus.host=localhost

The Storm UI with the WordCount topology running in distributed mode is shown here. It depicts the topology state, uptime, and other details (we shall discuss the features of the UI in detail in a later chapter). We can kill the topology from the UI.


Tweaking the WordCount topology to customize it

Now that we have deployed the WordCount topology in distributed mode, let's tweak the bolt code a little so that it writes the word counts to a file. To do this, proceed with the following steps:

  1. Create a new bolt, FileWriterBolt, by opening WordCountTopology.java and adding the following class to it:
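    // Requires java.io.FileOutputStream, java.io.IOException, java.io.OutputStream,
    // java.util.HashMap and java.util.Map imports at the top of WordCountTopology.java
    public static class FileWriterBolt extends BaseBasicBolt {
        // Running count per word, kept in memory by each bolt task
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            counts.put(word, count);
            // Java does not expand "~", so build the home-directory path explicitly
            String path = System.getProperty("user.home") + "/wordCount.txt";
            OutputStream ostream = null;
            try {
                // Append mode: each tuple adds one "word count" line to the file
                ostream = new FileOutputStream(path, true);
                ostream.write((word + " " + count + "\n").getBytes());
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // Close the stream even if the write fails
                if (ostream != null) {
                    try {
                        ostream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }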
  2. Next, change the main() method so that it uses this new bolt instead of the WordCount bolt. Here is the snippet:
    // Instantiates the new builder object
    TopologyBuilder builder = new TopologyBuilder();
    // Adds a spout of type RandomSentenceSpout with a parallelism hint of 5
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    // Adds a bolt of type SplitSentence with a parallelism hint of 8
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    // Adds the new FileWriterBolt in place of WordCount, with a parallelism hint of 12;
    // fieldsGrouping on "word" sends every occurrence of a word to the same task
    builder.setBolt("count", new FileWriterBolt(), 12).fieldsGrouping("split", new Fields("word"));
  3. Next, run the topology in local mode from Eclipse as a Java application. The output will be saved to a file called wordCount.txt in your home directory.
  4. To run in distributed mode, use the following steps:
    1. Compile the changes and build a new storm-starter JAR using the following command:
      mvn clean install
      
    2. Copy storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar from the starter project's target folder to the Nimbus machine, for example to /home/admin/topology/.
    3. Submit the topology using the following command:
      storm jar /home/admin/topology/storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.starter.WordCountTopology WordCount -c nimbus.host=localhost
      
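  5. The topology will appear in the Storm UI just as the WordCount topology did in the preceding section. In distributed mode, wordCount.txt is written to the home directory of the user running the worker process on each supervisor machine that hosts a FileWriterBolt task.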
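The per-tuple logic of FileWriterBolt (count the word, append it to a file, emit the new count) can be tried without a Storm cluster. The following is a minimal, Storm-free sketch under stated assumptions: the class WordCountFileWriter and its methods accept, readLines and taskFor are hypothetical names, and taskFor only illustrates the idea behind fieldsGrouping("split", new Fields("word")), namely that the same word always goes to the same task. It is not Storm's internal hashing scheme.

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical, Storm-free sketch of FileWriterBolt's per-tuple logic
    class WordCountFileWriter {
        // Running count per word, held in memory as in the bolt
        private final Map<String, Integer> counts = new HashMap<>();
        private final Path output;

        WordCountFileWriter(Path output) {
            this.output = output;
        }

        // Mirrors execute(): increment the word's count, append "word count" to the
        // file, and return the new count (the value the bolt would emit)
        int accept(String word) {
            int count = counts.getOrDefault(word, 0) + 1;
            counts.put(word, count);
            byte[] line = (word + " " + count + "\n").getBytes(StandardCharsets.UTF_8);
            try {
                // CREATE + APPEND: create the file on first use, then keep appending
                Files.write(output, line, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return count;
        }

        // Reads back every line written so far
        List<String> readLines() {
            try {
                return Files.readAllLines(output, StandardCharsets.UTF_8);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        // Illustrates the idea behind fieldsGrouping on "word": a given word always
        // maps to the same task index. This is NOT Storm's actual hashing scheme.
        static int taskFor(String word, int numTasks) {
            return Math.floorMod(word.hashCode(), numTasks);
        }

        public static void main(String[] args) {
            // Use a temporary-directory file so the demo does not touch ~/wordCount.txt
            Path out = Paths.get(System.getProperty("java.io.tmpdir"), "wordCount-demo.txt");
            WordCountFileWriter writer = new WordCountFileWriter(out);
            for (String word : "the cow jumped over the moon".split(" ")) {
                System.out.println(word + " -> count " + writer.accept(word)
                        + ", task " + taskFor(word, 12));
            }
        }
    }

Because each task keeps its counts map in its own memory, a word's running count is only correct if every occurrence of that word reaches the same task. The fields grouping on "word" provides that guarantee. A shuffle grouping would scatter each word's occurrences across the 12 tasks.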