分布式消息通信之RabbitMQ_02

1. 可靠性投递分析

RabbitMQ工作模型
 在某些业务实时一致性要求较高的场景,需要确保消息投递的可靠性,可以从RabbitMQ的工作模型的4个主要流程来做处理;并且效率和可靠性不可能同时兼顾,如果要保证每个环节都成功,对消息的收发效率肯定会有影响。
  1. 生产者将消息投递至交换机
  2. 交换机根据消息的路由关键字和队列的绑定关键字匹配,将消息路由到匹配的队列
  3. 队列将消息存储在内存或者磁盘当中
  4. 消费者从队列取走消息
  5. 其他

1.1 消息投递

 有两种方法可以监听生成着投递消息是否成功事物transaction模式确认confirm模式;
 事物模式(不推荐使用):可以使用com.rabbitmq.client.Channel#txSelect来开启事物,当消息由生产者投递到RabbitMQ服务器成功后,事物会提交;不过事物模式会严重影响RabbitMQ特性,一般不建议使用。

        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        try {

            // 开启事物
            channel.txSelect();
            System.out.println("send msg..");
            channel.basicPublish("", SimpleConsumer.QUEUE_NAME, null,
                    "Hello World".getBytes());
            channel.txCommit();
            System.out.println("消息发送成功!");
        } catch (Exception e) {
            // 回滚事务
            System.out.println("消息发送失败, rollback");
            channel.txRollback();
        }

 确认Confirm模式又分为批量确认,异步确认

Normal: 
        channel.confirmSelect();

        channel.basicPublish("", SimpleConsumer.QUEUE_NAME, null,
                "hello world confirm msg!".getBytes());

        if (channel.waitForConfirms()) {

            System.out.println("消息投递成功!");

        }

batch: 
       try {
            channel.confirmSelect();
            for (int i = 0; i < 5; i++) {
                // 发送消息
                // String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish("", QUEUE_NAME, null, (msg +"-"+ i).getBytes());
            }
            // 批量确认结果,ACK如果是Multiple=True,代表ACK里面的Delivery-Tag之前的消息都被确认了
            // 比如5条消息可能只收到1个ACK,也可能收到2个(抓包才看得到)
            // 直到所有信息都发布,只要有一个未被Broker确认就会IOException
            channel.waitForConfirmsOrDie();
            System.out.println("消息发送完毕,批量确认成功");
        } catch (Exception e) {
            // 发生异常,可能需要对所有消息进行重发
            e.printStackTrace();
        }

 Async

       ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));

        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();

        String msg = "Hello world, Rabbit MQ, Async Confirm";
        // 声明队列(默认交换机AMQP default,Direct)
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(SimpleConsumer.QUEUE_NAME, false, false, true, null);

        // 用来维护未确认消息的deliveryTag
        final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

        // 这里不会打印所有响应的ACK;ACK可能有多个,有可能一次确认多条,也有可能一次确认一条
        // 异步监听确认和未确认的消息
        // 如果要重复运行,先停掉之前的生产者,清空队列
        channel.addConfirmListener(new ConfirmListener() {
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Broker未确认消息,标识:" + deliveryTag);
                if (multiple) {
                    // headSet表示后面参数之前的所有元素,全部删除
                    confirmSet.headSet(deliveryTag + 1L).clear();
                } else {
                    confirmSet.remove(deliveryTag);
                }
                // 这里添加重发的方法
            }
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                // 如果true表示批量执行了deliveryTag这个值以前(小于deliveryTag的)的所有消息,如果为false的话表示单条确认
                System.out.println(String.format("Broker已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
                System.out.println("multiple:"+multiple);
                if (multiple) {
                    System.out.println("deliveryTag:"+deliveryTag);
                    // headSet表示后面参数之前的所有元素,全部删除
                    confirmSet.headSet(deliveryTag + 1L).clear();
                } else {
                    // 只移除一个元素
                    confirmSet.remove(deliveryTag);
                }
                System.out.println("未确认的消息:"+confirmSet);
            }
        });

        // 开启发送方确认模式
        channel.confirmSelect();
        for (int i = 0; i < 10; i++) {
            long nextSeqNo = channel.getNextPublishSeqNo();
            // 发送消息
            // String exchange, String routingKey, BasicProperties props, byte[] body
            channel.basicPublish("", SimpleConsumer.QUEUE_NAME, null, (msg +"-"+ i).getBytes());
            confirmSet.add(nextSeqNo);
        }
        System.out.println("所有消息:"+confirmSet);

        // 这里注释掉的原因是如果先关闭了,可能收不到后面的ACK
        //channel.close();
        //conn.close();

