RabbitMQ ——四种ExChange及完整示例

RabbitMQ常用的Exchange Type有fanout、direct、topic、headers这四种,下面分别进行介绍。
这四种类的exchange分别有以下一些属性,分别是:

name:名称
Durability:持久化标志,如果为true,则表明此exchange是持久化的。
Auto-delete:删除标志,当所有队列在完成使用此exchange时,是否删除

1、fanout类型

fanout类型的Exchange路由规则非常简单,它会把所有发送到fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。

Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。所以,Fanout Exchange 转发消息是最快的。

声明一个exchange

// 声明一个名称为"exchange_fanout"的exchange
channel.exchangeDeclare("exchange_fanout", "fanout");
// 将消息发送给exchange
channel.basicPublish("exchange_fanout", "", null, msg.getBytes());

声明临时队列(Temporary queues)

一个 non-durable(不持久化的), exclusive(单独的), autodelete(随着消费者的消亡自动删除的),random(随机命名)的一个队列

String queueName = channel.queueDeclare().getQueue();

绑定(Bindings) 

上面我们已经创建好了exchange,并且定义了一个临时队列,现在我要把消息从exchange推送到所有队列上,是不是该把exchange和队列绑定一下?不然exchange如何知道把消息发哪去呢

channel.queueBind("队列名", "exchange名", "");

完整代码

生产者Producer

package test;

import java.io.IOException;
import java.util.Date;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

	private final static String EXCHANGE_NAME = "exchange_fanout";

	public static void main(String[] args) throws IOException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		factory.setUsername("admin");
		factory.setPassword("123456");
		factory.setPort(5672);
		factory.setVirtualHost("/");

		// 创建connection
		Connection conn = factory.newConnection();
		// 创建channel
		Channel channel = conn.createChannel();
		// 声明该channel是fanout类型
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		Date nowDate = new Date();
		String msg = nowDate.getTime() + " have log ...";
		// 将消息发送给exchange
		channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
		System.out.println(nowDate + " 已经生成一条日志...");
		channel.close();
		conn.close();
	}
}

消费者Consumer

package test;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

public class Consumer {

	private final static String EXCHANGE_NAME = "exchange_fanout";

	public static void main(String[] args)
			throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
		// 获取不同的pid,方便标识不同的消费者
		String name = ManagementFactory.getRuntimeMXBean().getName();
		String pid = name.split("@")[0];
		// 创建连接和channel
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		factory.setUsername("admin");
		factory.setPassword("123456");
		factory.setPort(5672);
		factory.setVirtualHost("/");

		Connection conn = factory.newConnection();
		Channel channel = conn.createChannel();
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		// 由RabbitMQ自行创建的临时队列,唯一且随消费者的中止而自动删除的队列
		String queueName = channel.queueDeclare().getQueue();
		// binding
		channel.queueBind(queueName, EXCHANGE_NAME, "");

		System.out.println(pid + "已经创建,正在等待消息...");

		QueueingConsumer consumer = new QueueingConsumer(channel);
		// 指定队列消费者
		channel.basicConsume(queueName, true, consumer);

		while (true) {
			// Delivery : 封装了消息,消息的载体
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String recieveMsg = new String(delivery.getBody());
			System.out.println(pid + "接收到了消息: " + recieveMsg);
		}
	}
}

2、direct类型

direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。

direct Exchange是RabbitMQ Broker的默认Exchange,它有一个特别的属性对一些简单的应用来说是非常有用的,在使用这个类型的Exchange时,可以不必指定routing key的名字,在此类型下创建的Queue有一个默认的routing key,这个routing key一般同Queue同名。

direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

bindings(绑定)和routingKey(路由键) 

关于bindings,官网有一段话: 
A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange. 
翻译一下: binding 是exchange(交换器)和queue(队列)关系的一种描述,可以简单的被理解为:该队列对这个exchange上的消息感兴趣

在定义一段关系时,bindings可以带一个routingKey(String类型)的参数,如下:

channel.queueBind(queueName, EXCHANGE_NAME, "routingKey_direct");

表示该队列只对routingKey为routingKey_direct的消息感兴趣,而此时的routingKey从何而来呢?别忘了,我们还有一个producer呢,它不正是用来发布消息的吗?

channel.basicPublish(EXCHANGE_NAME, "routingKey_direct", null, msg.getBytes());

上面这段代码的意思是我要发消息给exchange,并且它的routingKey为routingKey_direct。其实说白了,就是说: 发消息给exchange时可以指定routingKey,exchange和队列之间可以定义binding,接收者只处理binding和routingKey相同的消息。

完整代码

生产者Producer

package test;

import java.io.IOException;
import java.util.Date;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

	private final static String EXCHANGE_NAME = "exchange_direct";

	private final static String ROUTING_KEY = "routingKey_direct";

	public static void main(String[] args) throws IOException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		factory.setUsername("admin");
		factory.setPassword("123456");
		factory.setPort(5672);
		factory.setVirtualHost("/");

		// 创建connection
		Connection conn = factory.newConnection();
		// 创建channel
		Channel channel = conn.createChannel();
		// 声明该channel是direct类型
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		Date nowDate = new Date();
		String msg = nowDate.getTime() + " have log ...";
		// 将消息发送给exchange
		channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes());
		System.out.println(nowDate + " 已经生成一条日志...");
		channel.close();
		conn.close();
	}
}

