Storm-Kafka Module: Writing to Kafka with KafkaBolt (Usage and Implementation)
Starting with version 0.9.3, Storm ships a generic bolt, KafkaBolt, for writing data out to Kafka. Let's look at a concrete example first, and then at how it is implemented.
The following annotated code shows how to use it:
TopologyBuilder builder = new TopologyBuilder();
// 1. The upstream component whose tuples feed into KafkaBolt (it can be
//    a spout or a bolt); here a user-defined spout that declares the
//    output fields "key" and "message".
Spout spout = new Spout(new Fields("key", "message"));
builder.setSpout("spout", spout);
// 2. Configure KafkaBolt with the target topic and the mapping from
//    upstream tuples to Kafka messages
KafkaBolt bolt = new KafkaBolt();
bolt.withTopicSelector(new DefaultTopicSelector("tony-S2K"))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
builder.setBolt("forwardToKafka", bolt, 1).shuffleGrouping("spout");
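A note on the mapper configured above: FieldNameBasedTupleToKafkaMapper looks up the Kafka key and message in the tuple by field name, and by default it uses the field names "key" and "message", which is exactly why the spout declares those two fields. A minimal self-contained sketch of that lookup, using a plain Map as a stand-in for Storm's Tuple interface (class and field names here are illustrative, not the real storm-kafka classes):

```java
import java.util.HashMap;
import java.util.Map;

public class MapperSketch {
    // Stand-in for FieldNameBasedTupleToKafkaMapper: pulls the key and
    // the message out of a tuple by field name. The defaults match the
    // Fields("key", "message") declared by the upstream spout.
    static class FieldNameMapper {
        final String keyField;
        final String msgField;

        FieldNameMapper() { this("key", "message"); }

        FieldNameMapper(String keyField, String msgField) {
            this.keyField = keyField;
            this.msgField = msgField;
        }

        String getKeyFromTuple(Map<String, String> tuple) {
            return tuple.get(keyField);
        }

        String getMessageFromTuple(Map<String, String> tuple) {
            return tuple.get(msgField);
        }
    }

    public static void main(String[] args) {
        Map<String, String> tuple = new HashMap<>();
        tuple.put("key", "user-42");
        tuple.put("message", "hello kafka");

        FieldNameMapper mapper = new FieldNameMapper();
        System.out.println(mapper.getKeyFromTuple(tuple));     // user-42
        System.out.println(mapper.getMessageFromTuple(tuple)); // hello kafka
    }
}
```

If your upstream component emits differently named fields, the real mapper has a two-argument constructor that lets you override both field names, which the sketch mirrors.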
Config conf = new Config();
// 3. Kafka producer configuration
Properties props = new Properties();
props.put("metadata.broker.list", "10.100.90.203:9092");
props.put("producer.type","async");
props.put("request.required.acks", "0"); // valid values: 0, 1, -1
props.put("serializer.class", "kafka.serializer.StringEncoder");
conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
conf.put("topic","tony-S2K");
if (args.length > 0) {
    // submit to the cluster
    try {
        StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
    } catch (AlreadyAliveException e) {
        e.printStackTrace();
    } catch (InvalidTopologyException e) {
        e.printStackTrace();
    }
} else {
    // run in local mode
    new LocalCluster().submitTopology("kafkaboltTest", conf, builder.createTopology());
}
The complete code is available on GitHub:
Source: http://blog.csdn.net/tonylee0329/article/details/43149525