一、任务描述:将本地目录~/testdata/logs.件夹下的所有.本.件通过Kafka Sink写入Kafka中的flume topic(topic名称为:flume-topic)
数据流
~/testdata/logs -> flume -> kafka
二、版本信息:
flume:1.7.0
zookeeper:3.4.5
kafka:2.10-0.10.1.1
节点数:3
三、相关配置
flume agent配置
LogAgent.sources = mysource LogAgent.channels = mychannel LogAgent.sinks = mysink LogAgent.sources.mysource.type = spooldir LogAgent.sources.mysource.channels = mychannel LogAgent.sources.mysource.spoolDir =/home/zkpk/testdata/logs LogAgent.sinks.mysink.channel = mychannel LogAgent.sinks.mysink.type = org.apache.flume.sink.kafka.KafkaSink LogAgent.sinks.mysink.kafka.bootstrap.servers=master:9092,slave1:9092,slave2:9092 LogAgent.sinks.mysink.kafka.topic=flume-topic LogAgent.sinks.mysink.kafka.flumeBatchSize=20 LogAgent.sinks.mysink.kafka.producer.acks=1 LogAgent.sinks.mysink.kafka.producer.linger.ms=1 LogAgent.channels.mychannel.type = memory LogAgent.channels.mychannel.capacity = 60000 LogAgent.channels.mychannel.transactionCapacity = 100
四,实验过程
1. 启动flume agent
1 bin/flume-ng agent -c conf -f conf/flumedata.properties -n LogAgent -Dflume.root.logger=DEBUG,console
2.启动kafka消费者端
1 bin/kafka-console-consumer.sh --zookeeper master:2181 --topic flume-topic
3.不断地创建,拷贝文件到~/testdata/logs
[zkpk@master pre-logs]$ vi file9 [zkpk@master pre-logs]$ cp file9 ../logs
4.观察kafka的输出
五、实验过程中遇到的一些问题
1.LEADER_NOT_AVAILABLE
这是在启动flume agent过程中出现的,排查了下,是kafka引起的。
解决方法:
停掉kafka,停掉zookeeper,再删除它们的数据目录,然后再重新启动。
2.源文件目录读取错误
Spool Directory source mysource: { spoolDir: /home/zkpk/testdata/logs }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
原因:在实验过程中,误将已经处理过的文件重新导入了源目录
解决方法:
停掉flume agent,删除重复的文件,再次重启。