大数据篇:Flume

大数据篇:Flume

flume.apache.org

Flume是什么?

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

如果没有Flume

数据的采集发送怎么处理呢?处理到哪里呢?Flume最主要的作用就是实时读取服务器本地磁盘数据,写入Hdfs或Kafka等中间件。

1 基础架构

  • Agent主要由:source、channel、sink三个组件组成.

  • Source:

    • 从数据发生器接收数据,并将接收的数据以Flume的event格式传递给一个或者多个通道channel,Flume提供多种数据接收的方式,比如Avro(Flume对接Flume),Exec(命令行如tail -f),Taildir(目录本地文件),Kafka等。
  • Channel:

    • channel是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉,它在source和sink间起着桥梁的作用,channel是一个完整的事务,这一点保证了数据在收发的时候的一致性,并且它可以和任意数量的source和sink链接,支持的类型有: JDBC , File System,Memory等。
  • sink:

    • sink将数据channels消费数据(events)并将其传递给目标地,目标地可能是另一个sink,Flume提供多种数据发送的方式,比如Avro,HDFS,Hive,Kafka。
  • Event

    • Flume以事件的形式将数据从源头传送到最终的目的
    • Event是数据传输的基本单元
    • Event由Header和Body两部分组成,Header用来存放该Event的一些属性(K-V结构),Body存放数据(Byte Array结构)。

2 案例演示

2.1 netcat->Memory->Logger

  1. 通过netcat工具向本机44444端口发送数据
  2. Flume监控本机44444端口读取数据
  3. Flume将获取数据打印到控制台
  • 安装netcat工具
yum -y install nc
#监听44444端口(服务端)
nc -lk 44444
#监听44444端口(客户端)
nc localhost 44444
#互相发送数据接收即可
  • 创建Agent配置文件flume-netcat-logger.conf
vim flume-netcat-logger.conf
#--->
# 给agent组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# sources相关配置
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# sinks相关配置
a1.sinks.k1.type = logger

# channels相关配置
a1.channels.c1.type = memory
#事件容量
a1.channels.c1.capacity = 1000
#一次传输多少事件
a1.channels.c1.transactionCapacity = 100

# 绑定三个组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#---<
  • 启动flume
#普通写法
flume-ng agent --conf /etc/flume-ng/conf --conf-file flume-netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console
#简写
flume-ng agent -c /etc/flume-ng/conf -f flume-netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console

2.2 .log本地文件->Memory->Hdfs

  1. 生成本地日志文件
  2. Flume获取本地数据文件
  3. Flume将获取的文件发送到Hdfs
  • 创建Agent配置文件flume-log-hdfs.conf
vim flume-log-hdfs.conf
#--->
# 给agent组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# sources相关配置
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/flume-test/logs/a.log

# sinks相关配置
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://cdh01.cm:8020/flume/events/%Y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = events-
#文件夹滚动一分钟创建一个新文件夹
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
#文件滚动时间10S 128M 2条 生成新文件
a1.sinks.k1.hdfs.rollInterval = 10	
a1.sinks.k1.hdfs.rollSize = 134210000	
a1.sinks.k1.hdfs.rollCount = 2
#积累多少Event才刷到hdfs
a1.sinks.k1.hdfs.batchSize = 2
#开启时间滚动需要
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#DataStream不会压缩输出文件
a1.sinks.k1.hdfs.fileType = DataStream 

# channels相关配置
a1.channels.c1.type = memory
#事件容量
a1.channels.c1.capacity = 1000
#一次传输多少事件
a1.channels.c1.transactionCapacity = 100

# 绑定三个组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#---<
  • 启动flume
flume-ng agent -c /etc/flume-ng/conf -f flume-log-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
  • 创建本地文件
mkdir /root/flume-test/logs
echo "1" > /root/flume-test/logs/a.log
echo "2" >> /root/flume-test/logs/a.log
echo "3" >> /root/flume-test/logs/a.log
echo "4" >> /root/flume-test/logs/a.log
#根据上面设置的间隔时间进行效果测试
echo "5" >> /root/flume-test/logs/a.log
echo "6" >> /root/flume-test/logs/a.log
echo "7" >> /root/flume-test/logs/a.log
echo "8" >> /root/flume-test/logs/a.log

2.3 本地文件夹->Memory->Hdfs

  1. 生成本地文件夹及文件数据
  2. Flume获取本地数据文件
  3. Flume将获取的文件发送到Hdfs
  • 创建Agent配置文件flume-file-hdfs.conf
