阅读排行
- 中国标准城市区域码
- postgresql 截取日期字符串 的函数
- OkHttp 官方中文文档
- 关于java中“使用了未经检查或不安全的操作、有关详细信息,请使用 ——X临潼:unchecked重新编译”
- 什么值得买自动签到自动评论定时任务
- chrome://plugins 无法打开的解决方法,同时解决“该网页已屏蔽插件-adobe flash player”
- 如何在GPT格式下创建EFI和MSR分区(安装WIN10/WIN8.1)
- 联想X270——正版Win10换win7之BIOS配置及系统安装
- 几款开源的堡垒机
- code128条码生成与显示
- Java操作Excel,设置某些单元格不可编辑(HSSFWorkbook ,XSSFWorkbook )
- 第04章-VTK基础(6)
- 通达OA用户操作手册(一)
- postgresql——字符串函数
- (转)C#进行图像处理的几种方法(Bitmap,BitmapData,IntPtr)
- 5.EVE-NG关联SecureCRT,VNC,Wireshark
- Stream 操作 合并两个list
- Unity 利用UGUI打包图集,动态加载sprite资源
- elasticsearch 数据丢失的分析
- win7&win10安装AD管理工具
storm数据流分流和合并
数据流分流和合并
数据流经常需要分流与合并操作,如下图所示:
分流
分流有2钟情况,第一种是,相同的tuple发往下一级不同的bolt, 第二种,分别发送不同的tuple到不同的下级bolt上。
发送相同tuple
其实和普通1v1 发送一模一样,就是有2个或多个bolt接收同一个spout或bolt的数据 举例来说:
SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME, new SequenceSpout(), spoutParal); builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping( SequenceTopologyDef.SEQUENCE_SPOUT_NAME); builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1) .shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
发送不同的tuple
当发送不同的tuple到不同的下级bolt时, 这个时候,就需要引入stream概念,发送方发送a 消息到接收方A‘时使用stream A, 发送b 消息到接收方B‘时,使用stream B
在topology提交时:
builder.setBolt(SequenceTopologyDef.SPLIT_BOLT_NAME, new SplitRecord(), 2).shuffleGrouping( SequenceTopologyDef.SEQUENCE_SPOUT_NAME); builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping( SequenceTopologyDef.SPLIT_BOLT_NAME, // --- 发送方名字 SequenceTopologyDef.TRADE_STREAM_ID); // --- 接收发送方该stream 的tuple builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1) .shuffleGrouping(SequenceTopologyDef.SPLIT_BOLT_NAME, // --- 发送方名字 SequenceTopologyDef.CUSTOMER_STREAM_ID); // --- 接收发送方该stream 的tuple
在发送消息时:
public void execute(Tuple tuple, BasicOutputCollector collector) { tpsCounter.count(); Long tupleId = tuple.getLong(0); Object obj = tuple.getValue(1); if (obj instanceof TradeCustomer) { TradeCustomer tradeCustomer = (TradeCustomer)obj; Pair trade = tradeCustomer.getTrade(); Pair customer = tradeCustomer.getCustomer(); collector.emit(SequenceTopologyDef.TRADE_STREAM_ID, new Values(tupleId, trade)); collector.emit(SequenceTopologyDef.CUSTOMER_STREAM_ID, new Values(tupleId, customer)); }else if (obj != null){ LOG.info("Unknow type " + obj.getClass().getName()); }else { LOG.info("Nullpointer " ); } }
定义输出流格式:
public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream(SequenceTopologyDef.TRADE_STREAM_ID, new Fields("ID", "TRADE")); declarer.declareStream(SequenceTopologyDef.CUSTOMER_STREAM_ID, new Fields("ID", "CUSTOMER")); } //接受消息时,需要判断数据流 if (input.getSourceStreamId().equals(SequenceTopologyDef.TRADE_STREAM_ID) ) { customer = pair; customerTuple = input; tradeTuple = tradeMap.get(tupleId); if (tradeTuple == null) { customerMap.put(tupleId, input); return; } trade = (Pair) tradeTuple.getValue(1); }
数据流合并
生成topology时
在下面例子中, MergeRecord 同时接收SequenceTopologyDef.TRADE_BOLT_NAME 和SequenceTopologyDef.CUSTOMER_BOLT_NAME 的数据
builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping( SequenceTopologyDef.SPLIT_BOLT_NAME, SequenceTopologyDef.TRADE_STREAM_ID); builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1) .shuffleGrouping(SequenceTopologyDef.SPLIT_BOLT_NAME, SequenceTopologyDef.CUSTOMER_STREAM_ID); builder.setBolt(SequenceTopologyDef.MERGE_BOLT_NAME, new MergeRecord(), 1) .shuffleGrouping(SequenceTopologyDef.TRADE_BOLT_NAME) .shuffleGrouping(SequenceTopologyDef.CUSTOMER_BOLT_NAME);
发送方
发送的bolt和普通一样,无需特殊处理
接收方
接收方是,区分一下来源component即可识别出数据的来源
if (input.getSourceComponent().equals(SequenceTopologyDef.CUSTOMER_BOLT_NAME) ) { customer = pair; customerTuple = input; tradeTuple = tradeMap.get(tupleId); if (tradeTuple == null) { customerMap.put(tupleId, input); return; } trade = (Pair) tradeTuple.getValue(1); } else if (input.getSourceComponent().equals(SequenceTopologyDef.TRADE_BOLT_NAME)) { trade = pair; tradeTuple = input; customerTuple = customerMap.get(tupleId); if (customerTuple == null) { tradeMap.put(tupleId, input); return; } customer = (Pair) customerTuple.getValue(1); }
文章来自:http://www.cnblogs.com/muzhongjiang/p/4117625.html