SpringBoot整合ActiveMQ消息组件

1、ActiveMQ是Apache提供的开源组件,是基于JMS标准的实现组件。利用SpringBoot整合ActiveMQ组件,实现队列消息的发送与接收。修改pom.xml配置文件,追加spring-boot-starter-activemq依赖库。

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0"
 3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
 5     https://maven.apache.org/xsd/maven-4.0.0.xsd">
 6     <modelVersion>4.0.0</modelVersion>
 7     <parent>
 8         <groupId>org.springframework.boot</groupId>
 9         <artifactId>spring-boot-starter-parent</artifactId>
10         <version>2.3.5.RELEASE</version>
11         <relativePath /> <!-- lookup parent from repository -->
12     </parent>
13     <groupId>com.example</groupId>
14     <artifactId>demo</artifactId>
15     <version>0.0.1-SNAPSHOT</version>
16     <name>demo</name>
17     <description>Demo project for Spring Boot</description>
18 
19     <properties>
20         <java.version>1.8</java.version>
21         <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
22     </properties>
23 
24     <dependencies>
25         <dependency>
26             <groupId>org.springframework.boot</groupId>
27             <artifactId>spring-boot-starter-web</artifactId>
28         </dependency>
29 
30         <dependency>
31             <groupId>org.springframework.boot</groupId>
32             <artifactId>spring-boot-starter-test</artifactId>
33             <scope>test</scope>
34             <exclusions>
35                 <exclusion>
36                     <groupId>org.junit.vintage</groupId>
37                     <artifactId>junit-vintage-engine</artifactId>
38                 </exclusion>
39             </exclusions>
40         </dependency>
41 
42         <!-- mysql驱动包 -->
43         <dependency>
44             <groupId>mysql</groupId>
45             <artifactId>mysql-connector-java</artifactId>
46         </dependency>
47 
48         <!-- druid连接池 -->
49         <dependency>
50             <groupId>com.alibaba</groupId>
51             <artifactId>druid</artifactId>
52             <version>1.1.10</version>
53         </dependency>
54 
55         <dependency>
56             <groupId>org.springframework.boot</groupId>
57             <artifactId>spring-boot-starter-data-jpa</artifactId>
58         </dependency>
59         <dependency>
60             <groupId>org.springframework.boot</groupId>
61             <artifactId>spring-boot-starter-cache</artifactId>
62         </dependency>
63         <dependency>
64             <groupId>org.hibernate</groupId>
65             <artifactId>hibernate-ehcache</artifactId>
66         </dependency>
67 
68         <!-- activeMQ -->
69         <dependency>
70             <groupId>org.springframework.boot</groupId>
71             <artifactId>spring-boot-starter-activemq</artifactId>
72         </dependency>
73     </dependencies>
74 
75     <build>
76         <plugins>
77             <plugin>
78                 <groupId>org.springframework.boot</groupId>
79                 <artifactId>spring-boot-maven-plugin</artifactId>
80             </plugin>
81         </plugins>
82         <resources>
83             <resource>
84                 <directory>src/main/resources</directory>
85                 <includes>
86                     <include>**/*.properties</include>
87                     <include>**/*.yml</include>
88                     <include>**/*.xml</include>
89                     <include>**/*.p12</include>
90                     <include>**/*.html</include>
91                     <include>**/*.jpg</include>
92                     <include>**/*.png</include>
93                 </includes>
94             </resource>
95         </resources>
96     </build>
97 
98 </project>

修改application.yml配置文件,进行ActiveMQ的配置,如下所示:

1 # 配置消息类型,true表示为topic消息,false表示Queue消息
2 spring.jms.pub-sub-domain=false
3 # 连接的用户名
4 spring.activemq.user=admin
5 # 密码
6 spring.activemq.password=admin
7 # 消息组件的连接主机信息
8 spring.activemq.broker-url=tcp://192.168.110.142:61616

定义消息消费监听类,如下所示:

 1 package com.demo.consumer;
 2 
 3 import org.springframework.jms.annotation.JmsListener;
 4 import org.springframework.stereotype.Service;
 5 
 6 @Service
 7 public class MessageConsumer {
 8 
 9     /**
10      * 
11      * @param text
12      */
13     @JmsListener(destination = "msg.queue") // 定义消息监听队列
14     public void receiveMessage(String text) {
15         // 进行消息接受处理
16         System.err.println("【*** 接受消息 ***】" + text);
17     }
18 }

定义消息生产者业务类,如下所示:

 1 package com.demo.producer;
 2 
 3 import javax.jms.Queue;
 4 
 5 import org.springframework.beans.factory.annotation.Autowired;
 6 import org.springframework.jms.core.JmsMessagingTemplate;
 7 import org.springframework.stereotype.Service;
 8 
 9 /**
10  * 
11  * @author 消息发送
12  *
13  */
14 @Service
15 public class MessageProducer {
16 
17     // 消息发送模板
18     @Autowired
19     private JmsMessagingTemplate jmsMessagingTemplate;
20 
21     // 注入队列
22     @Autowired
23     private Queue queue;
24 
25     /**
26      * 发送消息
27      */
28     public void send(String msg) {
29         this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
30     }
31 
32 }

定义JMS消息发送配置类,该类主要用于配置队列信息,如下所示:

 1 package com.demo.config;
 2 
 3 import javax.jms.Queue;
 4 
 5 import org.apache.activemq.command.ActiveMQQueue;
 6 import org.springframework.context.annotation.Bean;
 7 import org.springframework.context.annotation.Configuration;
 8 import org.springframework.jms.annotation.EnableJms;
 9 
10 @Configuration
11 @EnableJms
12 public class ActiveMqConfig {
13 
14     @Bean
15     public Queue queue() {
16         ActiveMQQueue activeMQQueue = new ActiveMQQueue("msg.queue");
17         return activeMQQueue;
18     }
19 }

使用ActiveMQ实现了消息的发送与接收处理。每当有消息接收到时,都会自动执行MessageConsumer类,进行消息消费。

 1 package com.demo.controller;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.stereotype.Controller;
 5 import org.springframework.web.bind.annotation.RequestMapping;
 6 import org.springframework.web.bind.annotation.ResponseBody;
 7 
 8 import com.demo.producer.MessageProducer;
 9 
10 @Controller
11 public class ActiveMqController {
12 
13     @Autowired
14     private MessageProducer messageProducer;
15 
16     @RequestMapping(value = "/messageProducer")
17     @ResponseBody
18     public void findAll() {
19         for (int i = 0; i < 10000; i++) {
20             messageProducer.send("active producer message : " + i);
21         }
22     }
23 
24 }

在浏览器或者可以执行命令的地方执行,http://127.0.0.1:8080/messageProducer,可以在activemq的监控地址进行观察http://192.168.110.142:8161/admin/queues.jsp

原文地址:https://www.cnblogs.com/biehongli/p/13986703.html