消费者Consumer

package test;
 
import java.io.IOException;
import java.lang.management.ManagementFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
 
public class Consumer {
 
	private final static String EXCHANGE_NAME = "exchange_direct";
 
	private final static String ROUTING_KEY = "routingKey_direct";
 
	public static void main(String[] args)
			throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
		// 获取不同的pid,方便标识不同的消费者
		String name = ManagementFactory.getRuntimeMXBean().getName();
		String pid = name.split("@")[0];
		// 创建连接和channel
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		factory.setUsername("admin");
		factory.setPassword("123456");
		factory.setPort(5672);
		factory.setVirtualHost("/");
 
		Connection conn = factory.newConnection();
		Channel channel = conn.createChannel();
		
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		// 由RabbitMQ自行创建的临时队列,唯一且随消费者的中止而自动删除的队列
		String queueName = channel.queueDeclare().getQueue();
		// 或声明创建持久队列
		// String queueName = ROUTING_KEY + ".queue";
		// channel.queueDeclare(queueName, false, false, true, null);
 
		// binding
		channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
		
		 // 将一个对列绑定多个路由键示例
        /*
        String[] routingKeys = {"debug", "info"};
        for (int i = 0; i < routingKeys.length; i++) {
            channel.queueBind(queueName, EXCHANGE_NAME, routingKeys[i]);
        }
        */
 
		System.out.println(pid + "已经创建,正在等待消息...");
 
		QueueingConsumer consumer = new QueueingConsumer(channel);
		// 指定队列消费者
		channel.basicConsume(queueName, true, consumer);
 
		while (true) {
			// Delivery : 封装了消息,消息的载体
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String recieveMsg = new String(delivery.getBody());
			System.out.println(pid + "接收到了消息: " + recieveMsg);
		}
	}
}

3、topic类型

前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:

a)、routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
b)、binding key与routing key一样也是句点号“. ”分隔的字符串
c)、binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上,Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.” 只会匹配到“log.error”。

完整代码

生产者Producer

package test;

import java.io.IOException;
import java.util.Date;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

	private final static String EXCHANGE_NAME = "exchange_topic";

	private final static String ROUTING_KEY = "log.info";

	public static void main(String[] args) throws IOException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		factory.setUsername("admin");
		factory.setPassword("123456");
		factory.setPort(5672);
		factory.setVirtualHost("/");

		// 创建connection
		Connection conn = factory.newConnection();
		// 创建channel
		Channel channel = conn.createChannel();
		
		// 声明该channel是topic类型
		channel.exchangeDeclare(EXCHANGE_NAME, "topic", false, true, null);
		
		Date nowDate = new Date();
		String msg = nowDate.getTime() + " have log ...";
		
		// 将消息发送给exchange
		channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes());
		
		System.out.println(nowDate + " 已经生成一条日志...");
		channel.close();
		conn.close();
	}
}

消费者Consumer

package test;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

public class Consumer {

	private final static String EXCHANGE_NAME = "exchange_topic";

	private final static String ROUTING_KEY = "log.#";

	public static void main(String[] args)
			throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
		// 获取不同的pid,方便标识不同的消费者
		String name = ManagementFactory.getRuntimeMXBean().getName();
		String pid = name.split("@")[0];
		// 创建连接和channel
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		factory.setUsername("admin");
		factory.setPassword("123456");
		factory.setPort(5672);
		factory.setVirtualHost("/");

		Connection conn = factory.newConnection();
		Channel channel = conn.createChannel();
		
		channel.exchangeDeclare(EXCHANGE_NAME, "topic", false, true, null);
		// 由RabbitMQ自行创建的临时队列,唯一且随消费者的中止而自动删除的队列
		// String queueName = channel.queueDeclare().getQueue();
		
		// 或声明创建持久队列
		String queueName = ROUTING_KEY + ".queue";
		channel.queueDeclare(queueName, false, false, true, null);

		// binding
		channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);

		System.out.println(pid + "已经创建,正在等待消息...");

		QueueingConsumer consumer = new QueueingConsumer(channel);
		// 指定队列消费者
		channel.basicConsume(queueName, true, consumer);

		while (true) {
			// Delivery : 封装了消息,消息的载体
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String recieveMsg = new String(delivery.getBody());
			System.out.println(pid + "接收到了消息: " + recieveMsg);
		}
	}
}

4、headers类型

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。

fanout和headers类型都不需要路由键routingKey,交换时通过Headers头部来将消息映射到队列的,有点像HTTP的Headers,Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或者all,这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)而是Object类型。

  • any: 只要在发布消息时携带的有一对键值对headers满足队列定义的多个参数arguments的其中一个就能匹配上,注意这里是键值对的完全匹配,只匹配到键了,值却不一样是不行的;
  • all:在发布消息时携带的所有Entry必须和绑定在队列上的所有Entry完全匹配

