Kafka和Flume整合

Kafka作为source

配置文件:

# Flume agent "a1": reads events from a Kafka topic and prints them through
# the logger sink, buffering in an in-memory channel.
# NOTE(review): zookeeperConnect / topic / groupId look like the Flume 1.6-era
# Kafka source property names; confirm them against the Flume version in use.

# Declare the components of the agent.
a1.sources = kafka
a1.sinks = log
a1.channels = c1

# ---- Kafka source ----
# Source implementation class.
a1.sources.kafka.type = org.apache.flume.source.kafka.KafkaSource

# ZooKeeper quorum the Kafka consumer connects to.
# Host names were previously spelled inconsistently (chd01 / cdh02 / chd03);
# they are normalized to the cdh0N form here. Adjust if your hosts differ.
a1.sources.kafka.zookeeperConnect = cdh01:2181,cdh02:2181,cdh03:2181

# Topic to consume from; only a single topic may be given.
a1.sources.kafka.topic = hello

# Kafka consumer group id.
a1.sources.kafka.groupId = flume

# Passed through to the Kafka consumer as consumer.timeout.ms (the leading
# "kafka." is the pass-through prefix): how long, in milliseconds, the consumer
# waits for a message before timing out.
a1.sources.kafka.kafka.consumer.timeout.ms = 3000

# ---- Logger sink ----
a1.sinks.log.type = logger

# ---- Memory channel ----
a1.channels.c1.type = memory
# Maximum number of events held in the channel.
a1.channels.c1.capacity = 1000
# Maximum number of events per transaction.
a1.channels.c1.transactionCapacity = 100

# ---- Wiring ----
# Note the plural "channels" for a source and the singular "channel" for a sink.
a1.sources.kafka.channels = c1
a1.sinks.log.channel = c1

 

 

 

 

Kafka作为source(示例二:ZooKeeper地址为node0、node1、node2)

配置文件:

# Agent "a1": Kafka source -> memory channel -> logger sink.

# Component names used by this agent.
a1.channels = c1
a1.sources = kafka
a1.sinks = log

# Channel c1: in-memory buffer between the source and the sink.
a1.channels.c1.type = memory
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.capacity = 1000

# Source "kafka": consumes events from Kafka and writes them to c1.
a1.sources.kafka.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.kafka.channels = c1
# Consumer group id.
a1.sources.kafka.groupId = flume
# The one topic this source consumes (only a single topic is allowed).
a1.sources.kafka.topic = hello
# ZooKeeper ensemble address used by the consumer.
a1.sources.kafka.zookeeperConnect = node0:2181,node1:2181,node2:2181
# Kafka consumer timeout, in milliseconds.
a1.sources.kafka.kafka.consumer.timeout.ms = 3000

# Sink "log": takes events from c1 and writes them via the logger sink.
a1.sinks.log.type = logger
a1.sinks.log.channel = c1

原文地址:https://www.cnblogs.com/zlzhoulei/p/5563635.html