RabbitMQ 简介

前言

  1. RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
  2. 本身支持很多的协议:AMQP, XMPP, SMTP, STONP
  3. RabbitMQ服务器是用Erlang语言编写的,实现了代理(Broker)架构,意味着消息在发送到客户端之前可以在中央节点上排队。
    此特性使得RabbitMQ易于使用和部署,适宜于很多场景如路由、负载均衡或消息持久化等

主要特性

  1. 消息集群(Clustering):
    RabbitMQ支持分布式部署,多个RabbitMQ服务器组成一个集群形成一个逻辑Broker
  2. 可靠性(Reliability):
    RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
  3. 高可用(Highly Available Queues):
    队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

基本组成

Producer: 消息发送者

Message :消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成

RabbitMQ:
    - Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。默认"/"
   
    - Exchange: 路由器,接收消息,根据RoutingKey分发消息,有四种类型:
        - headers:消息头类型 路由器,内部应用
        - direct:精准匹配类型 路由器
        - topic:主题匹配类型 路由器,支持正则 模糊匹配
        - fanout:广播类型 路由器,RoutingKey无效

    - RoutingKey: 路由规则

    - Queue: 队列,用于存储消息,保存消息直到发送给消费者。它是消息的容器,也是消息的终点(消息的目的地)

    - Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连 

    - Connection:网络连接,比如一个TCP连接。 

    - Channel:信道:建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。 

Consumer: 消息消费者

RabbitMQ 消息确认策略

Confrim: 
    简单说就是直接传送消息 client > mq, 接收到 mq > ack, mq 在异步的完成接下来的事情
    发送线程不会立即得到MQ反馈结果,发送后通过callback确认成功失败

Transaction: 
    client 请求开启事务  > 发送message > client 提交事务,整个过程是同步的,mq必须完成消息持久化、消息同步等
    发送线程会立即得到MQ反馈结果,同一线程中,多个发送阻塞进行

消息消费的两种模式

1. 无需反馈: 消费者从消息队列获取消息后,服务端就认为该消息已经成功消费

2. 消息消费完给服务器返回确认状态,表示该消息已被消费
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

持久化

RabbitMQ中消息的持久化需要保证Exchange、Queue、Message都进行持久化操作。

Exchange的持久化
    @param exchange the name of the exchange
    @param type the exchange type
    @param durable true if we are declaring a durable exchange 
    exchangeDeclare(String exchange, String type, boolean durable)


Queue的持久化
    @param queue the name of the queue
    @param durable true if we are declaring a durable queue (the queue will survive a server restart)
    @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
    @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
    @param arguments other properties (construction arguments) for the queue
    queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)

    autoDelete:表明该队列是否自动删除。
    自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。


Message的持久化
    @param exchange the exchange to publish the message to
    @param routingKey the routing key
    @param props other properties for the message - routing headers etc
    @param body the message body
    basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)

    exchange:表示该消息发送到哪个Exchange
    routingKey:表示该消息的Routing Key
    props:表示该消息的属性    
        BasicProperties.PERSISTENT_TEXT_PLAIN
        BasicProperties.PERSISTENT_BASIC
    body:消息实体

存活时间 TTL

Queue TTL(Queue上没有Consumer)

Map<String, Object> queueArgs = new HashMap<>();
// 设置1分钟过期
queueArgs.put("x-expires", 60000);
channel.queueDeclare("queue", false, false, false, queueArgs);

Message TTL(消息在队列中的存活时间,超过该时间消息将被删除或者不能传递给消费者)

// 设置消息属性-TTL为30s
BasicProperties properties = new BasicProperties.Builder().expiration("30000").build();
channel.basicPublish("exchange", "kanyuxia", properties,"hello".getBytes(StandardCharsets.UTF_8));
原文地址:https://www.cnblogs.com/loveer/p/11421987.html