springboot 整合 ActiveMQ

springboot 整合 ActiveMQ

pom 坐标

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.6.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <!-- ActiveMQ integration: JMS auto-configuration plus the ActiveMQ client -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>

    <!-- Test support: in this demo the messages are sent from a Spring Boot test class.
         Scoped to "test" so JUnit and the Spring test libraries stay off the runtime classpath. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

yml 文件

# HTTP port of the embedded web server
server:
  port: 8080
spring:
  activemq:
    # Address of the ActiveMQ broker the application connects to
    broker-url: tcp://localhost:61616
    packages:
      # Comma-separated list of packages whose classes may be carried in ObjectMessage payloads
      trusted: com.mozq.activemq.domain,com.mozq.activemq.entity,java.util
#      trust-all: true

消息生产者

使用 springboot 测试类实现

使用 ObjectMessage 时,被发送的对象必须实现序列化(Serializable)。如果对象之间存在嵌套,则被嵌套的对象也都要实现序列化。例如 Customer 对象有个成员是 LinkMan 对象,则 Customer 和 LinkMan 必须都实现序列化。发送集合对象时同理:发送 List<User> 时,除了所用的 List 实现类(如 ArrayList)本身已实现序列化之外,其中的元素 User 也必须实现序列化。

package com.mozq.activemq;

import com.mozq.activemq.domain.Customer;
import com.mozq.activemq.domain.LinkMan;
import com.mozq.activemq.domain.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import javax.jms.*;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.*;

/**
 * Message producer demo: each test sends one message of a given JMS body type
 * (Text, Object, Map, Bytes, Stream) to a {@code mozq.queue.*} destination through the
 * injected {@link JmsTemplate}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes= SmsApplication.class)
public class DemoTest {

    /**
     * Charset used for every String-to-bytes conversion. Using the constant instead of the
     * charset name "UTF-8" removes the checked UnsupportedEncodingException, so the message
     * creators below can never fall through to {@code return null}.
     * Fully qualified so that the file's import list does not have to change.
     */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Sends a TextMessage. This is the commonly used type: it carries plain strings, and
     * objects can be carried as well once they are converted to a JSON string.
     * <p>
     * Alternative kept from the original notes:
     * {@code jmsTemplate.convertAndSend("mozq.queue.text", "这是测试短信")}.
     */
    @Test
    public void sendText(){
        jmsTemplate.send("mozq.queue.text", session -> session.createTextMessage("这是测试短信"));
    }

    /**
     * Sends a list of strings; the listener on {@code mozq.queue.obj} receives it as an
     * ObjectMessage. ObjectMessage is not safe (it relies on Java serialization) but is
     * still commonly used.
     * <p>
     * Alternative kept from the original notes:
     * {@code session.createObjectMessage((Serializable) strList)} inside {@code jmsTemplate.send}.
     */
    @Test
    public void sendObject(){
        List<String> strList = new ArrayList<>();
        strList.add("长河");
        strList.add("落日");

        jmsTemplate.convertAndSend("mozq.queue.obj", strList);
    }

    /**
     * Sends a single {@code User} as an ObjectMessage.
     * The explicit cast fails with a ClassCastException at runtime when {@code User} does
     * not implement {@link Serializable}.
     */
    @Test
    public void sendObject2(){
        User user = new User();
        user.setName("曹操");
        user.setAge(20);
        jmsTemplate.send("mozq.queue.user", session -> session.createObjectMessage((Serializable) user));
    }

    /**
     * Sends a {@code Customer} that holds a {@code LinkMan} member.
     * Both {@code Customer} and {@code LinkMan} must implement {@link Serializable}.
     */
    @Test
    public void sendObject3(){
        LinkMan linkMan = new LinkMan();
        linkMan.setName("关羽");
        Customer customer = new Customer();
        customer.setName("诸葛");
        customer.setLinkMan(linkMan);

        jmsTemplate.send("mozq.queue.customer", session -> session.createObjectMessage((Serializable) customer));
    }

    /**
     * Sends a map; the listener on {@code mozq.queue.map} receives it as a MapMessage.
     * <p>
     * Alternative kept from the original notes: build the message by hand with
     * {@code session.createMapMessage()}, {@code setObject("name", "刘备")} and
     * {@code setInt("age", 18)}.
     */
    @Test
    public void sendMap(){
        Map<String, Object> user = new HashMap<>();
        user.put("name", "刘备");
        user.put("age", 18);
        jmsTemplate.convertAndSend("mozq.queue.map", user);
    }

    /**
     * Sends a BytesMessage. It offers typed read/write methods; the consumer must read the
     * values in the same order in which they are written here (long first, then raw bytes).
     */
    @Test
    public void testBytes(){
        jmsTemplate.send("mozq.queue.bytes", session -> {
            BytesMessage bytesMessage = session.createBytesMessage();
            bytesMessage.writeLong(100L);
            bytesMessage.writeBytes("字节流".getBytes(UTF_8));
            return bytesMessage;
        });
    }

    /**
     * Sends a StreamMessage. It offers typed read/write methods; the consumer must read the
     * values in the same order in which they are written here (two strings, then a byte array).
     */
    @Test
    public void testStream(){
        jmsTemplate.send("mozq.queue.stream", session -> {
            StreamMessage streamMessage = session.createStreamMessage();
            streamMessage.writeString("孙权");
            streamMessage.writeString("大乔");
            streamMessage.writeBytes("赤壁".getBytes(UTF_8));
            return streamMessage;
        });
    }

}

