rabbitMQ笔记

六种工作模式

官网介绍:https://www.rabbitmq.com/getstarted.html

简单模式:一个生产者,一个消费者

work模式:一个生产者,多个消费者,每个消费者获取到的消息唯一。

订阅模式:一个生产者发送的消息会被多个消费者获取。

路由模式:发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key

topic模式:将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,“*”只匹配一个词。

简单模式/work模式

            Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);
            
            channel.queueDeclare("yangsimple", false, false, false, null);
            channel.basicPublish("", "yangsimple", null, "simple message".getBytes());
            System.out.println("basic publish");
            channel.close();

订阅模式/路由模式

//生产者

Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

//此处不需要单独queueDeclare操作,直接向Exchange发送消息

channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);

String message = "message";

channel.basicPublish(EXCHANGE_NAME,"key2", null, message.getBytes());
System.out.println("[x] Sent '"+message+"'");

channel.close();
connection.close();



//////###########

//消费者
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false,false,false,null);

//此处需要定义queue以确定向订阅哪一个queue

channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key2");

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);

while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received1 "+message);
Thread.sleep(10);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

 AMQP:

注:

这里的Connection是个长连接,消息的传递都是在Channel中执行的。Channel是个虚拟链接,使用Channel大大的减少创建TCP(Connection)的资源消耗。

消息消费两种模式:

一种是 Push 模式,只要生产者发到服务器,就马上推 送给消费者。另一种是 Pull 模式,消息存放在服务端,只有消费者主动获取才能拿到消息。 

Exchange解释:

交换机是一个绑定列表,用来查找匹配的绑定关系。

在 RabbitMQ 里面永远不会出现消息直接发送到队列的情况,默认交换机是”“。因为在 AMQP 里面 引入了交换机(Exchange)的概念,用来实现消息的灵活路由。

队列使用绑定键(Binding Key)跟交换机建立绑定关系。 生产者发送的消息需要携带路由键(Routing Key),交换机收到消息时会根据它保存的绑定列表,决定将消息路由到哪些与它绑定的队列上。 
注意:交换机与队列、队列与消费者都是多对多的关系。

Queue扩展参数:

含义

x-message-ttl

队列中消息的存活时间,单位毫秒

x-expires

队列在多久没有消费者访问以后会被删除

x-max-length

队列的最大消息数

x-max-length-bytes

队列的最大容量,单位 Byte

x-dead-letter-exchange

队列的死信交换机

x-dead-letter-routing-key

死信交换机的路由键

x-max-priority

队列中消息的最大优先级,消息的优先级不能超过它

虚拟主机 VHOST

VHOST 除了可以提高硬件资源的利用率之外,还可以实现资源的隔离和权限的控制。
它的作用类似于编程语言中的 namespace 和 package,不同的 VHOST 中可以有 同名的 Exchange 和 Queue,它们是完全透明的。

 死信队列

条件:

1. 消息被消费者拒绝并且未设置重回队列:(NACK || Reject ) && requeue == false
2. 消息过期
3. 队列达到最大长度,超过了 Max length(消息数)或者 Max length bytes (字节数),最先入队的消息会被发送到 DLX。

声明死信队列代码:

//首先声明 死信队列Exchange及Queue

@Bean("deatLetterExchange")
 public TopicExchange deadLetterExchange() {
return new TopicExchange("DEAD_LETTER_EXCHANGE", true, false, new HashMap<>());
} @Bean(
"deatLetterQueue") public Queue deadLetterQueue() { return new Queue("DEAD_LETTER_QUEUE", true, false, false, new HashMap<>());
}
//此处Exchange和queue进行绑定 @Bean public Binding bindingDead(@Qualifier("deatLetterQueue") Queue queue,@Qualifier("deatLetterExchange") TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("#"); // 无条件路由
}
///声明正常队列时,将死信队列信息添加到arguments中 @Bean("oriUseExchange") public DirectExchange exchange() { return new DirectExchange("_USE_EXCHANGE", true, false, new HashMap<>());
} @Bean(
"oriUseQueue") public Queue queue() { Map<String, Object> map = new HashMap<String, Object>(); map.put("x-message-ttl", 10000); // 10 秒钟后成为死信 map.put("x-dead-letter-exchange", "DEAD_LETTER_EXCHANGE"); // 队列中的消息变成死信后,进入死信交换机 return new Queue("_USE_QUEUE", true, false, false, map);
} @Bean
public Binding binding(@Qualifier("oriUseQueue") Queue queue,@Qualifier("oriUseExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("ori.use");
}

 消费端限流:

在消费者处理消息的能力有限,例如消费者数量太少,或者单条消息的处理时间过 长的情况下,如果我们希望在一定数量的消息消费完之前,不再推送消息过来,就要用 到消费端的流量限制措施。

可以基于 Consumer 或者 channel 设置 prefetch count 的值,含义为 Consumer端的最大的 unacked messages 数目。当超过这个数值的消息未被确认,RabbitMQ 会 停止投递新的消息给该消费者。

// 如果超过 2 条消息没有发送 ACK,当前消费者不再接受队列消息
channel.basicQos(2); 
channel.basicConsume(QUEUE_NAME, false, consumer);

///SimpleMessageListenerContainer
container.setPrefetchCount(2);

 


多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

Channel: 信道是创建在“真实”TCP上的虚拟连接,AMQP命令都是通过信道发送出去的,每个信道都会有一个唯一的ID,不论是发布消息,订阅队列或者介绍消息都是通过信道完成的。

为什么不通过TCP直接发送命令?

对于操作系统来说创建和销毁TCP会话是非常昂贵的开销,假设高峰期每秒有成千上万条连接,每个连接都要创建一条TCP会话,这就造成了TCP连接的巨大浪费,而且操作系统每秒能创建的TCP也是有限的,因此很快就会遇到系统瓶颈。

如果我们每个请求都使用一条TCP连接,既满足了性能的需要,又能确保每个连接的私密性,这就是引入信道概念的原因。

RabbitMQ主要组件:

ConnectionFactory(连接管理器)、Channel(信道)、Exchange(交换器)、Queue(队列)、RoutingKey(路由键)、BindingKey(绑定键)。

RoutingKey(路由键):用于把生成者的数据分配到交换器上;

BindingKey(绑定键):用于把交换器的消息绑定到队列上

消息持久化的缺点: 性能大幅度降低:

消息持久化的优点显而易见,但缺点也很明显,那就是性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量,尽管使用SSD硬盘可以使事情得到缓解,但他仍然吸干了Rabbit的性能,当消息成千上万条要写入磁盘的时候,性能是很低的。

所以使用者要根据自己的情况,选择适合自己的方式。

 虚拟主机: vhost

每个Rabbit都能创建很多vhost,我们称之为虚拟主机,每个虚拟主机其实都是mini版的RabbitMQ,拥有自己的队列,交换器和绑定,拥有自己的权限机制

1. RabbitMQ默认的vhost是“/”开箱即用;

2. 多个vhost是隔离的,多个vhost无法通讯,并且不用担心命名冲突(队列和交换器和绑定),实现了多层分离;

3. 创建用户的时候必须指定vhost;

fanout交换器——发布/订阅模式

fanout有别于direct交换器,fanout是一种发布/订阅模式的交换器,当你发送一条消息的时候,交换器会把消息广播到所有附加到这个交换器的队列上。

对于fanout交换器来说routingKey(路由键)是无效的,这个参数是被忽略的。

final String ExchangeName = "fanoutec"; // 交换器名称
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "fanout"); // 声明fanout交换器

