Flume Notes



Notes on basic Flume setup and configuration.

Flume installation:

  1. Extract the downloaded Flume package into the /home/xxx directory.

  2. Rename the template files shipped with the software (or copy them under a new name), dropping the .template suffix, then edit the flume-env.sh configuration file — mainly to set the JAVA_HOME variable.
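The template rename and JAVA_HOME step can be sketched as shell commands. This is a minimal sketch using a stand-in directory (/tmp/flume-demo), a stand-in template file, and a hypothetical JAVA_HOME path — substitute the conf directory of your extracted Flume package and your actual JDK location.

```bash
# Stand-in for the conf directory inside the extracted Flume package
mkdir -p /tmp/flume-demo/conf
cd /tmp/flume-demo/conf

# Stand-in for the template file that ships with Flume
echo '# Flume environment settings' > flume-env.sh.template

# Copy the template, dropping the .template suffix, then set JAVA_HOME
cp flume-env.sh.template flume-env.sh
echo 'export JAVA_HOME=/usr/lib/jvm/java-8-openjdk' >> flume-env.sh

cat flume-env.sh
```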

Example 1: monitor a file and print newly appended data to the console in real time

Agent selection: exec source (`tail -F`) + memory channel + logger sink

Configuration:
```
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/briup/log/test.log
# the command is passed to this shell as the string after -c
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
Run and test:

Flags: `-c`/`--conf` specifies the configuration directory, `-f`/`--conf-file` specifies this agent's configuration file, and `-n`/`--name` specifies the agent name.

```
flume-ng agent -c apache-flume-1.9.0-bin/conf/ -f apache-flume-1.9.0-bin/conf/log.flm -n a1 -Dflume.root.logger=INFO,console
```

`-Dflume.root.logger=INFO,console` prints execution logs to the console.

Example 2: Spool

The Spooling Directory source monitors the configured directory for newly added files and reads out their data. Two caveats:

  1. A file copied into the spool directory must not be opened and edited afterwards.

  2. The spool directory must not contain subdirectories.
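One way to satisfy the first caveat is to never write a file in place inside the spool directory: write the complete file elsewhere on the same filesystem, then mv it in, since a rename within one filesystem is atomic and the source never sees a half-written file. A minimal sketch, using a stand-in /tmp/flume_spool directory in place of the configured spoolDir:

```bash
mkdir -p /tmp/flume_spool

# Write the complete file outside the spool directory first...
echo "2019-08-06 app started" > /tmp/data.log.tmp

# ...then move it in; mv on the same filesystem is an atomic rename
mv /tmp/data.log.tmp /tmp/flume_spool/data.log

ls /tmp/flume_spool
```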

Create the agent configuration file under the flume directory: conf/spool.conf

```
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/briup/flume_test
a1.sources.r1.fileHeader = true

a1.sinks.k1.type = logger

a1.channels.c1.type = memory

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
```
flume-ng agent -c ~/apache-flume-1.9.0-bin/conf/ -f ~/apache-flume-1.9.0-bin/conf/spool.conf -n a1 -Dflume.root.logger=INFO,console
```

Example 3: collect single-line data from a given network port and print it to the console

Agent selection: netcat source + memory channel + logger sink

The netcat source listens on a given port and turns each line of text into an event.

Configuration:

```
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

Start the agent:

```
$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template -Dflume.root.logger=INFO,console
```

For this example, $agent_name is a1 and -f points to the configuration file written above.

Test with telnet:

```
telnet localhost 44444
```

Example 4: Syslogtcp

The syslogtcp source listens on a TCP port as the data source.

The UDP source treats an entire message as one event; the TCP source treats each newline ("\n")-separated string as a new event.
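The TCP framing can be simulated in plain shell: feeding a newline-separated stream through a line-by-line reader mimics how the TCP source cuts one event per line. A sketch (the reader loop here only stands in for the source's framing, it is not Flume code):

```bash
# Two newline-terminated strings -> two events
printf 'first message\nsecond message\n' | while read -r line; do
  echo "EVENT: $line"
done
# prints:
# EVENT: first message
# EVENT: second message
```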

a. Create the agent configuration file

```
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

b. Start flume agent a1

```
flume-ng agent -c . -f ./syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console
```

c. Generate a test syslog message

```
echo "hello briup.com" | nc localhost 5140
```

Example 5: collect logs on server A and ship them to server B in real time

Technology selection

```bash
exec source + memory channel + avro sink    # server A
avro source + memory channel + logger sink  # server B
```

Configuration

Server A

```
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /home/briup/log/test.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = localhost
exec-memory-avro.sinks.avro-sink.port = 44444

exec-memory-avro.channels.memory-channel.type = memory

exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
```

Server B

```
avro-memory-logger.sources = avro-source
avro-memory-logger.sinks = logger-sink
avro-memory-logger.channels = memory-channel

avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.bind = localhost
avro-memory-logger.sources.avro-source.port = 44444
avro-memory-logger.sinks.logger-sink.type = logger
avro-memory-logger.channels.memory-channel.type = memory
avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sinks.logger-sink.channel = memory-channel
```

Start the agent on server B first, so that its avro source is already listening when server A's avro sink connects:

```
$ flume-ng agent -c ~/apache-flume-1.9.0-bin/conf/ -f ~/apache-flume-1.9.0-bin/conf/b.flm -n avro-memory-logger -Dflume.root.logger=INFO,console

$ flume-ng agent -c ~/apache-flume-1.9.0-bin/conf/ -f ~/apache-flume-1.9.0-bin/conf/a.flm -n exec-memory-avro -Dflume.root.logger=INFO,console
```

Example 6: HDFS sink

Note: before this, copy Hadoop's dependency jars (htrace-core-3.0.4.jar, commons-configuration-1.6.jar, hadoop-hdfs-2.6.0.jar, etc.; the exact set can be worked out from the class-not-found exceptions thrown at startup) into Flume's lib directory.

a. Create the agent configuration file

```
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://172.16.0.4:9000/user/zhaojing/syslogtcp-%y-%m-%d
a1.sinks.k1.hdfs.filePrefix = Syslog
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
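The %y-%m-%d escapes in hdfs.path are expanded by the HDFS sink from each event's timestamp header, with round/roundValue/roundUnit bucketing timestamps into 10-minute intervals here. The same format directives exist in date(1), which shows what the path suffix looks like for the current time:

```bash
# %y-%m-%d as the HDFS sink would render it for "now", e.g. 19-08-06
date +%y-%m-%d
```

If your events carry no timestamp header, setting hdfs.useLocalTimeStamp = true makes the sink substitute the local time instead.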

b. Start flume agent a1

```
flume-ng agent -c . -f ./hdfs_sink.conf -n a1 -Dflume.root.logger=INFO,console
```

c. Generate a test syslog message

```
echo "hello briupData flume -> hadoop testing one" | nc localhost 5140
```

d. In another window on server1, check on Hadoop whether the files were generated (the sink writes into date-stamped syslogtcp-yy-mm-dd directories under /user/zhaojing):

```
hadoop fs -ls /user/zhaojing/
```

Example 7: JSONHandler

a. Create the agent configuration file

```
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 8888
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

b. Start flume agent a1

```
flume-ng agent -c . -f ./json.flm -n a1 -Dflume.root.logger=INFO,console
```
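The HTTPSource's default handler is JSONHandler, which expects the request body to be a JSON array of events, each an object with "headers" and "body" fields. A sketch of a test request — the curl call assumes the agent above is running on port 8888, so it is left commented out:

```bash
# One event with a headers map and a string body, in JSONHandler's expected shape
PAYLOAD='[{"headers": {"host": "server1"}, "body": "hello briup json"}]'
echo "$PAYLOAD"

# With agent a1 running:
# curl -X POST -d "$PAYLOAD" http://localhost:8888
```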
Author: 一只孜孜不倦的bird
Original article: https://www.cnblogs.com/fofade/p/11307580.html