完整代码

生产者Producer

package test;
 
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
 
public class Producer {
 
	private final static String EXCHANGE_NAME = "exchange_headers";
  
	public static void main(String[] args) throws IOException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		factory.setUsername("admin");
		factory.setPassword("123456");
		factory.setPort(5672);
		factory.setVirtualHost("/");
 
		// 创建connection
		Connection conn = factory.newConnection();
		// 创建channel
		Channel channel = conn.createChannel();
		
		Map<String, Object> heardersMap = new HashMap<String, Object>();
        heardersMap.put("api", "login");
        heardersMap.put("version", 1.0);
        heardersMap.put("radom", UUID.randomUUID().toString());
        BasicProperties.Builder properties = new BasicProperties().builder().headers(heardersMap);

		
		// 声明该channel是headers类型
		channel.exchangeDeclare(EXCHANGE_NAME, "headers");
		
		Date nowDate = new Date();
		String msg = nowDate.getTime() + " have log ...";
		// 将消息发送给exchange
		channel.basicPublish(EXCHANGE_NAME, "", properties.build(), msg.getBytes());
		System.out.println(nowDate + " 已经生成一条日志...");
		channel.close();
		conn.close();
	}
}

消费者Consumer

package test;
 
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
 
public class Consumer {
 
	private final static String EXCHANGE_NAME = "exchange_headers";
  
	public static void main(String[] args)
			throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
		// 获取不同的pid,方便标识不同的消费者
		String name = ManagementFactory.getRuntimeMXBean().getName();
		String pid = name.split("@")[0];
		// 创建连接和channel
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		factory.setUsername("admin");
		factory.setPassword("123456");
		factory.setPort(5672);
		factory.setVirtualHost("/");
 
		Connection conn = factory.newConnection();
		Channel channel = conn.createChannel();
		
		channel.exchangeDeclare(EXCHANGE_NAME, "headers");
		// 由RabbitMQ自行创建的临时队列,唯一且随消费者的中止而自动删除的队列
		String queueName = channel.queueDeclare().getQueue();
        
		Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-match", "any");
        arguments.put("api", "login");
        arguments.put("version", 1.0);
        arguments.put("dataType", "json");

        // 队列绑定时需要指定参数,注意虽然不需要路由键但仍旧不能写成null,需要写成空字符串""
        channel.queueBind(queueName, EXCHANGE_NAME, "", arguments);

		System.out.println(pid + "已经创建,正在等待消息...");
 
		QueueingConsumer consumer = new QueueingConsumer(channel);
		// 指定队列消费者
		channel.basicConsume(queueName, true, consumer);
 
		while (true) {
			// Delivery : 封装了消息,消息的载体
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String recieveMsg = new String(delivery.getBody());
			System.out.println(pid + "接收到了消息: " + recieveMsg);
		}
	}
}

特殊说明

// all:匹配失败,缺少{"dataType", "json"}
Map<String, Object> heardersMap = new HashMap<String, Object>();
heardersMap.put("api", "login");
heardersMap.put("version", 1.0);

// all:匹配成功,生产者多发送一个head没关系
Map<String, Object> heardersMap = new HashMap<String, Object>();
heardersMap.put("api", "login");
heardersMap.put("version", 1.0);
heardersMap.put("dataType", "json");
heardersMap.put("ext", false);

Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-match", "all");
arguments.put("api", "login");
arguments.put("version", 1.0);
arguments.put("dataType", "json");

//------------------------------------------
// any: 匹配成功,只要有一个键值对能满足队列的arguments即可
Map<String, Object> heardersMap = new HashMap<String, Object>();
heardersMap.put("api", "login");

Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-match", "any");
arguments.put("api", "login");
arguments.put("version", 1.0);
arguments.put("dataType", "json");

// any: 匹配失败,键值对中的key和value必须全部匹配上
Map<String, Object> heardersMap = new HashMap<String, Object>();
heardersMap.put("api", "regist");

Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-match", "any");
arguments.put("api", "login");
arguments.put("version", 1.0);
arguments.put("dataType", "json");

direct和headers类型的比较
绑定规则不同:direct是一个简单的String;而headers是键值对Entry,而且键值对的value可以是任意类型Object

绑定个数不同:direct一次只能绑定一个字符串,如果想绑定多个字符串就需要绑定多次或者循环调用queueBind()方法来绑定多次;而headers类型直接可以往Map中添加多个实体Entry即可

映射规则不同:direct只需要比较路由键是否相等即可,而headers类型除了比较value还要比较key,因为headers类型是Entry类型,需要同时比较key和value,而且headers类型还可以通过x-match来控制匹配的条件,all:需要匹配所有Entry,相当于SQL中的 and操作,any:只需要匹配上一个Entry即可,相当于SQL中的or操作

direct适用于计较简单的路由,而headers类型相比直连接匹配规则更强大

原文地址:https://www.cnblogs.com/gmhappy/p/11864033.html