2. Kafka cluster setup, and how topics and partitions are consumed

Machines Kafka is installed on:

  • ke01 ke02 ke03

config/server.properties

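# Unique ID of this broker (must differ on every machine)
broker.id=1
# Listener address and port
listeners=PLAINTEXT://ke01:9092
# Directory where Kafka stores its data (log segments)
log.dirs=/var/kafka_data
# ZooKeeper connection string, with /kafka as the chroot
zookeeper.connect=ke02:2181,ke03:2181,ke04:2181/kafka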
zookeeper.connect=ke02:2181,ke03:2181,ke04:2181/kafka

Environment

  • /etc/profile (environment variables, e.g. Kafka's bin directory on PATH)

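Sync the installation to the other machines with scp, then on each machine:

  • broker.id must be different on every machine

  • change listeners to that machine's own hostname/IP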
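Only broker.id and listeners change between machines; everything else in server.properties is shared. A minimal Java sketch that renders each machine's settings, assuming broker ids 1..3 follow the host order ke01..ke03 (BrokerConfigs is a hypothetical helper, not part of Kafka):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: renders the server.properties text for each broker host.
// Only broker.id and listeners differ per machine; the other settings are shared.
final class BrokerConfigs {

    // Assumption: broker ids are 1..n in host order (ke01 -> 1, ke02 -> 2, ...).
    static String render(String host, int brokerId) {
        return "broker.id=" + brokerId + "\n"
                + "listeners=PLAINTEXT://" + host + ":9092\n"
                + "log.dirs=/var/kafka_data\n"
                + "zookeeper.connect=ke02:2181,ke03:2181,ke04:2181/kafka\n";
    }

    static List<String> renderAll(List<String> hosts) {
        List<String> configs = new ArrayList<>();
        for (int i = 0; i < hosts.size(); i++) {
            configs.add(render(hosts.get(i), i + 1));
        }
        return configs;
    }

    public static void main(String[] args) {
        for (String cfg : renderAll(Arrays.asList("ke01", "ke02", "ke03"))) {
            System.out.println(cfg);
        }
    }
}
```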

Start:

  • kafka-server-start.sh ./server.properties (run on all three machines)

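What you can observe in ZooKeeper

1. ZooKeeper now has a /kafka node
2. get /kafka/controller shows that the current controller is broker 1:
{"version":1,"brokerid":1,"timestamp":"1627232194038"}
3. ls /kafka/brokers/ids lists the registered brokers:
[1, 2, 3]
4. ls /kafka/brokers/topics lists the topics (none yet):
[]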

Create a topic

kafka-topics.sh  --zookeeper ke02:2181,ke03:2181/kafka --create --topic ooxx --partitions 2 --replication-factor 2

List all topics

[root@ke03 ~]# kafka-topics.sh  --zookeeper ke02:2181,ke03:2181/kafka --list
ooxx

Describe a specific topic

[root@ke03 ~]# kafka-topics.sh  --zookeeper ke02:2181,ke03:2181/kafka --describe --topic ooxx
Topic:ooxx	PartitionCount:2	ReplicationFactor:2	Configs:
	Topic: ooxx	Partition: 0	Leader: 3	Replicas: 3,2	Isr: 3,2
	Topic: ooxx	Partition: 1	Leader: 1	Replicas: 1,3	Isr: 1,3
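Explanation:
1. The topic has two partitions, each with two replicas
2. Partition 0's leader is broker 3 (replicas on brokers 3 and 2)
3. Partition 1's leader is broker 1 (replicas on brokers 1 and 3)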
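To read the describe output programmatically, here is a minimal sketch that parses one partition line. It assumes the tab-separated "Name: value" layout shown above, which may differ between Kafka versions; PartitionInfo is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: parses one partition line of `kafka-topics.sh --describe` output,
// whose fields are tab-separated "Name: value" pairs.
final class PartitionInfo {
    final int partition;
    final int leader;              // broker that serves reads/writes for this partition
    final List<Integer> replicas;  // brokers holding a copy
    final List<Integer> isr;       // replicas currently in sync with the leader

    private PartitionInfo(int partition, int leader, List<Integer> replicas, List<Integer> isr) {
        this.partition = partition;
        this.leader = leader;
        this.replicas = replicas;
        this.isr = isr;
    }

    static PartitionInfo parse(String line) {
        int partition = -1;
        int leader = -1;
        List<Integer> replicas = new ArrayList<>();
        List<Integer> isr = new ArrayList<>();
        for (String field : line.trim().split("\t")) {
            String[] kv = field.split(":", 2);
            if (kv.length != 2) {
                continue; // not a "Name: value" field
            }
            String value = kv[1].trim();
            switch (kv[0].trim()) {
                case "Partition": partition = Integer.parseInt(value); break;
                case "Leader":    leader = Integer.parseInt(value); break;
                case "Replicas":  replicas = brokerIds(value); break;
                case "Isr":       isr = brokerIds(value); break;
                default:          break; // "Topic" is not needed here
            }
        }
        return new PartitionInfo(partition, leader, replicas, isr);
    }

    // "3,2" -> [3, 2]
    private static List<Integer> brokerIds(String csv) {
        List<Integer> ids = new ArrayList<>();
        for (String s : csv.split(",")) {
            ids.add(Integer.parseInt(s.trim()));
        }
        return ids;
    }
}
```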

  

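Producer/consumer walkthrough

Scenario 1: one consumer
1. Start a consumer: kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx --group xiaoke
2. Start a producer: kafka-console-producer.sh --broker-list ke02:9092 --topic ooxx
3. Write data: a1,a2,a3
4. The single consumer received a1,a2,a3, i.e. all of the data
Conclusion: the producer spreads the data over partitions p-0 and p-1, and a single consumer receives the data of all partitions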

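Scenario 2: two consumers in the same group
1. Start consumer 1: kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx --group xiaoke
2. Start consumer 2: kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx --group xiaoke
3. Start a producer: kafka-console-producer.sh --broker-list ke02:9092 --topic ooxx
4. Write data: b1,b2,b3,b4
5. Consumer 1: b1,b3
6. Consumer 2: b2,b4
Conclusion: the producer spreads the data over p-0 and p-1, and the two consumers split it between them. What is split is the partitions: within a group, each partition is consumed by exactly one consumer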
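The rule behind scenarios 1 and 2 can be sketched as follows. This is a simplified round-robin assignment for illustration only; Kafka's real assignment strategy is pluggable (partition.assignment.strategy, RangeAssignor by default in the 2.x clients) and may map partitions differently. GroupAssignment is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of "within one group, each partition goes to exactly one consumer".
// Simplified round-robin; not Kafka's actual assignor.
final class GroupAssignment {

    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String c : consumers) {
            result.put(c, new ArrayList<>());       // every consumer starts with no partitions
        }
        for (int p = 0; p < partitionCount; p++) {
            String owner = consumers.get(p % consumers.size());
            result.get(owner).add(p);               // partition p has exactly one owner
        }
        return result;
    }
}
```

With three consumers and two partitions, the third consumer receives nothing and stays idle: a group cannot consume in parallel beyond the topic's partition count.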

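Scenario 3: two consumers, each in its own group
1. Start consumer 1: kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx --group g1
2. Start consumer 2: kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx --group g2
3. Start a producer: kafka-console-producer.sh --broker-list ke02:9092 --topic ooxx
4. Write data: c1,c2,c3,c4
5. Consumer 1: c1,c2,c3,c4
6. Consumer 2: c1,c2,c3,c4
Conclusion: data is reused per group. Every group receives all of the data, while the consumers inside one group share it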
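Scenario 3 follows from each group keeping its own offset per partition. A toy model with a single partition, assuming a new group starts at offset 0 (as with auto.offset.reset=earliest); GroupOffsets is hypothetical and not the Kafka client API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: one partition log shared by all groups; each group tracks its own offset.
final class GroupOffsets {
    private final List<String> log = new ArrayList<>();
    private final Map<String, Integer> committed = new HashMap<>();

    void append(String msg) {
        log.add(msg);
    }

    /** Returns everything this group has not read yet and advances only this group's offset. */
    List<String> poll(String group) {
        int from = committed.getOrDefault(group, 0);  // assumption: new groups start at 0
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        committed.put(group, log.size());
        return batch;
    }
}
```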

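1. kafka-console-consumer.sh --bootstrap-server ke01:9092,ke02:9092 --topic ooxx 
When a consumer connects without --group, the console consumer generates a group id for it (console-consumer-<random number>), and every new connection gets a new group
2. List all groups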
[root@ke03 ~]# kafka-consumer-groups.sh --bootstrap-server ke02:9092 --list
xiaoke
console-consumer-69705
console-consumer-67386
3. Describe a group
[root@ke03 ~]# kafka-consumer-groups.sh --bootstrap-server ke03:9092 --describe --group xiaoke
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST             CLIENT-ID
ooxx            0          6               6               0               consumer-1-5e729f67-d0f7-47b2-a7c4-42b4024634ee /192.168.244.183 consumer-1
ooxx            1          5               5               0               consumer-1-5e729f67-d0f7-47b2-a7c4-42b4024634ee /192.168.244.183 consumer-1
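CURRENT-OFFSET is the group's committed offset, LOG-END-OFFSET is the offset the next produced message will get, and LAG is their difference. A small sketch of that arithmetic (ConsumerLag is a hypothetical helper):

```java
// Sketch: LAG is how far the group's committed offset trails the end of the log.
final class ConsumerLag {

    /** LAG = LOG-END-OFFSET - CURRENT-OFFSET for one partition. */
    static long lag(long currentOffset, long logEndOffset) {
        return logEndOffset - currentOffset;
    }

    /** Total lag of a group over all partitions (arrays are indexed by partition). */
    static long totalLag(long[] currentOffsets, long[] logEndOffsets) {
        long total = 0;
        for (int p = 0; p < currentOffsets.length; p++) {
            total += lag(currentOffsets[p], logEndOffsets[p]);
        }
        return total;
    }
}
```

For the xiaoke group above, both partitions show LAG 0, i.e. the group is fully caught up.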

How is message ordering guaranteed? (see the Kafka diagram: http://git.mashibing.com/zhouzhilei/kafka20/-/blob/master/image/kafka%E5%89%8D%E7%9E%BB--%E6%9E%B6%E6%9E%84%E7%BB%B4%E5%BA%A6--0128.jpg)

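Basics:
1. Three producers A, B, C and 2 partitions
2. Messages are key/value pairs; messages with the same key go to the same partition
3. Messages with different keys can share one partition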

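Guaranteeing order
0. Messages must have a key
1. The producers must keep the order themselves: if A produces a and B produces b, A must not also produce b, otherwise b's messages are not guaranteed to reach the partition in order
2. Within a partition, messages with the same key are in order, but messages of different keys interleave. So give all data that must stay ordered the same key
3. The consumer pulls the data, reading each partition in offset order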
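A sketch of the points above: keyed messages go to hash(key) mod partition count, so all messages of one key land in one partition and keep their send order there, while different keys may interleave. Assumption: String.hashCode() stands in for the murmur2 hash of the serialized key that Kafka's default partitioner uses, so the concrete partition numbers will differ from a real cluster. KeyedPartitions is a hypothetical in-memory model.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of keyed partitioning. String.hashCode() replaces Kafka's murmur2 (assumption).
final class KeyedPartitions {
    private final List<List<String>> partitions = new ArrayList<>();

    KeyedPartitions(int count) {
        for (int i = 0; i < count; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    int partitionFor(String key) {
        // mask the sign bit so negative hash codes still map to a valid partition index
        return (key.hashCode() & 0x7fffffff) % partitions.size();
    }

    /** Appends key=value to its partition and returns the per-partition offset. */
    int send(String key, String value) {
        List<String> log = partitions.get(partitionFor(key));
        log.add(key + "=" + value);
        return log.size() - 1;
    }

    /** Values of one key, in the order they sit in their partition. */
    List<String> valuesFor(String key) {
        List<String> out = new ArrayList<>();
        for (String entry : partitions.get(partitionFor(key))) {
            if (entry.startsWith(key + "=")) {
                out.add(entry.substring(key.length() + 1));
            }
        }
        return out;
    }
}
```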

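Pull vs. push for consumers

0. Kafka uses pull
1. Push: the consuming server may not keep up with Kafka's push rate, so Kafka would have to maintain extra state recording, for each consumer, whether it can still take more
2. Pull: the consumer is in control: it subscribes and pulls data when and as much as it needs, and it pulls in batches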
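The pull model can be sketched as a consumer that asks for at most maxRecords per call and decides itself when to call again; the log side keeps no state about whether the consumer can take more. maxRecords plays a role similar to the consumer setting max.poll.records; PullConsumer is a hypothetical in-memory model, not the Kafka client.

```java
import java.util.ArrayList;
import java.util.List;

// Toy pull model: the consumer owns its position and its pace.
final class PullConsumer {
    private final List<String> log;
    private int position = 0;   // offset of the next record to fetch

    PullConsumer(List<String> log) {
        this.log = log;
    }

    /** Returns up to maxRecords records starting at the current position. */
    List<String> poll(int maxRecords) {
        int end = Math.min(log.size(), position + maxRecords);
        List<String> batch = new ArrayList<>(log.subList(position, end));
        position = end;
        return batch;
    }
}
```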

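How offsets are maintained

1. Single thread: process records one by one and update the offset after each record
2. Multiple threads: process a pulled batch (e.g. 5 records) inside a transaction; on success advance the offset by 5, on failure roll back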
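Approach 2 can be sketched as follows, assuming a hypothetical handler that stands in for writing one record inside a database transaction. The committed offset only moves forward when the whole batch succeeded; following Kafka's convention, it is the offset of the next record to read. BatchCommitter is hypothetical.

```java
import java.util.List;
import java.util.function.Predicate;

// Sketch: commit a batch's offsets only if every record in it was processed successfully.
final class BatchCommitter {
    private long committed;   // offset of the next record to consume

    BatchCommitter(long start) {
        this.committed = start;
    }

    long committed() {
        return committed;
    }

    /** Advances the offset by the batch size if all records succeed; otherwise leaves it unchanged. */
    boolean processBatch(List<String> batch, Predicate<String> handler) {
        for (String record : batch) {
            if (!handler.test(record)) {
                // "roll back": the offset stays put, so the whole batch will be read again
                return false;
            }
        }
        committed += batch.size();   // e.g. +5 for a batch of 5
        return true;
    }
}
```

Only the offset is modeled here; undoing the database writes themselves is the transaction's job.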

Code:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
package com.xiaoke;


import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;


import java.time.Duration;
import java.util.*;
import java.util.concurrent.Future;

public class KafkaDemo {


    @Test
    public void produce() throws Exception {
        String topic = "604-items";
        Properties p = new Properties();
        p.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ke01:9092,ke02:9092,ke03:9092");
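        // Kafka is an MQ that persists data as byte[] and never interprets the payload,
        // so producer and consumer must agree on how keys and values are (de)serialized.
        // Kafka serves consumers quickly using zero-copy (the sendfile system call).
        p.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks=-1 (same as "all"): the leader acknowledges only after all in-sync replicas have the record
        p.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(p);

        // The producer actually talks to brokers (the partition leaders), even though we think of it as writing into a topic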

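        /*
        topic: 604-items
        2 partitions
        three items (keys item0..item2), each sent with 3 sequential values (val0..val2)

        Verify: 1. ordering is preserved 2. the same item ends up in the same partition
         */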

        while (true) {
            for (int i = 0; i < 3; i++) {
                for (int j = 0; j < 3; j++) {
                    ProducerRecord<String, String> record = new ProducerRecord<>(topic, "item" + j, "val" + i);
                    Future<RecordMetadata> send = producer
                            .send(record);

                    // get() blocks until the broker acknowledges, so every send is synchronous here
                    RecordMetadata rm = send.get();
                    int partition = rm.partition();
                    long offset = rm.offset();
                    System.out.println("key: " + record.key() + " val: " + record.value() + " partition: " + partition + " offset: " + offset);

                }
            }
        }
        /**
         * 1. First create the topic: kafka-topics.sh --zookeeper ke03:2181/kafka  --create --topic 604-items  --partitions 2 --replication-factor 2
         * 2. Produce data
         *
         * Output:
         key: item0 val: val0 partition: 1 offset: 0
         key: item1 val: val0 partition: 0 offset: 0
         key: item2 val: val0 partition: 1 offset: 1
         key: item0 val: val1 partition: 1 offset: 2
         key: item1 val: val1 partition: 0 offset: 1
         key: item2 val: val1 partition: 1 offset: 3
         key: item0 val: val2 partition: 1 offset: 4
         key: item1 val: val2 partition: 0 offset: 2
         key: item2 val: val2 partition: 1 offset: 5
         key: item0 val: val0 partition: 1 offset: 6
         *
         * Grouped by partition:
         * key: item1 val: val0 partition: 0 offset: 0
         * key: item1 val: val1 partition: 0 offset: 1
         * key: item1 val: val2 partition: 0 offset: 2
         *
         *
         * key: item0 val: val0 partition: 1 offset: 0
         * key: item2 val: val0 partition: 1 offset: 1
         * key: item0 val: val1 partition: 1 offset: 2
         * key: item2 val: val1 partition: 1 offset: 3
         * key: item0 val: val2 partition: 1 offset: 4
         * key: item2 val: val2 partition: 1 offset: 5
         * key: item0 val: val0 partition: 1 offset: 6
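         * Conclusions:
         * 1. The same item always lands in the same partition
         * 2. Within that partition, the item's records are in send order
         * 3. Offsets are per partition (each partition counts from 0)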
         *
         */
    }


    @Test
    public void consumer() {
        String topic = "604-items";
        Properties p = new Properties();
        p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ke01:9092,ke02:9092,ke03:9092");
        p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // consumer group
        p.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group04");

        /**
         * auto.offset.reset: what to do when there is no initial offset in Kafka (e.g. the group's
         * first start) or if the current offset does not exist any more on the server
         * (e.g. because that data has been deleted):
         * <ul>
         *     <li>earliest: automatically reset the offset to the earliest offset (read all retained data)</li>
         *     <li>latest: automatically reset the offset to the latest offset (only data produced from now on)</li>
         *     <li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li>
         *     <li>anything else: throw exception to the consumer.</li>
         * </ul>
         */
        p.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

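        /**
         * enable.auto.commit: true = offsets are committed automatically, false = you commit them manually.
         * Auto-commit is time-based and asynchronous, i.e. not tied to whether your processing finished,
         * so it can cause both duplicates and data loss:
         *   1. The consumer crashes before the interval elapses, so the offset was never committed;
         *      a restarted consumer resumes from the old offset and consumes those records again.
         *   2. A batch has not been written to the database yet, but its offset was already committed
         *      asynchronously; the consumer crashes and a restarted consumer skips that batch (data loss).
         * Use "false" when relying on the manual commit approaches in the poll loop below.
         */
        p.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

        p.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "15000"); // 15 seconds
//        p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,""); // max records one poll() returns: pull is elastic and on demand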

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p);
        // Kafka rebalances partitions among a group's consumers dynamically; the listener prints revoked/assigned partitions
        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("---onPartitionsRevoked:");
                Iterator<TopicPartition> iter = partitions.iterator();
                while (iter.hasNext()) {
                    System.out.println(iter.next().partition());
                }

            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("---onPartitionsAssigned:");
                Iterator<TopicPartition> iter = partitions.iterator();

                while (iter.hasNext()) {
                    System.out.println(iter.next().partition());
                }

            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(0L));// 0~n
            if (!records.isEmpty()) {
                System.out.println("-----------" + records.count() + "-------------");
                Set<TopicPartition> partitions = records.partitions(); // a single poll() can return records from several partitions
                for (TopicPartition partition : partitions) {
                    List<ConsumerRecord<String, String>> pRecords = records.records(partition);
                    Iterator<ConsumerRecord<String, String>> piter = pRecords.iterator();
                    while (piter.hasNext()) {
                        ConsumerRecord<String, String> next = piter.next();
                        int par = next.partition();
                        long offset = next.offset();
                        String key = next.key();
                        String value = next.value();
                        long timestamp = next.timestamp();
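                        System.out.println("key: " + key + " val: " + value + " partition: " + par + " offset: " + offset + " time: " + timestamp);
                        // processing can be single-threaded or multi-threaded

                        /**
                         * Approach 1: commit manually after every single record.
                         * The committed offset is the offset of the NEXT record to read, hence offset + 1.
                         * (In practice pick one of the three approaches; all three are shown here for comparison.)
                         */
                        TopicPartition sp = new TopicPartition(next.topic(), par);
                        OffsetAndMetadata om = new OffsetAndMetadata(offset + 1);
                        HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
                        map.put(sp, om);
                        consumer.commitSync(map);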

                    }
                    /**
                     * Approach 2: commit manually once per partition
                     */
                    long poff = pRecords.get(pRecords.size() - 1).offset(); // offset of this partition's last record in the batch
                    OffsetAndMetadata pom = new OffsetAndMetadata(poff + 1); // commit the next offset to read
                    HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
                    map.put(partition, pom);
                    consumer.commitSync(map);

                }
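                /**
                 * Approach 3: commit once per poll() batch; commitSync() without arguments commits
                 * the offsets of everything returned by the last poll()
                 */
                consumer.commitSync();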
            }
        }


    }


    /**
     * Verifying the difference between earliest and latest:
     * 1. a. set the auto-commit interval to 15 seconds b. start group01 with earliest c. start group02 with latest
     *
     *  kafka-consumer-groups.sh --bootstrap-server ke03:9092 --describe --group group01, result within the first 15 seconds (nothing committed yet, so CURRENT-OFFSET and LAG show "-"):
     *  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             HOST            CLIENT-ID
     * 604-items       0          -               26              -               consumer-group03-1-857a5f24-fce6-4240-b73e-b9b5dc079553 /192.168.244.1  consumer-group01-1
     * 604-items       1          -               53              -               consumer-group03-1-857a5f24-fce6-4240-b73e-b9b5dc079553 /192.168.244.1  consumer-group01-1
     *
     * Result after 15 seconds:
     * TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             HOST            CLIENT-ID
     * 604-items       0          26              26              0               consumer-group03-1-857a5f24-fce6-4240-b73e-b9b5dc079553 /192.168.244.1  consumer-group01-1
     * 604-items       1          53              53              0               consumer-group03-1-857a5f24-fce6-4240-b73e-b9b5dc079553 /192.168.244.1  consumer-group01-1
     *
     *
     * kafka-consumer-groups.sh --bootstrap-server ke03:9092 --describe --group group02, result within 15 seconds: the earlier data was not consumed
     * TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             HOST            CLIENT-ID
     * 604-items       0          26              26              0               consumer-group04-1-08c34055-ba99-4a66-a6e2-a7a155ab2e63 /192.168.244.1  consumer-group02-1
     * 604-items       1          53              53              0               consumer-group04-1-08c34055-ba99-4a66-a6e2-a7a155ab2e63 /192.168.244.1  consumer-group02-1
     *
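     * Conclusion: on its first start, a group using earliest consumes from the beginning of the log (offset 0 here); a group using latest only consumes messages produced after the consumer started, never the earlier ones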
     */


}
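The earliest/latest experiment above boils down to one decision per partition. A simplified sketch, assuming committed == -1 means the group has no committed offset; StartOffset is a hypothetical helper, not client code:

```java
// Sketch of where a consumer starts reading one partition, following the auto.offset.reset values.
final class StartOffset {

    /**
     * @param committed committed offset of the group, or -1 if it has none (assumption)
     * @param logStart  earliest offset still on the broker (0 unless old data was deleted)
     * @param logEnd    LOG-END-OFFSET, the offset the next produced message will get
     * @param policy    auto.offset.reset value
     */
    static long resolve(long committed, long logStart, long logEnd, String policy) {
        if (committed >= logStart && committed <= logEnd) {
            return committed;            // a valid committed offset always wins
        }
        // no committed offset, or it no longer exists on the server: apply the reset policy
        switch (policy) {
            case "earliest": return logStart;
            case "latest":   return logEnd;
            default: throw new IllegalStateException("no valid offset for group, policy=" + policy);
        }
    }
}
```

With 26 messages in partition 0, resolve(-1, 0, 26, "earliest") returns 0 and resolve(-1, 0, 26, "latest") returns 26, in line with the group01 and group02 runs.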
Original article: https://www.cnblogs.com/bigdata-familyMeals/p/15059209.html