消息消费者

package com.mozq.activemq.listener;

import com.mozq.activemq.domain.Customer;
import com.mozq.activemq.domain.User;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.*;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.Enumeration;
import java.util.List;

/**
 * Message consumer demo: one {@link JmsListener} method per {@code mozq.queue.*} destination,
 * each printing the received payload to standard output.
 * JMS errors are printed and not rethrown, as in the original demo.
 */
@Component
public class JmsQueueListener {

    /**
     * Charset used to decode byte payloads. Using the constant instead of the charset name
     * "UTF-8" removes the checked UnsupportedEncodingException.
     * Fully qualified so that the file's import list does not have to change.
     */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Prints the text of a message received on {@code mozq.queue.text}.
     *
     * @param textMessage the received text message
     */
    @JmsListener(destination = "mozq.queue.text")
    public void receiveQueue(TextMessage textMessage){
        try {
            System.out.println(textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Prints the list of strings received on {@code mozq.queue.obj}.
     *
     * @param objectMessage the received object message, expected to carry a {@code List<String>}
     */
    @JmsListener(destination = "mozq.queue.obj")
    public void receiveQueue(ObjectMessage objectMessage){
        try {
            // The element type cannot be checked at runtime; the producer sends a List<String>.
            @SuppressWarnings("unchecked")
            List<String> list = (List<String>) objectMessage.getObject();
            System.out.println(list);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Prints the {@code User} received on {@code mozq.queue.user}.
     *
     * @param objectMessage the received object message, expected to carry a {@code User}
     */
    @JmsListener(destination = "mozq.queue.user")
    public void receiveQueue2(ObjectMessage objectMessage){
        try {
            User user = (User) objectMessage.getObject();
            System.out.println(user);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Prints the {@code Customer} received on {@code mozq.queue.customer}.
     *
     * @param objectMessage the received object message, expected to carry a {@code Customer}
     */
    @JmsListener(destination = "mozq.queue.customer")
    public void receiveQueue3(ObjectMessage objectMessage){
        try {
            Customer customer = (Customer) objectMessage.getObject();
            // Example output: Customer{name='诸葛', linkMan=LinkMan{name='关羽'}}
            System.out.println(customer);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Prints every {@code key:value} entry of the map message received on {@code mozq.queue.map}.
     *
     * @param mapMessage the received map message
     */
    @JmsListener(destination = "mozq.queue.map")
    public void receiveQueue(MapMessage mapMessage){
        try {
            Enumeration<?> mapNames = mapMessage.getMapNames();
            while (mapNames.hasMoreElements()) {
                String key = (String) mapNames.nextElement();
                Object value = mapMessage.getObject(key);
                System.out.println(key + ":" + value);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads a long followed by UTF-8 text from the bytes message received on
     * {@code mozq.queue.bytes} and prints them as {@code num:text}.
     * BytesMessage offers typed read/write methods; values must be read in the same order
     * in which they were written.
     *
     * @param bytesMessage the received bytes message
     */
    @JmsListener(destination = "mozq.queue.bytes")
    public void receiveQueue(BytesMessage bytesMessage){
        try {
            long num = bytesMessage.readLong();
            // getBodyLength() is the length of the whole body, including the long that was
            // just consumed, so only the remainder is text. Sizing the buffer with the full
            // length would leave trailing zero bytes that decode to NUL characters.
            byte[] bytes = new byte[(int) (bytesMessage.getBodyLength() - Long.BYTES)];
            int read = bytesMessage.readBytes(bytes);
            // readBytes reports -1 when nothing is left; decode only what was actually read.
            String text = new String(bytes, 0, Math.max(read, 0), UTF_8);
            System.out.println(num + ":" + text);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads two strings followed by a UTF-8 byte field from the stream message received on
     * {@code mozq.queue.stream} and prints them as {@code name:wife:text}.
     * StreamMessage offers typed read/write methods; values must be read in the same order
     * in which they were written.
     *
     * @param streamMessage the received stream message
     */
    @JmsListener(destination = "mozq.queue.stream")
    public void receiveQueue(StreamMessage streamMessage){
        try {
            String name = streamMessage.readString();
            String wife = streamMessage.readString();

            // Drain the byte field chunk by chunk until readBytes signals its end with -1.
            // Decoding the fixed-size buffer directly would append NUL padding to the text
            // and cut off fields longer than the buffer.
            java.io.ByteArrayOutputStream payload = new java.io.ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            int read;
            while ((read = streamMessage.readBytes(buffer)) != -1) {
                payload.write(buffer, 0, read);
            }
            String text = new String(payload.toByteArray(), UTF_8);
            System.out.println(name + ":" + wife + ":" + text);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

实体类 User Customer LinkMan

/**
 * User payload sent as an ObjectMessage; it implements {@link Serializable} because
 * ObjectMessage payloads must be serializable.
 */
public class User implements Serializable {
    // Pinned explicitly so that adding or changing methods does not alter the serialized form's version.
    private static final long serialVersionUID = 1L;

    private String name;
    private int age;

    /** @return the user's name, or {@code null} if it has not been set */
    public String getName() {
        return name;
    }

    /** @param name the user's name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the user's age */
    public int getAge() {
        return age;
    }

    /** @param age the user's age */
    public void setAge(int age) {
        this.age = age;
    }

    /** @return a readable form such as {@code User{name='x', age=1}}, used when the listener prints the object */
    @Override
    public String toString() {
        return "User{name='" + name + "', age=" + age + "}";
    }
}
/**
 * Customer payload sent as an ObjectMessage. It holds a {@code LinkMan}, so both this class
 * and {@code LinkMan} must implement {@link Serializable}.
 */
public class Customer implements Serializable {
    // Pinned explicitly so that adding or changing methods does not alter the serialized form's version.
    private static final long serialVersionUID = 1L;

    private String name;
    private LinkMan linkMan;

    /** @return the customer's name, or {@code null} if it has not been set */
    public String getName() {
        return name;
    }

    /** @param name the customer's name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the customer's contact person, or {@code null} if it has not been set */
    public LinkMan getLinkMan() {
        return linkMan;
    }

    /** @param linkMan the customer's contact person */
    public void setLinkMan(LinkMan linkMan) {
        this.linkMan = linkMan;
    }

    /** @return a readable form such as {@code Customer{name='x', linkMan=LinkMan{name='y'}}} */
    @Override
    public String toString() {
        return "Customer{name='" + name + "', linkMan=" + linkMan + "}";
    }
}
/**
 * Contact person referenced by {@code Customer}.
 * If this class did not implement {@link Serializable}, sending a {@code Customer} as an
 * ObjectMessage would fail.
 */
public class LinkMan implements Serializable {
    // Pinned explicitly so that adding or changing methods does not alter the serialized form's version.
    private static final long serialVersionUID = 1L;

    private String name;

    /** @return the contact person's name, or {@code null} if it has not been set */
    public String getName() {
        return name;
    }

    /** @param name the contact person's name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return a readable form such as {@code LinkMan{name='x'}} */
    @Override
    public String toString() {
        return "LinkMan{name='" + name + "'}";
    }
}

bugs

java.lang.ClassCastException: com.mozq.activemq.domain.User cannot be cast to java.io.Serializable
错误代码:
ObjectMessage objectMessage = session.createObjectMessage((Serializable) user);
原因:
	对象消息,发送的对象必须实现序列化接口。
Caused by: java.lang.ClassNotFoundException: Forbidden class com.mozq.activemq.domain.User! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.
出错位置:
	消息消费者接收(反序列化)ObjectMessage 消息时报错。
原因:
	http://activemq.apache.org/objectmessage.html
	ObjectMessage 对象基于 Java 序列化机制,这个过程是不安全的。这就是为什么从 5.12.2 和 5.13.0 开始,ActiveMQ 强制用户必须显式指定可以使用 ObjectMessage 的包。
Security
ObjectMessage objects depend on Java serialization of marshal/unmarshal object payload. This process is generally considered unsafe as malicious payload can exploit the host system. That’s why starting with versions 5.12.2 and 5.13.0, ActiveMQ enforces users to explicitly whitelist packages that can be exchanged using ObjectMessages.

On the client side, trusted packages are configured on ActiveMQConnectionFactory, which defines two additional methods:

  • The setTrustedPackages() method allows you to set the list of trusted packages you want to be able to deserialize, like

    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    factory.setTrustedPackages(new ArrayList(Arrays.asList("org.apache.activemq.test,org.apache.camel.test".split(","))));
    
  • The setTrustAllPackages() allows you to turn off security check and trust all classes. It’s useful for testing purposes.

    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    factory.setTrustAllPackages(true);
    

如果使用的是配置文件,在 ActiveMQConnectionFactory 中加入参数

<property name="trustAllPackages" value="true"/>

如果使用的是 application.yml(application.properties 中对应的属性见下文)

spring:
  activemq:
    broker-url: tcp://localhost:61616
    packages:
      # Comma-separated list of packages whose classes may be carried in ObjectMessage payloads.
      # java.util is included because the demo also sends java.util collections (e.g. ArrayList)
      # as ObjectMessage payloads.
      trusted: com.mozq.activemq.domain,com.mozq.activemq.entity,java.util
#      trust-all: true
查看官方 application.properties 属性说明
https://docs.spring.io/spring-boot/docs/2.1.7.RELEASE/reference/html/common-application-properties.html

spring.activemq.packages.trust-all= # Whether to trust all packages. 布尔值。
spring.activemq.packages.trusted= # Comma-separated list of specific packages to trust (when not trusting all packages). 逗号分隔的包名列表。
// Abridged excerpt: only the two trust-related setters are shown; the fields they assign
// are not declared in this snippet.
public class ActiveMQConnectionFactory{
    // Sets the list of trusted packages, i.e. the packages that may be exchanged using ObjectMessages.
    public void setTrustedPackages(List<String> trustedPackages) {
        this.trustedPackages = trustedPackages;
    }

    // When set to true, turns off the security check and trusts all classes (useful for testing).
    public void setTrustAllPackages(boolean trustAllPackages) {
        this.trustAllPackages = trustAllPackages;
    }
}
// Abridged excerpt showing only the "packages" settings holder.
// NOTE(review): this appears to back the spring.activemq.packages.trust-all and
// spring.activemq.packages.trusted properties — confirm against the Spring Boot source.
public class ActiveMQProperties { 
    private final ActiveMQProperties.Packages packages = new ActiveMQProperties.Packages();
    
    public static class Packages {
        // Boxed Boolean with no initializer, so it is null until setTrustAll is called.
        private Boolean trustAll;
        // Starts as an empty list until setTrusted replaces it.
        private List<String> trusted = new ArrayList();

        public void setTrustAll(Boolean trustAll) {
            this.trustAll = trustAll;
        }

        public void setTrusted(List<String> trusted) {
            this.trusted = trusted;
        }
    }
}
Caused by: java.io.NotSerializableException: com.mozq.activemq.domain.LinkMan
原因:
	使用 ObjectMessage 时,被发送对象的引用链上的所有对象都必须实现序列化。Customer 对象中含有 LinkMan 对象,引用链上有 2 个对象;如果只有 Customer 实现了序列化,而 LinkMan 没有实现序列化,就会报此错误。两者必须都实现序列化。

文章

JMS 五种消息的发送/接收的例子
https://chenjumin.iteye.com/blog/687124
application.properties 属性
https://docs.spring.io/spring-boot/docs/2.1.7.RELEASE/reference/html/common-application-properties.html

ObjectMessage 消息的安全性

http://activemq.apache.org/objectmessage.html

关于MQ,你必须知道的 (大牛)

https://www.cnblogs.com/zhuoqingsen/p/MQ.html

springboot与ActiveMQ整合

https://www.cnblogs.com/elvinle/p/8457596.html

第十七章:springboot 整合 activeMQ

https://my.oschina.net/u/3387320/blog/3009301

原文地址:https://www.cnblogs.com/mozq/p/11329012.html