Storm-Kafka模块之写入kafka-KafkaBolt的使用及实现
storm在0.9.3版本中提供了一个抽象通用的Bolt KafkaBolt用来实现数据写入kafka的目的,我们先来看一个具体的例子,然后再看如何实现的。
我们用代码加注释的方式,来看下如何使用
//1. KafkaBolt的前置组件emit出来的(可以是spout也可以是bolt) Spout spout = new Spout(new Fields("key", "message")); builder.setSpout("spout", spout); //2. 给KafkaBolt配置topic及前置tuple消息到kafka的mapping关系 KafkaBolt bolt = new KafkaBolt(); bolt.withTopicSelector(new DefaultTopicSelector("tony-S2K")) .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper()); builder.setBolt("forwardToKafka", bolt, 1).shuffleGrouping("spout"); Config conf = new Config(); //3. 设置kafka producer的配置 Properties props = new Properties(); props.put("metadata.broker.list", "10.100.90.203:9092"); props.put("producer.type","async"); props.put("request.required.acks", "0"); // 0 ,-1 ,1 props.put("serializer.class", "kafka.serializer.StringEncoder"); conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props); conf.put("topic","tony-S2K"); if(args.length > 0){ // cluster submit. try { StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } }else{ new LocalCluster().submitTopology("kafkaboltTest", conf, builder.createTopology()); }
完整的代码参考github:
文章来自:http://blog.csdn.net/tonylee0329/article/details/43149525