Flume环境安装

 源码包下载:

http://archive.apache.org/dist/flume/1.8.0/

集群环境:

master 192.168.1.99
slave1 192.168.1.100
slave2 192.168.1.101

下载安装包:

# Master
wget http://archive.apache.org/dist/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz -C /usr/local/src
tar -zxvf apache-flume-1.8.0-bin.tar.gz
mv apache-flume-1.8.0-bin /usr/local/flume

Flume配置:

#Netcat

cd /usr/local/flume/conf

vim flume-netcat.conf

# Name the components on this agent
agent.sources = r1
agent.sinks = k1
agent.channels = c1

# Describe/configuration the source
agent.sources.r1.type = netcat
agent.sources.r1.bind = master
agent.sources.r1.port = 44444

#Describe the sink
agent.sinks.k1.type = logger

# Use a channel which buffers events in memory 
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1

验证:

服务端:
/usr/local/flume/bin/flume-ng agent -f flume-netcat.conf -n agent -Dflume.root.logger=INFO, console

客户端:
telnet master 44444

结果如图:

 

#Exec

cd /usr/local/flume/conf

vim flume-exec.conf

# Name the components on this agent
agent.sources = r1
agent.sinks = k1
agent.channels = c1

# Describe/configuration the source
agent.sources.r1.type = exec
agent.sources.r1.command = tail -f /root/test.log

#Describe the sink
agent.sinks.k1.type = logger

# Use a channel which buffers events in memory 
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1

验证:

服务端
/usr/local/flume/bin/flume-ng agent -f flume-exec.conf -n agent -Dflume.root.logger=INFO, console

客户端
echo "wangzai" > /root/test.log

结果如图:

 #HDFS

cd /usr/local/flume/conf

vim flume-exec-hdfs.conf

# Name the components on this agent
agent.sources = r1
agent.sinks = k1
agent.channels = c1

# Describe/configuration the source
agent.sources.r1.type = exec
agent.sources.r1.command = tail -f /root/test.log
agent.sources.r1.shell = /bin/bash -c

#Describe the sink
agent.sinks.k1.type = hdfs
agent.sinks.k1.hdfs.path = hdfs://master:9000/data/flume/tail
agent.sinks.k1.hdfs.fileType=DataStream
agent.sinks.k1.hdfs.writeFormat=Text

## hdfs sink间隔多长将临时文件滚动成最终目标文件,单位:秒,默认为30s
## 如果设置成0,则表示不根据时间来滚动文件;
# agent.sinks.k1.hdfs.rollInterval = 0
## 表示到134M的时候回滚到下一个文件
#agent.sinks.k1.hdfs.rollSize = 134217728
#agent.sinks.k1.hdfs.rollCount = 1000000
#agent.sinks.k1.hdfs.batchSize=10

# Use a channel which buffers events in memory
agent.channels.c1.type = memory
#agent.channels.c1.capacity = 1000
#agent.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1

验证:

服务端
/usr/local/flume/bin/flume-ng agent -f flume-exec-hdfs.conf -n agent -Dflume.root.logger=INFO, console

客户端
echo "wangzai" > /root/test.log

结果如图:

#Kafka

cd /usr/local/flume/conf

vim flume-exec-kafka.conf

# Name the components on this agent
agent.sources = r1
agent.sinks = k1
agent.channels = c1

# Describe/configuration the source
agent.sources.r1.type = exec
agent.sources.r1.command = tail -f /root/test.log
agent.sources.r1.shell = /bin/bash -c 

## kafka
#Describe the sink
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.topic = kafkatest
agent.sinks.k1.brokerList = master:9092
agent.sinks.k1.requiredAcks = 1
agent.sinks.k1.batchSize = 2

# Use a channel which buffers events in memory 
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
#agent.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1

验证:

启动kafka,创建topic

/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties > /dev/null &
kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic kafkatest

启动flume以及测试

服务端
/usr/local/flume/bin/flume-ng agent -f flume-exec-kafka.conf -n agent -Dflume.root.logger=INFO, console

客户端
echo "wangzai" > test.log

启动kafka客户端

/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic kafkatest --from-beginning

结果如图:

flume服务端:

kafka客户端:

原文地址:https://www.cnblogs.com/654wangzai321/p/9692595.html