Flume-ng: Syncing Data from RocketMQ to HDFS

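At work, a small module is responsible for writing data from RocketMQ into HDFS. I figured a ready-made tool for this ought to exist, and a quick search on Baidu turned up Flume-ng.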

The Flume basics are covered in detail in the official user guide: http://flume.apache.org/FlumeUserGuide.html

Flume does not support RocketMQ out of the box, but fortunately an implementation already exists on GitHub: https://github.com/rocketmq/rocketmq-flume

The rocketmq-flume project provides both a source and a sink: the source reads data from RocketMQ, and the sink writes data to RocketMQ. I only needed the source.

1. flume.conf

# Declare the agent's components
agent1.sources=source1
agent1.channels=channel1
agent1.sinks=sink1

# Source: read messages from RocketMQ via rocketmq-flume
agent1.sources.source1.type=com.handu.flume.source.rocketmq.RocketMQSource
agent1.sources.source1.namesrvAddr=10.1.234.205:9876
agent1.sources.source1.consumerGroup=FlumeGroup
agent1.sources.source1.topic=FlumeTopic
agent1.sources.source1.tags=*
# BROADCASTING: every consumer in the group receives every message
agent1.sources.source1.messageModel=BROADCASTING
agent1.sources.source1.maxNums=32
agent1.sources.source1.channels=channel1

# Logger sink, handy for debugging (disabled)
#agent1.sinks.sink1.type=logger
#agent1.sinks.sink1.channel=channel1

# Sink: write events to HDFS as plain text
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://rti9:9000/flume/
agent1.sinks.sink1.hdfs.writeFormat = Text
agent1.sinks.sink1.hdfs.fileType = DataStream
# Roll a new file every 1024 bytes; time- and count-based rolling are off (0)
agent1.sinks.sink1.hdfs.rollInterval = 0
agent1.sinks.sink1.hdfs.rollSize = 1024
agent1.sinks.sink1.hdfs.rollCount = 0
agent1.sinks.sink1.hdfs.batchSize = 1000
agent1.sinks.sink1.hdfs.txnEventMax = 1000
agent1.sinks.sink1.hdfs.callTimeout = 60000
agent1.sinks.sink1.hdfs.appendTimeout = 60000

# Channel: in-memory buffer between source and sink
agent1.channels.channel1.type=memory
agent1.channels.channel1.capacity=1001
agent1.channels.channel1.transactionCapacity=1001
agent1.channels.channel1.keep-alive=3

2. Parameter notes:

hdfs.rollInterval: Number of seconds to wait before rolling the current file (0 = never roll based on time interval)

hdfs.rollSize: File size, in bytes, that triggers a roll (0 = never roll based on file size)

hdfs.rollCount: Number of events written to the file before it is rolled (0 = never roll based on number of events)

hdfs.batchSize: Number of events written to the file before it is flushed to HDFS
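The three roll settings act as independent triggers: any one that is enabled (non-zero) and reached causes the current file to be rolled. Below is a minimal Java sketch of that rule as documented; RollPolicySketch and shouldRoll are hypothetical names, and this is not Flume's actual HDFS sink code.

```java
// Hypothetical sketch of the documented roll rules; NOT Flume's HDFS sink code.
public class RollPolicySketch {
    private final long rollIntervalSec; // hdfs.rollInterval: 0 = never roll on time
    private final long rollSizeBytes;   // hdfs.rollSize: 0 = never roll on size
    private final long rollCount;       // hdfs.rollCount: 0 = never roll on event count

    public RollPolicySketch(long rollIntervalSec, long rollSizeBytes, long rollCount) {
        this.rollIntervalSec = rollIntervalSec;
        this.rollSizeBytes = rollSizeBytes;
        this.rollCount = rollCount;
    }

    /** True if any enabled (non-zero) trigger has been reached for the current file. */
    public boolean shouldRoll(long secondsOpen, long bytesWritten, long eventsWritten) {
        boolean byTime = rollIntervalSec > 0 && secondsOpen >= rollIntervalSec;
        boolean bySize = rollSizeBytes > 0 && bytesWritten >= rollSizeBytes;
        boolean byCount = rollCount > 0 && eventsWritten >= rollCount;
        return byTime || bySize || byCount;
    }

    public static void main(String[] args) {
        // Values from flume.conf: rollInterval=0, rollSize=1024, rollCount=0
        RollPolicySketch policy = new RollPolicySketch(0, 1024, 0);
        System.out.println(policy.shouldRoll(3600, 512, 100_000)); // false: only size counts, 512 < 1024
        System.out.println(policy.shouldRoll(1, 1024, 1));          // true: 1024 bytes reached
    }
}
```

With the values used in flume.conf, only the size trigger is active, so a new HDFS file is started roughly every 1 KB no matter how long the file has been open or how many events it holds.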

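hdfs.batchSize must never be larger than the channel's transactionCapacity, because the sink takes each batch from the channel within a single transaction.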
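This constraint can be checked before the agent is started. The sketch below is a hypothetical helper (FlumeConfCheck is not part of Flume). It reads flume.conf-style keys with java.util.Properties and verifies batchSize <= transactionCapacity. It also checks transactionCapacity <= capacity, which the memory channel likewise expects.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical checker, not part of Flume: validates sink/channel batch sizes.
public class FlumeConfCheck {

    static long getLong(Properties p, String key) {
        String v = p.getProperty(key);
        if (v == null) throw new IllegalArgumentException("missing key: " + key);
        return Long.parseLong(v.trim()); // Properties keeps trailing spaces in values
    }

    /** Returns an error message, or null if the batch/transaction sizes are consistent. */
    static String check(Properties p, String agent, String sink, String channel) {
        long batch = getLong(p, agent + ".sinks." + sink + ".hdfs.batchSize");
        long txn = getLong(p, agent + ".channels." + channel + ".transactionCapacity");
        long cap = getLong(p, agent + ".channels." + channel + ".capacity");
        if (batch > txn) return "hdfs.batchSize (" + batch + ") > transactionCapacity (" + txn + ")";
        if (txn > cap) return "transactionCapacity (" + txn + ") > capacity (" + cap + ")";
        return null;
    }

    public static void main(String[] args) throws IOException {
        // In practice this would be loaded from conf/flume.conf.
        String conf = String.join("\n",
            "agent1.sinks.sink1.hdfs.batchSize = 1000",
            "agent1.channels.channel1.capacity=1001",
            "agent1.channels.channel1.transactionCapacity=1001");
        Properties p = new Properties();
        p.load(new StringReader(conf));
        String err = check(p, "agent1", "sink1", "channel1");
        System.out.println(err == null ? "OK" : "ERROR: " + err); // prints "OK"
    }
}
```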

3. Starting the agent:

flume-ng agent -c conf -f conf/flume.conf -n agent1 -Dflume.root.logger=INFO,console

The -c conf option is required; without it, the environment variables set in flume-env.sh are not loaded.

4. Problem encountered:

org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: channel1}
  at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
  at com.handu.flume.source.rocketmq.RocketMQSource.process(RocketMQSource.java:106)
  at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
  at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
  at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:130)
  at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
  at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
  ... 3 more

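This exception means the memory channel (capacity=1001 above) was full: the HDFS sink was not taking events out as fast as the RocketMQ source was putting them in. The fix was to give the JVM more memory by editing the startup script:

vi bin/flume-ng
JAVA_OPTS="-Xmx2048m"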
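The exception is easier to picture with a toy model. The class below is hypothetical and far simpler than Flume's MemoryChannel. It tracks free slots with a java.util.concurrent.Semaphore sized to capacity=1001. The source must reserve room for a whole batch within a timeout (playing the role of keep-alive), and slots are only freed when the sink takes events. Treating maxNums=32 as the source's batch size is an assumption.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Toy model of a bounded channel; hypothetical, much simpler than Flume's MemoryChannel.
public class ChannelFullDemo {
    private final Semaphore freeSlots; // free event slots, starts at capacity
    private final long keepAliveMs;    // how long a put waits for space

    public ChannelFullDemo(int capacity, long keepAliveMs) {
        this.freeSlots = new Semaphore(capacity);
        this.keepAliveMs = keepAliveMs;
    }

    /** Source side: reserve room for a whole batch; false means "channel full". */
    public boolean commitPut(int batchSize) {
        try {
            return freeSlots.tryAcquire(batchSize, keepAliveMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    /** Sink side: events taken out of the channel free their slots. */
    public void commitTake(int count) {
        freeSlots.release(count);
    }

    public static void main(String[] args) {
        // capacity=1001 as in flume.conf; the wait is shortened to 100 ms for the demo.
        ChannelFullDemo channel = new ChannelFullDemo(1001, 100);
        int sourceBatch = 32; // assumption: maxNums=32 events per source batch
        int accepted = 0;
        // The sink never takes anything here, so the source eventually cannot commit.
        while (channel.commitPut(sourceBatch)) {
            accepted += sourceBatch;
        }
        System.out.println("accepted " + accepted + " events before the channel was full");
        channel.commitTake(500); // a sink that drains 500 events frees room again
        System.out.println("put after drain: " + channel.commitPut(sourceBatch));
    }
}
```

This prints "accepted 992 events before the channel was full" and then "put after drain: true". After 31 batches only 9 slots remain, so the 32nd batch of 32 cannot be placed. Once the sink side drains events, puts succeed again. A sink that persistently falls behind therefore surfaces as ChannelFullException on the source side.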

Original article: https://www.cnblogs.com/machong/p/5630373.html