RabbitMQ——订阅模式类型/发布与订阅模式


一、订阅模式

订阅模式示例图:

 订阅模型中,多了一个exchange角色:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)C:消费者,消息的接受者,会一直等待消息到来。
  • Queue:消息队列,接收消息、缓存消息。
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

    Fanout:广播,将消息交给所有绑定到交换机的队列
    Direct:定向,把消息交给符合指定routing key 的队列
    Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失! 

二、Publish/Subscribe发布与订阅模式

1.模式说明

 发布订阅模式:
(1)每个消费者监听自己的队列。
(2)生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息 

2、示例

Producer_PubSub:
 1 /**
 2  * 发送消息
 3  */
 4 public class Producer_PubSub {
 5     public static void main(String[] args) throws IOException, TimeoutException {
 6 
 7         //1.创建连接工厂
 8         ConnectionFactory factory = new ConnectionFactory();
 9         //2. 设置参数
10         factory.setHost("172.16.98.133");//ip  默认值 localhost
11         factory.setPort(5672); //端口  默认值 5672
12         factory.setVirtualHost("/itcast");//虚拟机 默认值/
13         factory.setUsername("jingdong");//用户名 默认 guest
14         factory.setPassword("jingdong");//密码 默认值 guest
15         //3. 创建连接 Connection
16         Connection connection = factory.newConnection();
17         //4. 创建Channel
18         Channel channel = connection.createChannel();
19        /*
20 
21        exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
22        参数:
23         1. exchange:交换机名称
24         2. type:交换机类型
25             DIRECT("direct"),:定向,把消息交给符合指定routing key 的队列。
26             FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
27             TOPIC("topic"),通配符的方式
28             HEADERS("headers");参数匹配
29 
30         3. durable:是否持久化
31         4. autoDelete:自动删除
32         5. internal:内部使用。 一般false
33         6. arguments:参数
34         */
35 
36        String exchangeName = "test_fanout";
37         //5. 创建交换机
38         channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
39         //6. 创建队列
40         String queue1Name = "test_fanout_queue1";
41         String queue2Name = "test_fanout_queue2";
42         channel.queueDeclare(queue1Name,true,false,false,null);
43         channel.queueDeclare(queue2Name,true,false,false,null);
44         //7. 绑定队列和交换机
45         /*
46         queueBind(String queue, String exchange, String routingKey)
47         参数:
48             1. queue:队列名称
49             2. exchange:交换机名称
50             3. routingKey:路由键,绑定规则
51                 如果交换机的类型为fanout ,routingKey设置为""
52          */
53         channel.queueBind(queue1Name,exchangeName,"");
54         channel.queueBind(queue2Name,exchangeName,"");
55 
56         String body = "日志信息:张三调用了findAll方法...日志级别:info...";
57         //8. 发送消息
58         channel.basicPublish(exchangeName,"",null,body.getBytes());
59 
60         //9. 释放资源
61         channel.close();
62         connection.close();
63 
64     }
65 }
View Code
Consumer_PubSub1:
 1 public class Consumer_PubSub1 {
 2     public static void main(String[] args) throws IOException, TimeoutException {
 3 
 4         //1.创建连接工厂
 5         ConnectionFactory factory = new ConnectionFactory();
 6         //2. 设置参数
 7         factory.setHost("172.16.98.133");//ip  默认值 localhost
 8         factory.setPort(5672); //端口  默认值 5672
 9         factory.setVirtualHost("/itcast");//虚拟机 默认值/
10         factory.setUsername("jingdong");//用户名 默认 guest
11         factory.setPassword("jingdong");//密码 默认值 guest
12         //3. 创建连接 Connection
13         Connection connection = factory.newConnection();
14         //4. 创建Channel
15         Channel channel = connection.createChannel();
16 
17 
18         String queue1Name = "test_fanout_queue1";
19         String queue2Name = "test_fanout_queue2";
20 
21 
22         /*
23         basicConsume(String queue, boolean autoAck, Consumer callback)
24         参数:
25             1. queue:队列名称
26             2. autoAck:是否自动确认
27             3. callback:回调对象
28 
29          */
30         // 接收消息
31         Consumer consumer = new DefaultConsumer(channel){
32             /*
33                 回调方法,当收到消息后,会自动执行该方法
34 
35                 1. consumerTag:标识
36                 2. envelope:获取一些信息,交换机,路由key...
37                 3. properties:配置信息
38                 4. body:数据
39 
40              */
41             @Override
42             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
43               /*  System.out.println("consumerTag:"+consumerTag);
44                 System.out.println("Exchange:"+envelope.getExchange());
45                 System.out.println("RoutingKey:"+envelope.getRoutingKey());
46                 System.out.println("properties:"+properties);*/
47                 System.out.println("body:"+new String(body));
48                 System.out.println("将日志信息打印到控制台.....");
49             }
50         };
51         channel.basicConsume(queue1Name,true,consumer);
52 
53 
54         //关闭资源?不要
55 
56     }
57 }
View Code
Consumer_PubSub2:
 1 public class Consumer_PubSub2 {
 2     public static void main(String[] args) throws IOException, TimeoutException {
 3 
 4         //1.创建连接工厂
 5         ConnectionFactory factory = new ConnectionFactory();
 6         //2. 设置参数
 7         factory.setHost("172.16.98.133");//ip  默认值 localhost
 8         factory.setPort(5672); //端口  默认值 5672
 9         factory.setVirtualHost("/itcast");//虚拟机 默认值/
10         factory.setUsername("jingdong");//用户名 默认 guest
11         factory.setPassword("jingdong");//密码 默认值 guest
12         //3. 创建连接 Connection
13         Connection connection = factory.newConnection();
14         //4. 创建Channel
15         Channel channel = connection.createChannel();
16 
17 
18         String queue1Name = "test_fanout_queue1";
19         String queue2Name = "test_fanout_queue2";
20 
21 
22         /*
23         basicConsume(String queue, boolean autoAck, Consumer callback)
24         参数:
25             1. queue:队列名称
26             2. autoAck:是否自动确认
27             3. callback:回调对象
28 
29          */
30         // 接收消息
31         Consumer consumer = new DefaultConsumer(channel){
32             /*
33                 回调方法,当收到消息后,会自动执行该方法
34 
35                 1. consumerTag:标识
36                 2. envelope:获取一些信息,交换机,路由key...
37                 3. properties:配置信息
38                 4. body:数据
39 
40              */
41             @Override
42             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
43               /*  System.out.println("consumerTag:"+consumerTag);
44                 System.out.println("Exchange:"+envelope.getExchange());
45                 System.out.println("RoutingKey:"+envelope.getRoutingKey());
46                 System.out.println("properties:"+properties);*/
47                 System.out.println("body:"+new String(body));
48                 System.out.println("将日志信息保存数据库.....");
49             }
50         };
51         channel.basicConsume(queue2Name,true,consumer);
52 
53 
54         //关闭资源?不要
55 
56     }
57 }
View Code
原文地址:https://www.cnblogs.com/aaaazzzz/p/12827425.html