大数据学习笔记:Flume导数据至Kafka

一、任务描述:将本地目录~/testdata/logs.件夹下的所有.本.件通过Kafka Sink写入Kafka中的flume topic(topic名称为:flume-topic)

 数据流

~/testdata/logs -> flume -> kafka

二、版本信息:

flume:1.7.0

zookeeper:3.4.5

kafka:2.10-0.10.1.1

节点数:3

三、相关配置

flume agent配置

LogAgent.sources = mysource
LogAgent.channels = mychannel
LogAgent.sinks = mysink

LogAgent.sources.mysource.type = spooldir
LogAgent.sources.mysource.channels = mychannel
LogAgent.sources.mysource.spoolDir =/home/zkpk/testdata/logs

LogAgent.sinks.mysink.channel = mychannel
LogAgent.sinks.mysink.type = org.apache.flume.sink.kafka.KafkaSink
LogAgent.sinks.mysink.kafka.bootstrap.servers=master:9092,slave1:9092,slave2:9092
LogAgent.sinks.mysink.kafka.topic=flume-topic
LogAgent.sinks.mysink.kafka.flumeBatchSize=20
LogAgent.sinks.mysink.kafka.producer.acks=1
LogAgent.sinks.mysink.kafka.producer.linger.ms=1


LogAgent.channels.mychannel.type = memory
LogAgent.channels.mychannel.capacity = 60000
LogAgent.channels.mychannel.transactionCapacity = 100

四,实验过程

1. 启动flume agent

1 bin/flume-ng agent -c conf -f conf/flumedata.properties -n LogAgent -Dflume.root.logger=DEBUG,console

2.启动kafka消费者端

1 bin/kafka-console-consumer.sh --zookeeper master:2181 --topic flume-topic

3.不断地创建,拷贝文件到~/testdata/logs

[zkpk@master pre-logs]$ vi file9 
[zkpk@master pre-logs]$ cp file9 ../logs

4.观察kafka的输出

五、实验过程中遇到的一些问题

1.LEADER_NOT_AVAILABLE

这是在启动flume agent过程中出现的,排查了下,是kafka引起的。

解决方法:

停掉kafka,停掉zookeeper,再删除它们的数据目录,然后再重新启动。

2.源文件目录读取错误

Spool Directory source mysource: { spoolDir: /home/zkpk/testdata/logs }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.

原因:在实验过程中,误将已经处理过的文件重新导入了源目录

解决方法:

停掉flume agent,删除重复的文件,再次重启。

技术服务业务
原文地址:https://www.cnblogs.com/cauwt/p/flume_kafka_test_troubleshooting.html