具体参照: https://www.cnblogs.com/vipstone/p/9295625.html

topic交换器——匹配订阅模式

topic路由器的关键在于定义路由键,定义routingKey名称不能超过255字节,使用“.”作为分隔符,消费消息的时候routingKey可以使用下面字符匹配消息:

  • "*"匹配一个分段(用“.”分割)的内容;
  • "#"匹配0和多个字符;

生产者可靠性发布的实现方式:通过AMQP提供的事务机制实现, 使用发送者确认模式实现


消息可靠性发送

消息的发送流程主要如下

 针对第一点生产者发送消息到 Broker,在 RabbitMQ 里面提供了两种机制服务端确认机制

第一种是 Transaction(事务)模式,第二种 Confirm(确认)模式。

事务使用

事务的实现主要是对信道(Channel)的设置,主要的方法有三个:

  1. channel.txSelect()声明启动事务模式;

  2. channel.txComment()提交事务;

  3. channel.txRollback()回滚事务;

Demo:

    channel.txSelect(); // 声明事务
    // 发送消息
    channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
    channel.txCommit(); // 提交事务

事务的缺点是:只有收到了服务端的 Commit-OK 的指令,才能提交成功效率太差.

Confirm发送方确认模式

Confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的。

Confirm的三种实现方式:

方式一:channel.waitForConfirms()普通发送方确认模式(发送一条确认一条);

方式二:channel.waitForConfirmsOrDie()批量确认模式(难点:无法确认每次发送多少条合适);

方式三:channel.addConfirmListener()异步监听发送方确认模式;

Demo:

// 开启发送方确认模式
channel.confirmSelect();
String message = String.format("时间 => %s", new Date().getTime());
channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
if (channel.waitForConfirms()) {
    System.out.println("消息发送成功" );
}


///////////////
// 开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
    String message = String.format("时间 => %s", new Date().getTime());
    channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
}
channel.waitForConfirmsOrDie(); //直到所有信息都发布,只要有一个未确认就会IOException
System.out.println("全部执行完成");

//////////
// 开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
    String message = String.format("时间 => %s", new Date().getTime());
    channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
}
//异步监听确认和未确认的消息
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("未确认消息,标识:" + deliveryTag);
    }
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
    }
});

生产者可靠性发布小结:

Confirm批量确定和Confirm异步模式性能相差不大,Confirm模式要比事务快10倍左右。 


RabbitMQ持久化方案,包含Exchange,Queue,Message三种设置

@Bean("Queue") public Queue GpQueue() {
// queueName, durable, exclusive, autoDelete, Properties
return new Queue("Test_queuename", true, false, false, new HashMap<>()); }

@Bean("Exchange")
public DirectExchange exchange() {
// exchangeName, durable, exclusive, autoDelete, Properties
return new DirectExchange("Name_TEST_EXCHANGE", true, false, new HashMap<>()); }

//Message 
MessageProperties messageProperties = new MessageProperties(); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); 
Message message = new Message("持久化消息".getBytes(), messageProperties); rabbitTemplate.send("GP_TEST_EXCHANGE", "test", message);

消费者消费数据高可用性

1. 调用生产者API进行回调

2.发送响应消息给生产者


MQ集群设置

因为 Erlang 天生具备分布式的特性, 所以 RabbitMQ 天然支持集群,不需要通过引入 ZK 或者数据库来实现数据同步

MQ集群分两种:

第一种普通集群,各个机器间只共享元数据,

第二种:镜像队列

镜像队列模式下,消息内容会在镜像节点间同步,可用性更高。不过也有一定的副 作用,系统性能会降低,节点过多的情况下同步的代价比较大。


原文地址:https://www.cnblogs.com/snow-man/p/11653926.html