RabbitMQ(Exchange交换机详解)(四)

Exchange:接收消息,并根据路由键转发消息所绑定的队列

ClientA,B将消息投递到交换机Exchange上,通过路由关系,投递到指定的queue1或者queue2上,通过监听投递到Client1...

交换机属性:

  Name:交换机名称

  Type:交换机类型direct、topic、fanout、headers

  Durability:是否需要持久化,true为持久化

  Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange

  Internal:当前Exchange是否用于RabbitMQ内部使用,默认为false

  Arguments:扩展参数,用于扩展AMQP协议自制定化使用

有直连对应关系的连接方式

Direct Exchange:所有发送到Direct Exchange的消息被转发到RoutKey中指定的Queue

        注:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,

          所以不需要将Exchange进行任何绑定(binding)操作,消息加投递时,

          RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。

routing key要和 queues中的key的值相同,才可以路由到

       

 1         //生产端代码
 2 
 3         ConnectionFactory connectionFactory = new ConnectionFactory();
 4         connectionFactory.setHost("127.0.0.1");
 5         connectionFactory.setPort(5672);
 6         connectionFactory.setVirtualHost("/");
 7         connectionFactory.setUsername("guest");
 8         connectionFactory.setPassword("guest");
 9 
10         Connection connection = connectionFactory.newConnection();
11 
12         Channel channel = connection.createChannel();
13 
14         String exchangeName = "test_direct_exchange";
15         String routingKey = "test.direct";
16 
17         String str = "hello world";
18         channel.basicPublish(exchangeName, routingKey, null, str.getBytes());
 1        //消费端代码
 2 
 3        ConnectionFactory connectionFactory = new ConnectionFactory() ;
 4 
 5         connectionFactory.setHost("127.0.0.1");
 6         connectionFactory.setPort(5672);
 7         connectionFactory.setVirtualHost("/");
 8 
 9         connectionFactory.setAutomaticRecoveryEnabled(true);
10         connectionFactory.setNetworkRecoveryInterval(3000);
11         Connection connection = connectionFactory.newConnection();
12 
13         Channel channel = connection.createChannel();
14         //4 声明
15         String exchangeName = "test_direct_exchange";
16         String exchangeType = "direct";
17         String queueName = "test_direct_queue";
18         String routingKey = "test.direct";
19 
20         //表示声明了一个交换机
21         channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
22         //表示声明了一个队列
23         channel.queueDeclare(queueName, false, false, false, null);
24         //建立一个绑定关系:
25         channel.queueBind(queueName, exchangeName, routingKey);
26 
27         //durable 是否持久化消息
28         QueueingConsumer consumer = new QueueingConsumer(channel);
29         //参数:队列名称、是否自动ACK、Consumer
30         channel.basicConsume(queueName, true, consumer);
31         //循环获取消息
32         while(true){
33             //获取消息,如果没有消息,这一步将会一直阻塞
34             Delivery delivery = consumer.nextDelivery();
35             String msg = new String(delivery.getBody());
36             System.out.println("收到消息:" + msg);
37         }        

有路由规则,可模糊匹配的连接方式

Topic Exchange:所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定的Topic的Queue上

        Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic

        注:可以使用通配符进行模糊匹配

          符号“#”匹配一个或多个词

          符号"*"匹配一个词

          例如:“log.#”能够匹配到“log.info.oa”

              "log.*"只会匹配到"log.erro"

     

 1  //生产端代码
 2  //1 创建ConnectionFactory
 3         ConnectionFactory connectionFactory = new ConnectionFactory();
 4         connectionFactory.setHost("127.0.0.1");
 5         connectionFactory.setPort(5672);
 6         connectionFactory.setVirtualHost("/");
 7         
 8         //2 创建Connection
 9         Connection connection = connectionFactory.newConnection();
