【Flume】flume输出sink到hbase的实现
flume 1.5.2
hbase 0.98.9
hadoop 2.6
zk 3.4.6
以上是基础的软件及对应版本,请先确认以上软件安装成功!
1、添加jar包支持
将hbase的lib下的这些jar包拷贝到flume的lib下
2、配置flume
注意看以上的serializer配置,采用的是官方的RegexHbaseEventSerializer,
当然还有一个SimpleHbaseEventSerializer
如果你使用了SimpleHbaseEventSerializer
就会出现如下的错误
[2015-03-04 09:35:41,244] [SinkRunner-PollingRunner-DefaultSinkProcessor:5672] [ERROR] Failed to commit transaction.Transaction rolled back. java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Increment.setWriteToWAL(Z)Lorg/apache/hadoop/hbase/client/Increment; at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:408) at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:391) at org.apache.flume.sink.hbase.HBaseSink.runPrivileged(HBaseSink.java:427) at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:391) at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:344) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) at java.lang.Thread.run(Thread.java:745) [2015-03-04 09:35:41,249] [SinkRunner-PollingRunner-DefaultSinkProcessor:5677] [ERROR] Failed to commit transaction.Transaction rolled back. java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Increment.setWriteToWAL(Z)Lorg/apache/hadoop/hbase/client/Increment; at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:408) at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:391) at org.apache.flume.sink.hbase.HBaseSink.runPrivileged(HBaseSink.java:427) at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:391) at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:344) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) at java.lang.Thread.run(Thread.java:745) Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Increment.setWriteToWAL(Z)Lorg/apache/hadoop/hbase/client/Increment; at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:408) at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:391) at org.apache.flume.sink.hbase.HBaseSink.runPrivileged(HBaseSink.java:427) at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:391) at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:344) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) at java.lang.Thread.run(Thread.java:745)所以不要使用该序列化器
当然这个序列化器应该自己来开发的,因为列族,列修饰符什么的,应该由我们自己定义的
3、测试
这里注意,你必须提前在hbase将表创建好
hbase(main):004:0> create 'flume','chiwei' 0 row(s) in 1.0770 seconds => Hbase::Table - flume
启动flume
待监测的文件内容为
Hello Flume Hello HBase Hello Hadoop Hello Zk Hello chiwei再来看一下hbase的表数据
hbase(main):006:0> scan 'flume' ROW COLUMN+CELL 1425437884264-rd3B0VE5Uz-0 column=chiwei:payload, timestamp=1425437884520, value=Hello Flume 1425437884275-rd3B0VE5Uz-1 column=chiwei:payload, timestamp=1425437884520, value=Hello HBase 1425437884276-rd3B0VE5Uz-2 column=chiwei:payload, timestamp=1425437884520, value=Hello Hadoop 1425437884277-rd3B0VE5Uz-3 column=chiwei:payload, timestamp=1425437884520, value=Hello Zk 1425437884278-rd3B0VE5Uz-4 column=chiwei:payload, timestamp=1425437884520, value=Hello chiwei 1425437884624-rd3B0VE5Uz-5 column=chiwei:payload, timestamp=1425437884639, value=Hello Flume 1425437884626-rd3B0VE5Uz-6 column=chiwei:payload, timestamp=1425437884639, value=Hello HBase 1425437884628-rd3B0VE5Uz-7 column=chiwei:payload, timestamp=1425437884639, value=Hello Hadoop 1425437884629-rd3B0VE5Uz-8 column=chiwei:payload, timestamp=1425437884639, value=Hello Zk 1425437884630-rd3B0VE5Uz-9 column=chiwei:payload, timestamp=1425437884639, value=Hello chiwei 10 row(s) in 0.4020 seconds hbase(main):007:0>测试成功!
文章来自:http://blog.csdn.net/simonchi/article/details/44057163