Configuring the Flume Load balancing Sink Processor

1 Official documentation

  

2 A diagram that makes it clear at a glance

3 Detailed configuration

Configuration file load_source_case.conf
	Configures the data entry point (source to channel), with two sinks set up for load balancing

	# Configuration file:
	a1.sources = r1
	a1.sinks = k1 k2
	a1.channels = c1

	# Load balancing
	a1.sinkgroups = g1
	a1.sinkgroups.g1.sinks = k1 k2
	a1.sinkgroups.g1.processor.type = load_balance
	a1.sinkgroups.g1.processor.backoff = true
	a1.sinkgroups.g1.processor.selector = round_robin

	# Describe/configure the source
	a1.sources.r1.type = exec
	a1.sources.r1.command = tail -F /tmp/logs/test.log

	# Describe the sinks
	a1.sinks.k1.type = avro
	a1.sinks.k1.hostname = 127.0.0.1
	a1.sinks.k1.port = 50001

	a1.sinks.k2.type = avro
	a1.sinks.k2.hostname = 127.0.0.1
	a1.sinks.k2.port = 50002

	# Use a channel which buffers events in memory
	a1.channels.c1.type = memory
	a1.channels.c1.capacity = 1000
	a1.channels.c1.transactionCapacity = 100

	# Bind the source and sinks to the channel
	a1.sinks.k1.channel = c1
	a1.sinks.k2.channel = c1
	a1.sources.r1.channels = c1
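
One easy slip with sink groups is listing a sink in the group that was never declared in a1.sinks. As a rough sanity check, the sketch below prints any such sink. The helper name missing_group_sinks is made up for this post and is not part of Flume, and it only understands simple key = value lines like the ones above.

```shell
# Hypothetical helper (not part of Flume): print every sink that is listed in
# <agent>.sinkgroups.<group>.sinks but not declared in <agent>.sinks.
# Usage: missing_group_sinks <conf-file> <agent> <group>
missing_group_sinks() {
    awk -F= -v a="$2" -v g="$3" '
        { key = $1; gsub(/[ \t]/, "", key) }        # strip blanks around the key
        key == a ".sinks" {                          # e.g. a1.sinks = k1 k2
            n = split($2, s, " ")
            for (i = 1; i <= n; i++) declared[s[i]] = 1
        }
        key == a ".sinkgroups." g ".sinks" {         # e.g. a1.sinkgroups.g1.sinks = k1 k2
            m = split($2, members, " ")
        }
        END {
            for (i = 1; i <= m; i++)
                if (!(members[i] in declared)) print members[i]
        }
    ' "$1"
}
```

Running missing_group_sinks load_source_case.conf a1 g1 against the file above should print nothing, since k1 and k2 are both declared.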
	      
sink1 configuration

	# Name the components on this agent
	a2.sources = r1
	a2.sinks = k1
	a2.channels = c1
	
	# Describe/configure the source
	a2.sources.r1.type = avro
	a2.sources.r1.channels = c1
	a2.sources.r1.bind = 127.0.0.1
	a2.sources.r1.port = 50001
	
	# Describe the sink
	a2.sinks.k1.type = logger
	a2.sinks.k1.channel = c1
	
	# Use a channel which buffers events in memory
	a2.channels.c1.type = memory
	a2.channels.c1.capacity = 1000
	a2.channels.c1.transactionCapacity = 100

sink2 configuration

	# Name the components on this agent
	a3.sources = r1
	a3.sinks = k1
	a3.channels = c1
	
	# Describe/configure the source
	a3.sources.r1.type = avro
	a3.sources.r1.channels = c1
	a3.sources.r1.bind = 127.0.0.1
	a3.sources.r1.port = 50002
	
	# Describe the sink
	a3.sinks.k1.type = logger
	a3.sinks.k1.channel = c1
	
	# Use a channel which buffers events in memory
	a3.channels.c1.type = memory
	a3.channels.c1.capacity = 1000
	a3.channels.c1.transactionCapacity = 100
	

  

4 Start the services

  

Start the two sinks first
	
	flume-ng agent -c conf -f /mnt/software/flume-1.6.0/flume-conf/loadBalance/sink1.conf -n a2 -Dflume.root.logger=DEBUG,console
	flume-ng agent -c conf -f /mnt/software/flume-1.6.0/flume-conf/loadBalance/sink2.conf -n a3 -Dflume.root.logger=DEBUG,console
	
Then start the source
	
	flume-ng agent -c conf -f /mnt/software/flume-1.6.0/flume-conf/loadBalance/load_source_case.conf -n a1 -Dflume.root.logger=DEBUG,console
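
The startup order matters because the source agent's avro sinks connect to ports 50001 and 50002. If the three agents are launched from one script, a small wait loop can hold back the source until both ports accept connections. This is only a sketch: wait_for_port is a hypothetical helper, and it assumes bash is installed, because it uses bash's /dev/tcp feature.

```shell
# Hypothetical helper (not part of Flume): wait until host:port accepts TCP
# connections, retrying once per second. Relies on bash's /dev/tcp feature.
# Usage: wait_for_port <host> <port> [max-tries]
wait_for_port() {
    host=$1
    port=$2
    tries=${3:-30}
    while [ "$tries" -gt 0 ]; do
        # The child bash exits non-zero if the connection cannot be opened.
        if bash -c 'exec 3<>/dev/tcp/$0/$1' "$host" "$port" 2>/dev/null; then
            return 0
        fi
        tries=$((tries - 1))
        sleep 1
    done
    return 1
}

# Intended use once both sink agents have been launched in the background:
#   wait_for_port 127.0.0.1 50001 && wait_for_port 127.0.0.1 50002 \
#     && flume-ng agent -c conf \
#          -f /mnt/software/flume-1.6.0/flume-conf/loadBalance/load_source_case.conf \
#          -n a1 -Dflume.root.logger=DEBUG,console
```

The default of 30 tries is an arbitrary choice for illustration.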
	

  

5 Check the results

  

On the first start, the events went to sink1
	
	19/02/21 23:20:55 INFO ipc.NettyServer: [id: 0x617271c1, /127.0.0.1:47138 => /127.0.0.1:50001] CONNECTED: /127.0.0.1:47138
	19/02/21 23:21:01 INFO sink.LoggerSink: Event: { headers:{} body: 7A 68 61 6E 67 6A 69 6E                         zhangjin }
	19/02/21 23:21:01 INFO sink.LoggerSink: Event: { headers:{} body: 78 78 78 78                                     xxxx }
	19/02/21 23:21:01 INFO sink.LoggerSink: Event: { headers:{} body: 79 79 79 79                                     yyyy }
	19/02/21 23:21:01 INFO sink.LoggerSink: Event: { headers:{} body: 7A 68 61 6E 67 6A 69 6E                         zhangjin }
	19/02/21 23:21:01 INFO sink.LoggerSink: Event: { headers:{} body: 78 78 78 78                                     xxxx }
	19/02/21 23:21:01 INFO sink.LoggerSink: Event: { headers:{} body: 79 79 79 79                                     yyyy }
	19/02/21 23:21:01 INFO sink.LoggerSink: Event: { headers:{} body: 7A 68 61 6E 67 6A 69 6E                         zhangjin }
	19/02/21 23:21:01 INFO sink.LoggerSink: Event: { headers:{} body: 78 78 78 78                                     xxxx }
	19/02/21 23:21:01 INFO sink.LoggerSink: Event: { headers:{} body: 79 79 79 79                                     yyyy }
	19/02/21 23:21:01 INFO sink.LoggerSink: Event: { headers:{} body: 5B 7B 20 22 68 65 61 64 65 72 73 22 20 3A 7B 22 [{ "headers" :{" }
	

	19/02/21 23:23:47 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }
	
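When data was appended a second time, the events went to sink2 (apart from hello, which sink1 logged at 23:23:47 above)
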
	19/02/21 23:23:49 INFO sink.LoggerSink: Event: { headers:{} body: 77 6F 72 6C 64                                  world }
	19/02/21 23:23:49 INFO sink.LoggerSink: Event: { headers:{} body: 6A 61 76 61                                     java }
	19/02/21 23:23:49 INFO sink.LoggerSink: Event: { headers:{} body: 73 63 61 6C 61                                  scala }
	19/02/21 23:23:49 INFO sink.LoggerSink: Event: { headers:{} body: 68 61 64 6F 6F 70                               hadoop }
	19/02/21 23:23:49 INFO sink.LoggerSink: Event: { headers:{} body: 7A 68 61 6E 67 6A 69 6E                         zhangjin }
	19/02/21 23:23:49 INFO sink.LoggerSink: Event: { headers:{} body: 78 78 78 78                                     xxxx }
	19/02/21 23:23:49 INFO sink.LoggerSink: Event: { headers:{} body: 79 79 79 79                                     yyyy }
	19/02/21 23:23:49 INFO sink.LoggerSink: Event: { headers:{} body: 7A 68 61 6E 67 6A 69 6E                         zhangjin }
	19/02/21 23:23:49 INFO sink.LoggerSink: Event: { headers:{} body: 78 78 78 78                                     xxxx }
	19/02/21 23:23:49 INFO sink.LoggerSink: Event: { headers:{} body: 79 79 79 79                                     yyyy }
	19/02/21 23:23:49 INFO sink.LoggerSink: Event: { headers:{} body: 7A 68 61 6E 67 6A 69 6E                         zhangjin }
	19/02/21 23:23:49 INFO sink.LoggerSink: Event: { headers:{} body: 78 78 78 78                                     xxxx }
	19/02/21 23:23:49 INFO sink.LoggerSink: Event: { headers:{} body: 79 79 79 79                                     yyyy }
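
To compare how many events each sink agent received, the LoggerSink lines can be counted. The sketch below assumes each agent's console output was saved to a file, for example by appending "> sink1.log 2>&1" to its flume-ng command. Both the file name and the helper count_events are illustrative and not from the original setup.

```shell
# Hypothetical helper: count LoggerSink events in a captured agent log.
# Matches the literal text "sink.LoggerSink: Event:" seen in the output above.
# Usage: count_events <log-file>
count_events() {
    # grep -c prints 0 but exits non-zero when nothing matches, hence "|| true".
    grep -cF 'sink.LoggerSink: Event:' "$1" || true
}
```

Calling count_events sink1.log and count_events sink2.log side by side gives a quick view of how the events were split.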
	
	
Data file

	hello
	world
	java
	scala
	hadoop
	zhangjin
	xxxx
	yyyy
	zhangjin
	xxxx
	yyyy
	zhangjin
	xxxx
	yyyy
	[{ "headers" :{"state" : "CZ"},"body" : "TEST1"}]
	hello
	world
	java
	scala
	hadoop
	zhangjin
	xxxx
	yyyy
	zhangjin
	xxxx
	yyyy
	zhangjin
	xxxx
	yyyy
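
For repeating the test, new lines can be appended to the file that the exec source tails. A minimal sketch follows, assuming the file is /tmp/logs/test.log as in the source configuration; append_lines is a hypothetical helper name.

```shell
# Hypothetical helper: append each argument after the first as one line to a file.
# Appending with >> keeps the same file, so tail -F picks up only the new lines.
# Usage: append_lines <file> <line>...
append_lines() {
    target=$1
    shift
    [ "$#" -gt 0 ] || return 0      # nothing to append
    printf '%s\n' "$@" >> "$target"
}

# Example (not run here):
#   append_lines /tmp/logs/test.log hello world java scala hadoop
```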

  

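6 Summary: judging from the results, load balancing works. The selector used here is round-robin (round_robin); feel free to test the other options yourself.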
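
As a starting point for such a test, the Flume user guide lists random as the other built-in value for processor.selector. The sketch below derives a variant of the source agent's config with that selector. The helper name make_random_variant and the output file name are assumptions for illustration.

```shell
# Hypothetical helper: copy a source-agent config, switching the selector of
# sink group g1 on agent a1 to "random". All other lines are left untouched.
# Usage: make_random_variant <input.conf> <output.conf>
make_random_variant() {
    sed 's/^\([[:space:]]*a1\.sinkgroups\.g1\.processor\.selector\)[[:space:]]*=.*$/\1 = random/' "$1" > "$2"
}

# Example (not run here):
#   make_random_variant load_source_case.conf load_source_random.conf
```

The generated file can then be passed to flume-ng with -f in place of load_source_case.conf.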














Original article: https://www.cnblogs.com/QuestionsZhang/p/10417839.html