1.2 消息路由

 当消息投递到Exchange后,会根据消息的路由关键字来匹配路由到的队列,当关键字没有匹配到队列或者队列不存在或者路由关键字错误时,消息就会丢失;为了保证消息可以被正确的路由,可以使用两种方式:生产者添加ReturnListener或者创建队列时指定备份交换机 alternate-exchange


public class SimpleConsumer {

    public static final String EXCHANGE_NAME = "Simple_Reliability_Exchange";
    public static final String QUEUE_NAME = "Simple_Reliability_Queue";

}
- - - - - - - - 

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class Rmq03ReturnListener {

    private static final String BACKUP_EXCHANGE = "MY_alternate_exchange";
    private static final String BACKUP_QUEUE = "MY_alternate_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
//        factory.setPort(15672);
        factory.setUsername("guest");
        factory.setPassword("guest");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 设置交换机无法路由的被返回的消息的监听器
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("ReturnListener 接收到无法路由被退回的消息 " + new Date());
                System.out.println(String.format("replyCode[%s] replyText[%s] exchange[%s] routingKey[%s] properties[%s] msg[%s]",
                        replyCode, replyText, exchange, routingKey, properties, new String(body)));

            }
        });

        // 初始化核心交换机和队列
        Map<String, Object> exchangeArgs = new HashMap();
        exchangeArgs.put("alternate-exchange", BACKUP_EXCHANGE);    // 指定交换机的备份交换机,接收无法路由的消息
        channel.exchangeDeclare(SimpleConsumer.EXCHANGE_NAME, BuiltinExchangeType.TOPIC, false, true, exchangeArgs);
        channel.queueDeclare(SimpleConsumer.QUEUE_NAME, false, false, true, null);
        channel.queueBind(SimpleConsumer.QUEUE_NAME, SimpleConsumer.EXCHANGE_NAME, "#.fast.#");

        // 初始化备份交换机和队列
        initBackup(channel);

        // 发送消息
        String msg = "Hello World, test  msg can not rout ";
        // 在发送消息是,可以设置mandatory参数未true,这样当消息在交换器上无法被路由时,服务器将消息返回给生产者,生产者实现回调函数处理被服务端返回的消息。
        Boolean mandatory = true;
        channel.basicPublish(SimpleConsumer.EXCHANGE_NAME, "v.fast", mandatory,null, msg.getBytes());
        channel.basicPublish(SimpleConsumer.EXCHANGE_NAME, "v.slow", mandatory,null, msg.getBytes());

//        channel.close();
//        connection.close();


    }

    private static void initBackup(Channel channel) throws IOException {

        channel.queueDeclare(BACKUP_QUEUE, false, false, true, null);
        channel.exchangeDeclare(BACKUP_EXCHANGE, BuiltinExchangeType.FANOUT);
        channel.queueBind(BACKUP_QUEUE, BACKUP_EXCHANGE, "");

        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery delivery) throws IOException {

                System.out.println("备份交换机 alternate-exchange 接收到无法路由的消息 " + new String(delivery.getBody()) );
                // 注释回执
                // channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);

            }
        };
        channel.basicConsume(BACKUP_QUEUE, false, deliverCallback, consumerTag -> {});

    }

}

1.3 消息存储

 当消息路由至匹配队列,也就是步骤3时,如果消息没有被消费而RabbitMQ服务系统宕机或者重启了,会导致消息丢失,因此可以使用消息的持久化配置。
