一、RabbitMQ的Exchange交换机介绍
1、RabbitMQ 的 Exchange 交换机
- 生产者将消息发送到 Exchange,交换器将消息路由到⼀个或者多个队列中,交换机有多个类型,队列和交换机是多对多的关系。
- 交换机只负责转发消息,不具备存储消息的能力,如果没有队列和exchange绑定,或者没有符合的路由规则,则消息会被丢失
- RabbitMQ有四种交换机类型,分别是Direct exchange、 Fanout exchange、 Topic exchange、Headers exchange,最后的基本不用
2、交换机类型
Direct Exchange 定向
- 将⼀个队列绑定到交换机上,要求该消息与⼀个特定的路由键完全匹配
- 例子:如果⼀个队列绑定到该交换机上要求路由键“aabb”,则只有被标记为“aabb”的消息才被转发,不会转发aabb.cc,也不会转发gg.aabb,只会转发aabb
- 处理路由键
Fanout Exchange 广播
- 只需要简单的将队列绑定到交换机上,⼀个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了⼀份复制的消息
- Fanout交换机转发消息是最快的,用于发布订阅,广播形式,中文是扇形
- 不处理路由键
Topic Exchange 通配符
- 主题交换机是⼀种发布/订阅的模式,结合了直连交换机与扇形交换机的特点
- 将路由键和某模式进行匹配。此时队列需要绑定要⼀个模式上
- 符号“#”匹配⼀个或多个词,符号“*”匹配不多不少⼀个词
- 例子:因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.*” 只会匹配到“abc.def”。
Headers Exchanges(少用)
- 根据发送的消息内容中的headers属性进行匹配, 在绑定Queue与Exchange时指定⼀组键值对
- 当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;
- 如果完全匹配则消息会路由到该队列,否则不会路由到该队列
- 不处理路由键
二、rabbitmq的发布订阅模式
1、rabbitmq的发布订阅模式
- 发布-订阅模型中,消息生产者不再是直接面对queue(队列名称),而是直面exchange,都需要经过exchange来进行消息的发送,所有发往同⼀个fanout交换机的消息都会被所有监听这个交换机的消费者接收到
- 发布订阅-消息模型引入fanout交换机
- 文档: https://www.rabbitmq.com/tutorials/tutorialthree-java.html
2、rabbitmq发布订阅模型
- 通过把消息发送给交换机,交互机转发给对应绑定的队列
- 交换机绑定的队列是排它独占队列,自动删除
三、代码实战
发送端
public class Send { private final static String EXCHANGE_NAME = "exchange_fanout"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.216.130"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); //JDK7语法,自动关闭,创建连接 try (Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel()) { //绑定交换机,fanout扇形,即广播 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String msg = "小滴课堂 rabbitmq 发布大课训练营综合项目"; channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8)); System.out.println("广播消息发送成功"); } } }
消费端,消费端的程序都是一样的
public class Recv1 { private final static String EXCHANGE_NAME = "exchange_fanout"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.216.130"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机,fanout扇形,即广播 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //获取队列 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列, fanout交换机不用routingkey channel.queueBind(queueName, EXCHANGE_NAME, ""); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body=" + new String(body, "utf-8")); //手工确认消息消费,不是多条确认 channel.basicAck(envelope.getDeliveryTag(), false); } }; //消费,关闭消息消息自动确认,重要 channel.basicConsume(queueName, false, consumer); } }
验证:启动两个消费者,⼀个生产者发送消息