Storm 基础 -- spout与bolt设置多重grouping
Topology的代码如下:
TopologyBuilder builder = new TopologyBuilder();
//WordReaderSpout会从文件中读取数据,数据用shuffle的方式发送给bolt进行处理
//当文件读取完成后,会发送一个global消息
builder.setSpout("word-reader",new WordReaderSpout());
builder.setBolt("word-normalizer", new WordNormalizerBolt())
        .shuffleGrouping("word-reader")
        .globalGrouping("word-reader", "FINISH");
builder.setBolt("word-counter", new WordCounterBolt(),1)
     .fieldsGrouping("word-normalizer", new Fields("word"))
     .globalGrouping("word-normalizer", "CLOSE");
以globalGrouping为例: 
      globalGrouping(“word-reader”, “FINISH”); 两个参数的含义 
    第一个参数:   “word-reader” 为componet id, 这个值 与我们代码的   中的word-reader一致。
builder.setSpout("word-reader",new WordReaderSpout());
第二个参数: “FINISH”为stream id 
这个值是在WordReaderSpout中定义的
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("line"));
    declarer.declareStream("FINISH", new Fields("FINISH Reade file"));
}
下面以我们熟知的HashMap为例,解释这两个参数 
1) 我们需要定义一个word-reader 的HashMap对象
Map<String, Object> word-reader = new HashMap<String, Object>();
2) 我们调用put两个对象进去,这两个对象对应的Key值分别为default与FINISH
word-reader.put("default", new Object());
word-reader.put("FINISH", new Object());
3) 有其它的对象需要获取我们put进去的两个对象,首先就需要先获取word-reader的Handler。
WordNormalizerBolt  word-normalizer = new WordNormalizerBolt();
word-normalizer.setMap(word-reader);
对应到Storm的代码中,就如下: 
1) 首先我们需要创建一个word-reader的对象,  —这是bolt代码
builder.setSpout("word-reader",new WordReaderSpout());
2) 我们需要put两个对象,这里与Map可能有点儿差别,因为需要先声明。 — 这是spout的代码
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line")); //declare()相当于declarer.declareStream(“defalut”, new Fields("line"));
        declarer.declareStream("FINISH", new Fields("FINISH Reade file"));
    }
而put对象则是在nextTuple()中进行的
    public void nextTuple() {
                 ... ... 
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            // Read all lines
            while ((str = reader.readLine()) != null) {
                               //put一个key为"default"的对象
                this.collector.emit(new Values(str), str);
            }
        } catch (Exception e) {
                      ... ...
        }
         //put一个key为"FINISH"的对象
        this.collector.emit("FINISH", new Values("Finish"));
    }
3) bolt获取Map对象 —- 在TopologyBuilder代码中实现
builder.setBolt("word-normalizer", new WordNormalizerBolt())
        .shuffleGrouping("word-reader")
        .globalGrouping("word-reader", "FINISH");
文章来自:http://blog.csdn.net/eyoulc123/article/details/51353200