交换机持久化 消费者在创建交换机时指定存储在磁盘中,系统重启后交换机不被删除

  Consumer
  
        // 交换机持久化
        // String exchange, String type,
        // boolean durable (交换机持久化),
        // boolean autoDelete(自动删除,为true时当前连接中断且交换机绑定队列没有消息时,会自动删除),
        // Map<String, Object> arguments
        channel.exchangeDeclare(EXCHANGE_PERSISTENCE, BuiltinExchangeType.TOPIC, true, false, null);  

队列持久化 费者在创建队列时指定存储在磁盘中,系统重启后队列不被删除

   consumer

   // 队列持久化
        // String queue,
        // boolean durable,  队列持久化参数,
        // boolean exclusive,  排他队列参数
        // boolean autoDelete,  是否自动删除,为true时当前连接中断且交换机绑定队列没有消息时,会自动删除
        //        Map<String, Object> arguments
        channel.queueDeclare(QUEUE_PERSISTENCE, true, false, false, null);

消息持久化 生成者在发送消息时指定消息类型为持久化

  producer

       AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .deliveryMode(2)
                .build();
        channel.basicPublish(EXCHANGE_PERSISTENCE, "bird.fly.fast", properties, new String("just text persistence msg").getBytes());

集群 镜像

1.4 消息消费

 消费者在从队列中取走消息进行消费时,如果逻辑处理中出现异常或者服务宕机消息就会丢失,可以在消息时使用手动发送回执的方式进行操作,当消息真正处理完毕后发送回执信息。
producer 发送消息

   for (int i = 0; i < 10; i++) {
            String msg = i%2 == 0 ? "This is a success msg" : " This is a error msg";
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        }

consumer 消费消息

       // 消息消费 autoAck 为false

        //  DeliverCallback 内进行手工应答

        /*
         * 1. 成功
         * com.rabbitmq.client.Channel#basicAck(long, boolean)
         *
         * 2. 拒绝,
         *  拒绝可以分为单条拒绝 和 批量拒绝
         *  拒绝的消息可以设置重新入队,不重新入队的则进入死信交换机,死信队列
         * com.rabbitmq.client.Channel#basicReject(long, boolean)
         * com.rabbitmq.client.Channel#basicNack(long, boolean, boolean)
         *
         */
 DeliverCallback callback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery delivery) throws IOException {

                String msg = new String(delivery.getBody());
                System.out.println(String.format("消费者接收到消息 msg[%s] at %s", msg, new Date()));

                if (msg.contains("error")) {
                    // reject one
                    channel.basicReject(delivery.getEnvelope().getDeliveryTag(), true);
                    // reject one or more
//                    channel.basicNack(delivery.getEnvelope().getDeliveryTag(), true, true);
                } else {
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
                }


            }
        };
        channel.basicConsume(QUEUE_NAME, false, callback, consumerTag -> {});

1.5 其他

消费者回调补偿机制消息幂等性消息顺序性
消费者回调
 当跨系统异步通信时,消费者从队列中取走消费并消费成功后,生产者其实并不知道自己发出的消息已经被处理掉了,所以 a) 生产者可以在发送消息时额外添加一个回调API接口,当消费者消费完消息时,调用消息的回调API通知生产者;或者b)消费者在消费完消息后,再发送一条消费成功的消息给生产者; 这样生产者就可以获知了。
补偿机制
 当生产者发送的消息一定时间内没有响应时,可以设置一个定时重发的机制,不过必须要规定重发次数如5,避免消息堆积。
消息幂等性
 消息的补偿重发机制会发送多次一样的消息给消费者,造成重复消费,可以为消息设置唯一id,将消费者处理完的消息记录下来,入库,每次接收到消息根据库中id判断消息是否已经处理过,如果处理过了就忽略这条消息。
消息顺序性
 当一个队列被多个消费者消费时,无法保证消费的顺序性。一个队列只有一个消费者时,顺序性可以得到保障。

2. 高可用架构部署方案

 避免单点故障造成数据丢失,可以启用集群来达到高可用和负载均衡。

2.1 集群

 集群部署与运维...
通信基础
 erlang.cookie,hosts
集群节点
  分为磁盘节点和内存节点两种,磁盘节点数据会保存在磁盘中,较安全;内存节点数据保存于内存中,访问效率较快;集群中最少有一个是磁盘节点,用于保存数据。
