流量分析系统---flume(测试flume+kafka)

1、在flume官方网站下载最新的flume
 
2、解决flume安装包
cd /export/software/
tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /export/servers/
cd /export/servers/
ln -s apache-flume-1.6.0-bin flume
 
3、创建flume配置文件
cd /export/servers/flume/conf/
mkdir myconf
vi exec.conf 
输入以下内容:
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/data/flume_sources/click_log/1.log
a1.sources.r1.channels = c1

a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = myOrder #注意这里的topic
a1.sinks.k1.brokerList = kafka01:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1
注:配置完毕,flume环节的工作基本完成。接下来准备目标数据文件。
 
4、准备目标数据的目录
mkdir -p /export/data/flume_sources/click_log
 
5、通过脚本创建目标文件并生产数据
for((i=0;i<=50000;i++));
do echo "message-"+$i >>/export/data/flume_sources/click_log/1.log;
done
注意:脚本名称叫做click_log_out.sh 需要使用root用户赋权。 chmod +x click_log_out.sh
 
6、开始打通所有流程
各个节点启动zookeeper集群
    第一步:启动kafka集群(mini1,mini2,mini3-----kafka1,kafka2,kafka3)
                nohup kafka-server-start.sh /export/servers/kafka/config/server.properties & 
    第二步:创建一个topic并开启consumer   
               kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 1 --partitions 4 --topic myOrder
启动kafka consumer窗口(----consumer)
kafka-console-consumer.sh --zookeeper mini1:2181 --from-beginning --topic myOrder
    第三步:执行数据上产的脚本(mini1-----dataSource)
                sh click_log_out.sh
    第四步:启动flume客户端(mini1-----flume
                ./bin/flume-ng agent -n a1 -c conf -f conf/myconf/exec.conf -Dflume.root.logger=INFO,console
第五步:在第三步启动的kafka consumer窗口查看效果
原文地址:https://www.cnblogs.com/SuMeng/p/8228251.html