10         //3 创建Channel
11         Channel channel = connection.createChannel();  
12         //4 声明
13         String exchangeName = "test_topic_exchange";
14         String routingKey1 = "user.save";
15         String routingKey2 = "user.update";
16         String routingKey3 = "user.delete.abc";
17         //5 发送
18         
19         String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
20         channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
21         channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());     
22         channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); 
23         channel.close();  
24         connection.close();  
 1       //消费端代码
 2        ConnectionFactory connectionFactory = new ConnectionFactory() ;  
 3         
 4         connectionFactory.setHost("127.0.0.1");
 5         connectionFactory.setPort(5672);
 6         connectionFactory.setVirtualHost("/");
 7         
 8         connectionFactory.setAutomaticRecoveryEnabled(true);
 9         connectionFactory.setNetworkRecoveryInterval(3000);
10         Connection connection = connectionFactory.newConnection();
11         
12         Channel channel = connection.createChannel();  
13         //4 声明
14         String exchangeName = "test_topic_exchange";
15         String exchangeType = "topic";
16         String queueName = "test_topic_queue";
17        
              //只能接收到前两条消息
             //String routingKey = "user.*";

             //可以接收到三条消息
18         String routingKey = "user.#";
19         // 1 声明交换机 
20         channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
21         // 2 声明队列
22         channel.queueDeclare(queueName, false, false, false, null);
23         // 3 建立交换机和队列的绑定关系:
24         channel.queueBind(queueName, exchangeName, routingKey);
25         
26         //durable 是否持久化消息
27         QueueingConsumer consumer = new QueueingConsumer(channel);
28         //参数:队列名称、是否自动ACK、Consumer
29         channel.basicConsume(queueName, true, consumer);  
30         //循环获取消息  
31         while(true){  
32             //获取消息,如果没有消息,这一步将会一直阻塞  
33             Delivery delivery = consumer.nextDelivery();  
34             String msg = new String(delivery.getBody());    
35             System.out.println("收到消息:" + msg);  
36         }                         

不做任何路由的连接方式

Fanout  Exchange:不处理路由键,只需简单的将队列绑定到交换机上

          发送到交换机的消息都会被转发到与该交换机绑定的队列上

             fanout交换机转发消息是最快的

      

 1              //生产者代码
 2 
 3               //1 创建ConnectionFactory
 4         ConnectionFactory connectionFactory = new ConnectionFactory();
 5         connectionFactory.setHost("127.0.0.1");
 6         connectionFactory.setPort(5672);
 7         connectionFactory.setVirtualHost("/");
 8         
 9         //2 创建Connection
10         Connection connection = connectionFactory.newConnection();
11         //3 创建Channel
12         Channel channel = connection.createChannel();  
13         //4 声明
14         String exchangeName = "test_fanout_exchange";
15         //5 发送
16         for(int i = 0; i < 10; i ++) {
17             String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
18             channel.basicPublish(exchangeName, "", null , msg.getBytes());             
19         }
20         channel.close();  
21         connection.close(); 
 1     //消费者端代码
 2  
 3     ConnectionFactory connectionFactory = new ConnectionFactory() ;  
 4         
 5         connectionFactory.setHost("127.0.0.1");
 6         connectionFactory.setPort(5672);
 7         connectionFactory.setVirtualHost("/");
 8         
 9         connectionFactory.setAutomaticRecoveryEnabled(true);
10         connectionFactory.setNetworkRecoveryInterval(3000);
11         Connection connection = connectionFactory.newConnection();
12         
13         Channel channel = connection.createChannel();  
14         //4 声明
15         String exchangeName = "test_fanout_exchange";
16         String exchangeType = "fanout";
17         String queueName = "test_fanout_queue";
18         String routingKey = "";    //不设置路由键
19         channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
20         channel.queueDeclare(queueName, false, false, false, null);
21         channel.queueBind(queueName, exchangeName, routingKey);
22         
23         //durable 是否持久化消息
24         QueueingConsumer consumer = new QueueingConsumer(channel);
25         //参数:队列名称、是否自动ACK、Consumer
26         channel.basicConsume(queueName, true, consumer); 
27         //循环获取消息  
28         while(true){  
29             //获取消息,如果没有消息,这一步将会一直阻塞  
30             Delivery delivery = consumer.nextDelivery();  
31             String msg = new String(delivery.getBody());    
32             System.out.println("收到消息:" + msg);  
33         }     

Headers Exchange:不常用,使用消息头来进行路由的

原文地址:https://www.cnblogs.com/luhan777/p/11162306.html