配置步骤
  a) 配置hosts b) 配置erlang.cookie c) 加入集群

2.2 镜像

3. 经验总结

3.1 配置文件与命名规范

 a)集中放在配置文件中 b) 体现数据类型 c) 体现数据来源和去向
lei.topicexchange=RISK_TO_PAY_EXCHANGE

3.2 调用封装

 在原有基础上封装,减少调用复杂度

  @Autowired
    @Qualifier("amqpTemplate2")
    private AmqpTemplate amqpTemplate2;

    /**
     * 演示三种交换机的使用
     *
     * @param message
     */
    public void sendMessage(Object message) {
        logger.info("Send message:" + message);

        // amqpTemplate 默认交换机 MY_DIRECT_EXCHANGE
        // amqpTemplate2 默认交换机 MY_TOPIC_EXCHANGE

        // Exchange 为 direct 模式,直接指定routingKey
        amqpTemplate.convertAndSend("FirstKey", "[Direct,FirstKey] "+message);
        amqpTemplate.convertAndSend("SecondKey", "[Direct,SecondKey] "+message);
  }

3.3 信息落库(可追溯,可重发) + 定时任务

 根据业务场景可以将消息存入库中,可以进行追溯,表中记录消息回执状态,定时轮训库表判断是否收到回执,没有收到可以重发。
 同时连接数据库表也会造成发送消息时效率的降低;

3.4 减少连接数

 多条消息拼装到一起发送一次,总数据量不要超过4M

3.5 生产者先发送消息还是先登记业务表

 先登记业务表;如果先发送消息,后续业务操作异常导致回滚,信息就会不一致。

3.6 谁来创建对象(队列,交换机,绑定关系)

 由消费者创建 队列 交换机 绑定关系

3.7 运维监控

zabbix系列zabbix3.4监控rabbitmq

3.8 其他插件

Plugins

C:Program FilesRabbitMQ Server
abbitmq_server-3.7.14sbin>rabbitmq-plugins list
Listing plugins with pattern ".*" ...
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status: * = running on rabbit@DESKTOP-2NHH5NJ
 |/
[  ] rabbitmq_amqp1_0                  3.7.14
[  ] rabbitmq_auth_backend_cache       3.7.14
[  ] rabbitmq_auth_backend_http        3.7.14
[  ] rabbitmq_auth_backend_ldap        3.7.14
[  ] rabbitmq_auth_mechanism_ssl       3.7.14
[  ] rabbitmq_consistent_hash_exchange 3.7.14
[  ] rabbitmq_event_exchange           3.7.14
[  ] rabbitmq_federation               3.7.14
[  ] rabbitmq_federation_management    3.7.14
[  ] rabbitmq_jms_topic_exchange       3.7.14
[E*] rabbitmq_management               3.7.14
[e*] rabbitmq_management_agent         3.7.14
[  ] rabbitmq_mqtt                     3.7.14
[  ] rabbitmq_peer_discovery_aws       3.7.14
[  ] rabbitmq_peer_discovery_common    3.7.14
[  ] rabbitmq_peer_discovery_consul    3.7.14
[  ] rabbitmq_peer_discovery_etcd      3.7.14
[  ] rabbitmq_peer_discovery_k8s       3.7.14
[  ] rabbitmq_random_exchange          3.7.14
[  ] rabbitmq_recent_history_exchange  3.7.14
[  ] rabbitmq_sharding                 3.7.14
[  ] rabbitmq_shovel                   3.7.14
[  ] rabbitmq_shovel_management        3.7.14
[  ] rabbitmq_stomp                    3.7.14
[  ] rabbitmq_top                      3.7.14
[  ] rabbitmq_tracing                  3.7.14
[  ] rabbitmq_trust_store              3.7.14
[e*] rabbitmq_web_dispatch             3.7.14
[  ] rabbitmq_web_mqtt                 3.7.14
[  ] rabbitmq_web_mqtt_examples        3.7.14
[  ] rabbitmq_web_stomp                3.7.14
[  ] rabbitmq_web_stomp_examples       3.7.14
原文地址:https://www.cnblogs.com/Qkxh320/p/distributed_rabbitmq_02.html