Flume + Kafka

This post demonstrates three cases on a single-node Flume setup: Kafka as the source, as the channel, and as the sink.

All of the tests below start from the following base configuration file and modify parts of it.

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# For each one of the sources, the type is defined
#agent.sources.seqGenSrc.type = seq
a1.sources.r1.type = netcat
a1.sources.r1.bind=mini1
a1.sources.r1.port=44444

# The channel can be defined as follows.
#agent.sources.seqGenSrc.channels = memoryChannel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity =100

# Each sink's type must be defined
#agent.sinks.loggerSink.type = logger
a1.sinks.k1.type = logger
#Specify the channel the sink should use
#agent.sinks.loggerSink.channel = memoryChannel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# Each channel's type is defined.
#agent.channels.memoryChannel.type = memory

 
# In this case, it specifies the capacity of the memory channel
#agent.channels.memoryChannel.capacity = 100
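
To exercise the base configuration, something has to write lines to the netcat source on mini1:44444. The following is a minimal sketch in Java, assuming agent a1 is already running with the configuration above. The class name NetcatSourceClient and the method sendLine are made up for this illustration. With the netcat source's default ack-every-event = true, the reply is expected to be OK.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Flume: a tiny client for the netcat source.
class NetcatSourceClient {

    /** Sends one newline-terminated line and returns the first reply line. */
    static String sendLine(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket(host, port);
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            // The netcat source splits events on newlines, so end the line with "\n".
            out.print(line + "\n");
            out.flush();
            // Read the acknowledgement the source sends back for the event.
            return in.readLine();
        }
    }
}
```

Calling NetcatSourceClient.sendLine("mini1", 44444, "hello flume") should make the logger sink print the event in the agent's console.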

Kafka as the source: configuration and producer program

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.channels = c1
a1.sources.r1.batchSize = 5
a1.sources.r1.batchDurationMillis = 2
a1.sources.r1.kafka.bootstrap.servers = mini1:9092
a1.sources.r1.kafka.topics = Operator
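a1.sources.r1.kafka.consumer.group.id = custom.g.id

import java.io.IOException;
import java.util.Properties;
import java.util.Random;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // Load the producer settings from the classpath. Use TestConsumer.class directly;
        // TestConsumer.class.getClass() would look the resource up via java.lang.Class.
        props.load(TestConsumer.class.getResourceAsStream("/kafkaProduce.properties"));
        Producer<Integer, String> producer = new KafkaProducer<>(props);
        // Send 50 random phone numbers to the "Operator" topic read by the Kafka source.
        for (int i = 0; i < 50; i++)
            producer.send(new ProducerRecord<Integer, String>("Operator", i, getRandomPhoneNum()));
        producer.close();
        // System.out.println(getRandomPhoneNum());
    }

    // Builds an 11-digit number: a 3-digit prefix followed by 8 random digits.
    public static String getRandomPhoneNum() {
        String[] basePrefix = new String[]{"134", "139", "130", "132", "133", "189"};
        return basePrefix[new Random().nextInt(basePrefix.length)] + RandomUtils.nextInt(11111111, 99999999);
    }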
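
The post does not show the contents of kafkaProduce.properties. The following is a minimal sketch of the settings such a file would need for a Producer<Integer, String>, written as Java so that the keys can be checked in code. The class name ProducerConfigSketch and the acks value are assumptions; the serializer classes are the standard ones shipped with kafka-clients.

```java
import java.util.Properties;

// Hypothetical stand-in for the kafkaProduce.properties file, which the post does not show.
class ProducerConfigSketch {

    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        // Broker list; the post uses mini1:9092 for the Kafka source.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Serializers must match the Producer<Integer, String> type parameters.
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Assumed value: wait for the partition leader only.
        props.setProperty("acks", "1");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings as key=value lines, ready to paste into a properties file.
        producerProps("mini1:9092").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Passing producerProps("mini1:9092") to new KafkaProducer<>(...) would replace the props.load(...) call in the producer program.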

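Kafka as the channel: the topic must be a new, empty topic. If the topic already contains data, the agent reports an error at startup. A likely cause is that the Kafka channel, by default (parseAsFlumeEvent = true), expects every record in the topic to be an Avro-encoded Flume event, so records written by other producers fail to parse.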

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = mini1:9092,mini2:9092,mini3:9092
a1.channels.c1.kafka.topic = flumedat
a1.channels.c1.kafka.consumer.group.id = flume-consumer


# Modify the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/flume/test/logs/flume.dat
a1.sources.r1.channels = c1

According to the official documentation, when Kafka is used as the channel, the agent does not need both a source and a sink:

The Kafka channel can be used for multiple scenarios:

  1. With Flume source and sink - it provides a reliable and highly available channel for events
  2. With Flume source and interceptor but no sink - it allows writing Flume events into a Kafka topic, for use by other apps
  3. With Flume sink, but no source - it is a low-latency, fault tolerant way to send events from Kafka to Flume sinks such as HDFS, HBase or Solr

Kafka as the sink

a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /home/hadoop/flume/test/logs/kfksink
a1.sources.r1.deletePolicy = immediate
a1.sources.r1.fileHeader = true

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = flumesink
a1.sinks.k1.kafka.bootstrap.servers = mini1:9092
a1.sinks.k1.kafka.flumeBatchSize = 2
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# Compression
a1.sinks.k1.kafka.producer.compression.type = snappy
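
The spooling directory source expects a file to be complete and unchanged once it appears in spoolDir, so test files are best written somewhere else and then moved in. The following is a minimal sketch, assuming a staging directory on the same filesystem as the spool directory. The class SpoolDirDrop, the method drop, and the staging path in the usage note are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

// Hypothetical helper, not part of Flume: hands a finished file to the spooldir source.
class SpoolDirDrop {

    /** Writes the lines to a staging file, then moves it into the spool directory. */
    static Path drop(Path stagingDir, Path spoolDir, String fileName, List<String> lines)
            throws IOException {
        Files.createDirectories(stagingDir);
        Files.createDirectories(spoolDir);
        // Write the whole file outside the spool directory first.
        Path staged = stagingDir.resolve(fileName);
        Files.write(staged, lines, StandardCharsets.UTF_8);
        // ATOMIC_MOVE needs both directories on the same filesystem; the source
        // then only ever sees the finished file.
        return Files.move(staged, spoolDir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
    }
}
```

For example, SpoolDirDrop.drop(Paths.get("/home/hadoop/flume/test/logs/staging"), Paths.get("/home/hadoop/flume/test/logs/kfksink"), "test-1.log", Arrays.asList("line 1", "line 2")) drops a two-line file for the agent to pick up. File names should not be reused, and with deletePolicy = immediate the file is removed once it has been ingested.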

Now start the Kafka consumer program:

 
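import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // Use TestConsumer.class directly rather than TestConsumer.class.getClass().
        props.load(TestConsumer.class.getResourceAsStream("/kfkConsumer.properties"));
        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic that the Kafka sink writes to.
        consumer.subscribe(Arrays.asList("flumesink"));
        while (true) {
            // Wait up to 100 ms; kafka-clients 2.0+ prefers poll(Duration.ofMillis(100)).
            ConsumerRecords<Integer, String> records = consumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.print("Thread : " + Thread.currentThread().getName());
                System.out.printf("  offset = %d, key = %s, value = %s, partition = %d %n", record.offset(), record.key(), record.value(), record.partition());
            }
            // Commit the offsets synchronously after each processed batch.
            consumer.commitSync();
        }
    }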
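
The post does not show kfkConsumer.properties either. The following is a minimal sketch of what it might contain for a KafkaConsumer<Integer, String> that commits manually with commitSync(). The class name ConsumerConfigSketch, the group id in the usage note, and the auto.offset.reset value are assumptions.

```java
import java.util.Properties;

// Hypothetical stand-in for the kfkConsumer.properties file, which the post does not show.
class ConsumerConfigSketch {

    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // subscribe() requires a consumer group.
        props.setProperty("group.id", groupId);
        // Deserializers must match the KafkaConsumer<Integer, String> type parameters.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The consumer program calls commitSync() itself, so turn auto commit off.
        props.setProperty("enable.auto.commit", "false");
        // Assumed value: a new group starts from the oldest records in the topic.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }
}
```

For example, consumerProps("mini1:9092", "flumesink-test") could be passed to new KafkaConsumer<>(...) instead of loading the file; the group id here is a made-up name.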

The configuration files are taken from http://flume.apache.org/FlumeUserGuide.html

Original post: https://www.cnblogs.com/rocky-AGE-24/p/7279750.html