(10) Kafka Integration with Flume

  The data produced into Kafka here comes from a Flume sink: a Flume agent collects log files and delivers them to Kafka.

  This post builds on several earlier posts in this series; please refer to them as needed:

  (1) Installing a ZooKeeper cluster

  (2) Basic configuration of Kafka with multiple brokers on a single machine (pseudo-distributed)

  (3) A Java program connecting to Kafka (example)

  (4) Installing and starting Flume

  1. Preparation

  (1) Start the ZooKeeper cluster. Run the following command on 192.168.7.151, 192.168.7.152, and 192.168.7.153:

[root@localhost ~]# zkServer.sh start
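
  To double-check that the ensemble formed, the status can be queried on each node; one node should report itself as leader and the others as followers (this verification step is not in the original post):

[root@localhost ~]# zkServer.sh status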

  (2) Start Kafka on 192.168.7.151 by running the following commands:

[root@localhost kafka_2.9.2-0.8.1.1]# bin/kafka-server-start.sh config/server.properties &
[root@localhost kafka_2.9.2-0.8.1.1]# bin/kafka-server-start.sh config/server1.properties &
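
  A quick optional check (not part of the original steps) is to list the Java processes; two Kafka broker processes should appear:

[root@localhost kafka_2.9.2-0.8.1.1]# jps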

  (3) Start the Kafka consumer. Any data it prints at this point is output left over from earlier tests.
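
  If that consumer is no longer running, it can be restarted with the 0.8-style console consumer; the topic name mydemo1 matches the sink configuration below, and the ZooKeeper address here is assumed from the cluster setup:

[root@localhost kafka_2.9.2-0.8.1.1]# bin/kafka-console-consumer.sh --zookeeper 192.168.7.151:2181 --topic mydemo1 --from-beginning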

  2. Create the Flume configuration file for connecting to Kafka

[root@localhost myagent]# vim /usr/local/apache-flume-1.6.0-bin/myagent/a4.conf

  The contents are as follows:

#bin/flume-ng agent -n a4 -f myagent/a4.conf -c conf -Dflume.root.logger=INFO,console

#Define the agent name and the names of its source, channel, and sink
a4.sources = r1
a4.channels = c1
a4.sinks = k1

#Configure the source
a4.sources.r1.type = spooldir
a4.sources.r1.spoolDir = /usr/local/logs/flumelogs

#Configure the channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100

#Configure the sink
a4.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a4.sinks.k1.topic = mydemo1
a4.sinks.k1.brokerList = 192.168.7.151:9092,192.168.7.151:9093
a4.sinks.k1.requiredAcks = 1
a4.sinks.k1.batchSize = 20

#Bind the source and sink to the channel
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1
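
  Two prerequisites are worth checking before starting the agent: the topic mydemo1 should exist (it may already have been created in the earlier Kafka post; the partition and replication values below are only illustrative), and the spooling directory watched by the source must exist, otherwise the spooldir source fails to start:

[root@localhost kafka_2.9.2-0.8.1.1]# bin/kafka-topics.sh --create --zookeeper 192.168.7.151:2181 --replication-factor 1 --partitions 1 --topic mydemo1
[root@localhost kafka_2.9.2-0.8.1.1]# mkdir -p /usr/local/logs/flumelogs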

  3. Start Flume

[root@localhost apache-flume-1.6.0-bin]# bin/flume-ng agent -n a4 -f myagent/a4.conf -c conf -Dflume.root.logger=INFO,console

  4. Test

  Create a new file containing a few lines of text, then move it into /usr/local/logs/flumelogs, as in the example below. In the Kafka consumer console the file's contents are then printed (the original post shows this with screenshots).
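
  For example (the file name and contents here are made up for illustration), a small file can be created and then moved into the spooling directory; once Flume has processed it, the file is renamed with a .COMPLETED suffix and its lines show up in the consumer console:

[root@localhost ~]# echo "hello kafka from flume" > /usr/local/test.log
[root@localhost ~]# mv /usr/local/test.log /usr/local/logs/flumelogs/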

   

 

Original post: https://www.cnblogs.com/javasl/p/12304034.html