大数据之Flume


什么是Flume

ApacheFlume是一个分布式的、可靠的、可用的系统,用于高效地收集、聚合和将大量来自不同来源的日志数据移动到一个集中的数据存储区。

系统要求

1. JDK 1.8 或以上版本

2. 内存、磁盘 空间充足

3. 代理使用的目录有读写权限

数据流动模型

数据源Source支持多种数据类型,采集到数据后经过Channel通道临时存储,包括 基于内存,Kafka,文件磁盘,然后通过Sink将数据进行落地存储;

Flume Source

主要支持以下几种类型

1. Kafka Source

可以消费kaka中topic中的消息,如果有多个kafka有多个源在运行,可以配置在以消费组的形式读取每一组分区中的topic信息;当前支持的kafak版本为

0.10.1.0或更高版本,配置参考:

tier1.sources.source1.type= = org.apache.flume.source.kafka.KafkaSource

tier1.sources.source1.channels = channel1

tier1.sources.source1.batchSize = 5000

tier1.sources.source1.batchDurationMillis = 2000

tier1.sources.source1.kafka.bootstrap.servers = localhost:9092

tier1.sources.source1.kafka.topics = test1, test2

tier1.sources.source1.kafka.consumer.group.id = custom.g.id

2. Avro Source

监听来自于Avro端口的事件流,比如另一个Flume作为数据源,配置参考:

a1.sources = r1

a1.channels = c1

a1.sources.r1.type = avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 4141

3. Exec Source

执行Unix上命令 生产数据做为Flume的数据源,配置参考:

a1.sources = r1

a1.channels = c1

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /var/log/secure

a1.sources.r1.channels = c1

Flume Sinks

主要支持如下类型

SinkS

HDFS

Hive

Kafka

Avor

Flume Channels

主要支持如下类型

Channel

Memory Channel

JDBC Channel

Kafka Channel

File Channel

下载安装

安装非常简单:

Wget

http://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

解压缩

Tar xvf apache-flume-1.9.0-bin.tar.gz  -c  /usr/flume

编写简单实例

需求说明:

模拟将服务器A上的磁盘日志复制采集到另一台服务器B的磁盘上

从需求上分析可知source 为 exec 类型 , channel 基于内存即可,sinks 为file_roll 类型.

操作步骤:

1. 在conf 里面新增配置文件 demo.conf

2. Vim demo.conf , 输入以下配置内容:

// 数据源配置

a1.sources = r1

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /usr/website/logs/1.txt

a1.sources.r1.channels = c1

// 数据通道配置

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.transactionCapacity = 10000

a1.channels.c1.byteCapacityBufferPercentage = 20

a1.channels.c1.byteCapacity = 800000

//数据目标存储配置

a1.sinks = k1

a1.sinks.k1.type = file_roll

a1.sinks.k1.channel = c1

a1.sinks.k1.sink.directory = /usr/website/sinklogs

3. 启动flume 服务

bin/flume-ng agent --f conf/demo.conf --name a1 -Dflume.root.logger=INFO,console

4. 启动成功后在source 的目录下新建日志文件 1.txt ,然后输入字符串保存

5. 查看sinks 的磁盘目录 /usr/website/sinklogs 是否有生成的txt 日志文件,如果有说明数据已同步成功,同步的策略是source文件内容每变动一次都会全量的同步到sinks上.

当然,sinks 的类型也可以是kafka 消费者.

扫码或长按关注查看更多文章

原文地址:https://www.cnblogs.com/wangzhiyong/p/10499687.html