vim flume-file-hdfs.conf
#--->
# 给agent组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# sources相关配置
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/flume-test/dirlogs
#忽略文件
a1.sources.r1.ignorePattern = ([^ ]*.txt)

# sinks相关配置
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://cdh01.cm:8020/flume/dirlogs/%Y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = log-
#文件夹滚动一分钟创建一个新文件夹
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
#文件滚动时间10S 128M 2条 生成新文件
a1.sinks.k1.hdfs.rollInterval = 10	
a1.sinks.k1.hdfs.rollSize = 134210000	
a1.sinks.k1.hdfs.rollCount = 2
#积累多少Event才刷到hdfs
a1.sinks.k1.hdfs.batchSize = 2
#开启时间滚动需要
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#DataStream不会压缩输出文件
a1.sinks.k1.hdfs.fileType = DataStream 

# channels相关配置
a1.channels.c1.type = memory
#事件容量
a1.channels.c1.capacity = 1000
#一次传输多少事件
a1.channels.c1.transactionCapacity = 100

# 绑定三个组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#---<
  • 启动flume
flume-ng agent -c /etc/flume-ng/conf -f flume-file-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
  • 创建本地文件
mkdir /root/flume-test/dirlogs
echo "1" > /root/flume-test/dirlogs/a.log
echo "2" >> /root/flume-test/dirlogs/a.log
echo "3" > /root/flume-test/dirlogs/a.txt
echo "4" >> /root/flume-test/dirlogs/a.txt
#根据上面设置的间隔时间进行效果测试
echo "5" > /root/flume-test/dirlogs/b.log
echo "6" >> /root/flume-test/dirlogs/b.log
#采用cp直接放入一个写好的文件测试效果

不能在监控目录中创建并持续修改文件

上传完成的文件以.COMPLETED结尾

被监控文件夹500毫秒扫描一次文件变动

2.4 本地文件夹->Memory->Logger

监控目录下的实时追加文件

  1. 生成本地文件夹及文件数据
  2. Flume获取本地数据文件
  3. Flume将获取数据打印到控制台
  • 创建Agent配置文件flume-files-logger.conf
vim flume-files-logger.conf
#--->
# 给agent组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# sources相关配置
a1.sources.r1.type = TAILDIR
#位置信息
a1.sources.r1.positionFile = /root/flume-test/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /root/flume-test/test1/a.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /root/flume-test/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000

# sinks相关配置
a1.sinks.k1.type = logger

# channels相关配置
a1.channels.c1.type = memory
#事件容量
a1.channels.c1.capacity = 1000
#一次传输多少事件
a1.channels.c1.transactionCapacity = 100

# 绑定三个组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#---<
  • 启动flume
flume-ng agent -c /etc/flume-ng/conf -f flume-files-logger.conf -n a1 -Dflume.root.logger=INFO,console
  • 创建本地文件
mkdir /root/flume-test/test1/
mkdir /root/flume-test/test2/
echo "1" > /root/flume-test/test1/a.log
echo "2" >> /root/flume-test/test1/a.log
echo "3" >> /root/flume-test/test1/a.log
#根据上面设置的间隔时间进行效果测试
echo "5" > /root/flume-test/test2/b.log
echo "6" >> /root/flume-test/test2/b.log
echo "7" >> /root/flume-test/test2/b.log
#停止flume,追加数据,在启动测试断点续传效果。

2.5 netcat->Memory->kafka

  1. 生成本地文件夹及文件数据
  2. Flume获取本地数据文件
  3. Flume将获取数据打印到控制台
  • 创建Agent配置文件flume-files-kafka.conf
vim flume-file-kafka.conf
#--->
# 给agent组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# sources相关配置
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# sinks相关配置
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = top-test
a1.sinks.k1.kafka.bootstrap.servers = cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1


# channels相关配置
a1.channels.c1.type = memory
#事件容量
a1.channels.c1.capacity = 1000
#一次传输多少事件
a1.channels.c1.transactionCapacity = 100

# 绑定三个组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#---<
  • 启动flume
flume-ng agent -c /etc/flume-ng/conf -f flume-file-kafka.conf -n a1 -Dflume.root.logger=INFO,console
  • 启动kafka消费者
kafka-console-consumer --topic top-test --bootstrap-server cdh01.cm:9092,cdh02.cm:9092,cdh03.cm:9092  --from-beginning --group g1
  • 使用netcat
nc localhost 44444


原文地址:https://www.cnblogs.com/ttzzyy/p/12638360.html