Rabbitmq与spring整合之重要组件介绍——AMQP声明式配置&RabbitTemplate组件

上一节是使用rabbitAdmin的管理组件进行声明队列,交换器,绑定等操作,本节则是采用AMQP声明式配置来声明这些东西。AMQP声明主要是通过@Bean注解进行的。

配置:

 1 package com.zxy.demo.config;
 2 
 3 import org.springframework.amqp.core.Binding;
 4 import org.springframework.amqp.core.BindingBuilder;
 5 import org.springframework.amqp.core.DirectExchange;
 6 import org.springframework.amqp.core.FanoutExchange;
 7 import org.springframework.amqp.core.Queue;
 8 import org.springframework.amqp.core.TopicExchange;
 9 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
10 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
11 import org.springframework.amqp.rabbit.core.RabbitAdmin;
12 import org.springframework.amqp.rabbit.core.RabbitTemplate;
13 import org.springframework.context.annotation.Bean;
14 import org.springframework.context.annotation.ComponentScan;
15 import org.springframework.context.annotation.Configuration;
16 
17 
18 @Configuration
19 @ComponentScan(basePackages= {"com.zxy.demo.*"})
20 public class RabbitmqCofing {
21 //    注入连接工厂,spring的配置,springboot可以配置在属性文件中
22     @Bean
23     public ConnectionFactory connectionFactory() {
24         CachingConnectionFactory connection = new CachingConnectionFactory();
25         connection.setAddresses("192.168.10.110:5672");
26         connection.setUsername("guest");
27         connection.setPassword("guest");
28         connection.setVirtualHost("/");
29         return connection;
30     }
31 //    配置RabbitAdmin来管理rabbit
32     @Bean
33     public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
34         RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
35         //用RabbitAdmin一定要配置这个,spring加载的是后就会加载这个类================特别重要
36         rabbitAdmin.setAutoStartup(true);
37         return rabbitAdmin;
38     }
39 //===========================以上结合测试rabbitAdmin部分===========================================================    
40     
41     
42     
43 //===========================以下为AMQP配置队列绑定等,spring容器加载时候就能够注入===========================================================    
44 //    采用AMQP定义队列、交换器、绑定等
45     @Bean(name="direct.queue01")
46     public Queue queue001() {
47         return new Queue("direct.queue01", true, false, false);
48     }
49     @Bean(name="test.direct01")
50     public DirectExchange directExchange() {
51         return new DirectExchange("test.direct01", true, false, null);
52     }
53     @Bean
54     public Binding bind001() {
55         return BindingBuilder.bind(queue001()).to(directExchange()).with("mq.#");
56     }
57     @Bean(name="topic.queue01")
58     public Queue queue002() {
59         return new Queue("topic.queue01", true, false, false);
60     }
61     @Bean(name="test.topic01")
62     public TopicExchange topicExchange() {
63         return new TopicExchange("test.topic01", true, false, null);
64     }
65     @Bean
66     public Binding bind002() {
67         return BindingBuilder.bind(queue002()).to(topicExchange()).with("mq.topic");
68     }
69     @Bean(name="fanout.queue01")
70     public Queue queue003() {
71         return new Queue("fanout.queue", true, false, false);
72     }
73     @Bean(name="test.fanout01")
74     public FanoutExchange fanoutExchange() {
75         return new FanoutExchange("test.fanout01", true, false, null);
76     }
77     @Bean
78     public Binding bind003() {
79         return BindingBuilder.bind(queue003()).to(fanoutExchange());
80     }
81     
82     
83     
84 //===========================注入rabbitTemplate组件===========================================================    
85 //    跟spring整合注入改模板,跟springboot整合的话只需要在配置文件中配置即可
86     @Bean 
87     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
88         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
89         return rabbitTemplate;
90     }
91 }

单元测试:

 1 package com.zxy.demo;
 2 
 3 import org.junit.Test;
 4 import org.junit.runner.RunWith;
 5 import org.springframework.amqp.AmqpException;
 6 import org.springframework.amqp.core.Binding;
 7 import org.springframework.amqp.core.BindingBuilder;
 8 import org.springframework.amqp.core.DirectExchange;
 9 import org.springframework.amqp.core.FanoutExchange;
10 import org.springframework.amqp.core.Message;
11 import org.springframework.amqp.core.MessageDeliveryMode;
12 import org.springframework.amqp.core.MessagePostProcessor;
13 import org.springframework.amqp.core.MessageProperties;
14 import org.springframework.amqp.core.Queue;
15 import org.springframework.amqp.core.TopicExchange;
16 import org.springframework.amqp.rabbit.core.RabbitAdmin;
17 import org.springframework.amqp.rabbit.core.RabbitTemplate;
18 import org.springframework.beans.factory.annotation.Autowired;
19 import org.springframework.boot.test.context.SpringBootTest;
20 import org.springframework.test.context.junit4.SpringRunner;
21 
22 
23 @RunWith(SpringRunner.class)
24 @SpringBootTest
25 public class ForwardApplicationTests {
26 
27     @Test
28     public void contextLoads() {
29     }
30     @Autowired
31     private RabbitAdmin rabbitAdmin;
32     @Test
33     public void testAdmin() {
34 //        切记命名不能重复复
35         rabbitAdmin.declareQueue(new Queue("test.direct.queue"));
36         rabbitAdmin.declareExchange(new DirectExchange("test.direct"));
37         rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test.direct", "mq.direct", null));
38 
39         rabbitAdmin.declareQueue(new Queue("test.topic.queue", true,false, false));
40         rabbitAdmin.declareExchange(new TopicExchange("test.topic", true,false));
41 //        如果注释掉上面两句实现声明,直接进行下面的绑定竟然不行,该版本amqp-client采用的是5.1.2,将上面两行代码放开,则运行成功
42         rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.topic.queue", true,false, false))
43                 .to(new TopicExchange("test.topic", true,false)).with("mq.topic"));
44 //        经过实验确实是需要先声明,才可以运行通过
45         rabbitAdmin.declareQueue(new Queue("test.fanout.queue",true,false,false,null));
46         rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", true, false, null));
47         rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.fanout.queue", true, false,false))
48                 .to(new FanoutExchange("test.fanout", true, false)));
49         rabbitAdmin.purgeQueue("test.direct.queue", false);//清空队列消息
50     }
51     @Autowired
52     private RabbitTemplate rabbitTemplate;
53     @Test
54     public void testTemplate() {
55         String body = "hello,test rabbitTemplage!";
56         MessageProperties properties = new MessageProperties();
57         properties.setContentEncoding("utf-8");
58         properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
59         properties.setPriority(1);
60         properties.setHeader("nihao:", "yes!");
61         Message message = new Message(body.getBytes(), properties);
62 //        MessagePostProcessor参数是在消息发送过程中动态修改消息属性的类
63         rabbitTemplate.convertAndSend("test.direct01", "mq.direct", message,new MessagePostProcessor() {
64             
65             @Override
66             public Message postProcessMessage(Message message) throws AmqpException {
67 //                修改属性
68                 message.getMessageProperties().setHeader("nihao:", "no");
69 //                添加属性
70                 message.getMessageProperties().setHeader("新添加属性:", "添加属性1");
71                 return message;
72             }
73         });
74         
75         
76 //        发送objcet类型
77         rabbitTemplate.convertAndSend("test.topic01", "mq.topic", "send object type message!!!");
78         System.out.println("发送完毕!!!");
79     }
80 
81 }
原文地址:https://www.cnblogs.com/xiaoyao-001/p/9609900.html