rabbitmq Exchange四种模式

一、什么是Exchange

RabbitMQ 是 AMQP(高级消息队列协议)的标准实现:

从 AMQP 协议可以看出,Queue、Exchange 和 Binding 构成了 AMQP 协议的核心

  • Producer:消息生产者,即投递消息的程序。

  • Broker:消息队列服务器实体。

    • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

    • Binding:绑定,它的作用就是把 Exchange 和 Queue 按照路由规则绑定起来。

    • Queue:消息队列载体,每个消息都会被投入到一个或多个队列。

  • Consumer:消息消费者,即接受消息的程序。

二、Exchange的类型

RabbitMQ常用的Exchange Type有fanout、direct、topic、headers这四种

fanout 

fanout类型的Exchange路由规则非常简单,它会把所有发送到fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。

Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。所以,Fanout Exchange 转发消息是最快的。

direct

direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。

direct Exchange是RabbitMQ Broker的默认Exchange,它有一个特别的属性对一些简单的应用来说是非常有用的,在使用这个类型的Exchange时,可以不必指定routing key的名字,在此类型下创建的Queue有一个默认的routing key,这个routing key一般同Queue同名。

 
 

direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

topic

前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:

a)、routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
b)、binding key与routing key一样也是句点号“. ”分隔的字符串
c)、binding key中可以存在两种特殊字符"*"与“#”,用于做模糊匹配,其中" * "用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上,Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号"*"匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.*” 只会匹配到“log.error”

headers

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。

 

示例代码
生产者
const amqp = require('amqplib');

async function producer() {
  try {
    // 1. 创建链接对象
    const connection = await amqp.connect('amqp://localhost:5672');

    // 2. 获取通道
    const channel = await connection.createChannel();

    // 3. 声明参数
    const exchangeName = 'qosEx';
    const routingKey = 'qos.test001.cool';
    const msg = 'Producer';

    // 4. 声明交换机
    await channel.assertExchange(exchangeName, 'topic', { durable: true });

    for (let i = 0; i < 5; i++) {
      // 5. 发送消息
      console.log(i);
      await channel.publish(
        exchangeName,
        routingKey,
        Buffer.from(`${msg} 第${i}条消息`)
      );
    }

    await channel.close();
  } catch (error) {
    console.log(error);
  }
}

producer();

消费者

const amqp = require('amqplib');

async function consumer() {
  // 1. 创建链接对象
  const connection = await amqp.connect('amqp://122.51.9.11:5672');

  // 2. 获取通道
  const channel = await connection.createChannel();

  // 3. 声明参数
  const exchangeName = 'qosEx';
  const queueName = 'qosQueue';
  const bindingKey = 'qos.#';
  // const bindingKey = 'qos.*';  无法匹配

  // 4. 声明交换机、对列进行绑定
  await channel.assertExchange(exchangeName, 'topic', { durable: true });
  await channel.assertQueue(queueName);
  await channel.bindQueue(queueName, exchangeName, bindingKey);

  // 5. 限流参数设置
  await channel.prefetch(1, false);// count:每次推送给消费端 N 条消息数目,如果这 N 条消息没有被ack,生产端将不会再次推送直到这 N 条消息被消费。global:在哪个级别上做限制,ture 为 channel 上做限制,false 为消费端上做限制,默认为 false。
  // 6. 限流,noAck参数必须设置为false
  await channel.consume(
    queueName,
    (msg) => {
      console.log('Consumer:', msg.content.toString());
      channel.ack(msg);
    },
    { noAck: false }
  );
}

consumer();
参考链接:https://www.jianshu.com/p/19af0f40bbde



原文地址:https://www.cnblogs.com/xiaosongJiang/p/13038062.html