SpringBoot整合ActiveMQ的publish/subscribe发布订阅模式(二)

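(注意:当开启了发布订阅模式(spring.jms.pub-sub-domain=true)之后,默认的P2P(队列)模式就不能用了。解决的方法:分别为队列和主题各配置一个JmsListenerContainerFactory,并在@JmsListener上通过containerFactory属性指定使用哪一个。)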

(1)pom.xml完整的文件

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.qingfeng</groupId>
	<artifactId>springboot-ActiveMQ</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<!-- Dependency versions below are inherited from the Spring Boot parent. -->
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.1.RELEASE</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<!-- Web layer: used by the REST controller that triggers the publishes. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<!-- JMS support backed by ActiveMQ. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-activemq</artifactId>
		</dependency>

		<!-- Hot reload during development. Marked optional so that devtools is
		     not pulled in transitively by modules that depend on this artifact. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<optional>true</optional>
		</dependency>

	</dependencies>


	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

(2)application.properties文件配置

  

server.port=8088
server.servlet.context-path=/springboot-ActiveMQ/
# Per the original author: do not use 127.0.0.1 here, otherwise an error is reported.
#spring.activemq.broker-url=tcp://127.0.0.1:61616 
spring.activemq.broker-url=tcp://localhost:61616
# NOTE(review): broker-url is set explicitly above, so in-memory presumably has no effect - confirm.
spring.activemq.in-memory=true 
spring.activemq.pool.enabled=false
# Enables the publish/subscribe (topic) model; point-to-point (queue) is the default.
# With spring.jms.pub-sub-domain=true the default point-to-point mode stops working.
spring.jms.pub-sub-domain=true

  

(4)SpringBoot启动类

package com.qingfeng;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point of the demo application.
 */
@SpringBootApplication
public class AppStart {

	/**
	 * Boots the Spring application using this class as the primary source.
	 *
	 * @param args command-line arguments, passed through to Spring Boot
	 */
	public static void main(String[] args) {
		SpringApplication application = new SpringApplication(AppStart.class);
		application.run(args);
	}

}

 

(5)发布者Publish类 

package com.qingfeng.producer;

import java.util.Map;

import javax.jms.Destination;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

/**
 * Publisher: forwards payloads to a JMS destination through the injected
 * {@link JmsMessagingTemplate}.
 */
@Service
public class Publish {

	// JmsTemplate could be injected instead; JmsMessagingTemplate is a wrapper around it.
	@Autowired
	private JmsMessagingTemplate jmsMessagingTemplate;

	/**
	 * Sends a text message.
	 *
	 * @param destination destination the message is sent to
	 * @param message     text payload to send
	 */
	public void sendTopicMessage(Destination destination, String message) {
		this.jmsMessagingTemplate.convertAndSend(destination, message);
	}

	/**
	 * Sends a map message.
	 *
	 * @param destination destination the message is sent to
	 * @param map         key/value payload to send
	 */
	public void sendTopicMapMessage(Destination destination, Map<String, String> map) {
		this.jmsMessagingTemplate.convertAndSend(destination, map);
	}

}

  

(6)订阅者Subscribe类

package com.qingfeng.consumer;

import java.util.Map;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
 * Subscriber: several listener methods are registered on each destination and
 * every one of them prints what it receives to standard output.
 */
@Component
public class Subscribe {

	/** Destination carrying text payloads. */
	private static final String TEXT_TOPIC = "qingfeng-topic";

	/** Destination carrying map payloads. */
	private static final String MAP_TOPIC = "qingfeng-map-topic";

	private static final String TEXT_PREFIX = "qingfeng-topic类型Subscribe收到的Publish的报文为:";

	private static final String MAP_PREFIX = "qingfeng-map-topic类型Subscribe收到的Publishmap类型的报文为:";

	/** First listener on the text destination; {@code text} is the received payload. */
	@JmsListener(destination = TEXT_TOPIC)
	public void receiveTopic(String text) {
		print(TEXT_PREFIX, text);
	}

	/** Second listener on the text destination. */
	@JmsListener(destination = TEXT_TOPIC)
	public void receiveTopic2(String text) {
		print(TEXT_PREFIX, text);
	}

	/** Third listener on the text destination. */
	@JmsListener(destination = TEXT_TOPIC)
	public void receiveTopic3(String text) {
		print(TEXT_PREFIX, text);
	}

	/** First listener on the map destination; {@code map} is the received payload. */
	@JmsListener(destination = MAP_TOPIC)
	public void receiveMapTopic(Map<String, String> map) {
		print(MAP_PREFIX, map);
	}

	/** Second listener on the map destination. */
	@JmsListener(destination = MAP_TOPIC)
	public void receiveMapTopic2(Map<String, String> map) {
		print(MAP_PREFIX, map);
	}

	// Writes the prefix followed by the payload's string form as one line on stdout.
	private static void print(String prefix, Object payload) {
		System.out.println(prefix + payload);
	}

}

  

  

(7)测试类TopicController类

package com.qingfeng.controller;

import java.util.HashMap;

import javax.jms.Destination;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.qingfeng.producer.Publish;

/**
 * Test controller exposing two GET endpoints that publish a message through
 * {@link Publish}: one with a text payload, one with a map payload.
 */
@RestController
public class TopicController {

	/** Topic for text payloads. */
	private static final String TEXT_TOPIC = "qingfeng-topic";

	/** Topic for map payloads; kept separate from the text topic. */
	private static final String MAP_TOPIC = "qingfeng-map-topic";

	@Autowired
	private Publish publish;

	/**
	 * Publishes the given text to the {@code qingfeng-topic} topic.
	 *
	 * <p>Example: http://localhost:8088/springboot-ActiveMQ/topic/send?text=qingfeng
	 *
	 * @param text text to publish
	 * @return a fixed confirmation string
	 */
	@GetMapping("topic/send")
	public String send(String text) {
		Destination topic = new ActiveMQTopic(TEXT_TOPIC);
		publish.sendTopicMessage(topic, text);
		return "发送成功";
	}

	/**
	 * Publishes a fixed sample map to the {@code qingfeng-map-topic} topic.
	 *
	 * <p>Example: http://localhost:8088/springboot-ActiveMQ/topic/sendMap
	 *
	 * @return a fixed confirmation string
	 */
	@GetMapping("topic/sendMap")
	public String sendMap() {
		// The map payload must go to the map topic, not the text topic:
		// publishing it to "qingfeng-topic" would deliver a map to text-only consumers.
		Destination topic = new ActiveMQTopic(MAP_TOPIC);
		HashMap<String, String> map = new HashMap<>();
		map.put("name", "qingfeng");
		map.put("age", "18");
		map.put("QQ", "37942135");
		publish.sendTopicMapMessage(topic, map);
		return "发送成功";
	}

}

  

(8)启动项目AppStart类

测试:http://localhost:8088/springboot-ActiveMQ/topic/send?text=qingfeng

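在控制台上输出(qingfeng-topic上的三个订阅者各收到一次消息):

qingfeng-topic类型Subscribe收到的Publish的报文为:qingfeng
qingfeng-topic类型Subscribe收到的Publish的报文为:qingfeng
qingfeng-topic类型Subscribe收到的Publish的报文为:qingfeng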

测试:http://localhost:8088/springboot-ActiveMQ/topic/sendMap

在控制台上输出

原文地址:https://www.cnblogs.com/Amywangqing/p/13636049.html