Spring 集成 RabbitMQ

1.添加 maven 项目依赖

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>1.3.5.RELEASE</version>
</dependency>

2.添加 spring-rabbitmq.xml 配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Broker connection settings. If the RabbitMQ server runs on a remote host, create a
         dedicated user and use its credentials here: the guest account is not allowed to log in
         remotely by default. The host / username / password / port values below appear to be
         placeholders and must be replaced with real values. -->
    <rabbit:connection-factory id="connectionFactory"
                               host="host" username="user" password="pwd" port="port"
                               virtual-host="/" channel-cache-size="5"/>

    <!-- Admin: automatically declares the exchanges and queues defined in this file,
         so they do not have to be created manually on the broker. -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue declarations -->
    <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="rabbit.queue.test1"/>
    <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="rabbit.queue.test2"/>

    <!-- Direct exchange and its bindings: each binding maps a routing key to a queue -->
    <rabbit:direct-exchange name="rabbit.queue.exchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="rabbit.queue.test1" key="rabbit.queue.test1.key"/>
            <rabbit:binding queue="rabbit.queue.test2" key="rabbit.queue.test2.key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Spring AMQP's stock JSON converter is Jackson-based; it serializes the producer's payload
         to JSON before it is put on the queue. It is replaced here with a Gson-based implementation
         (the author's stated reason: Gson being faster than Jackson). -->
    <bean id="jsonMessageConverter" class="top.tarencez.ssmdemo.config.Gson2JsonMessageConverter"/>

    <!-- Template declaration: publishes to rabbit.queue.exchange using the Gson converter.
         NOTE(review): the default routing-key "spring.queue.tag.key" does not match either binding
         key declared above, so sends that rely on the default key would not be routed to
         rabbit.queue.test1 / rabbit.queue.test2. Confirm this is intended. -->
    <rabbit:template id="amqpTemplate" exchange="rabbit.queue.exchange" routing-key="spring.queue.tag.key"
                     connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>

    <!-- Listener bean referenced by the listener container below.
         NOTE(review): the class is also annotated with @Component; if its package is component-scanned
         a second instance would be registered. Verify against the scan configuration. -->
    <bean id="receiveMessageListener" class="top.tarencez.ssmdemo.common.component.MQListenter" />

    <!-- Queue listener (observer style): when a message arrives on a queue, the listener
         registered for that queue is notified. -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" >
        <rabbit:listener queues="rabbit.queue.test1" ref="receiveMessageListener" />
        <!--<rabbit:listener queues="rabbit.queue.test2" ref="receiveMessageListener" />-->
    </rabbit:listener-container>
</beans>

3.在 applicationContext.xml 文件中引入 spring-rabbitmq.xml

<import resource="classpath:conf/spring-rabbitmq.xml"/>

4.Gson配置

package top.tarencez.ssmdemo.config;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.MessageConversionException;
import com.google.gson.Gson;

/**
 * {@link AbstractJsonMessageConverter} implementation that uses Gson to turn payload objects
 * into JSON message bodies and back.
 *
 * <p>Outgoing messages are tagged with the JSON content type, the converter's default charset,
 * the body length and whatever type information the class mapper writes into the message
 * properties. Incoming messages are decoded only when their content type contains
 * {@code "json"}; otherwise the raw body bytes are returned.
 *
 * <p>Both directions obtain the class mapper through {@link #resolveClassMapper()}, so a mapper
 * configured on the converter is honoured when sending as well as when receiving, and an
 * unconfigured converter falls back to a {@link DefaultClassMapper} instead of failing.
 */
public class Gson2JsonMessageConverter extends AbstractJsonMessageConverter {
    private static final Log log = LogFactory.getLog(Gson2JsonMessageConverter.class);

    /** Mapper used when no class mapper is available from the converter itself. */
    private static final ClassMapper FALLBACK_CLASS_MAPPER = new DefaultClassMapper();

    private static final Gson gson = new Gson();

    public Gson2JsonMessageConverter() {
        super();
    }

    /**
     * Serializes {@code object} to JSON and wraps it in a {@link Message}.
     *
     * @param object            payload to serialize
     * @param messageProperties properties to populate (content type, encoding, length, type info)
     * @return the message carrying the encoded JSON body and the given properties
     * @throws MessageConversionException if the default charset cannot be used for encoding
     */
    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) {
        byte[] bytes;
        try {
            String jsonString = gson.toJson(object);
            bytes = jsonString.getBytes(getDefaultCharset());
        } catch (IOException e) {
            throw new MessageConversionException(
                    "Failed to convert Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(getDefaultCharset());
        messageProperties.setContentLength(bytes.length);
        // Use the same mapper that fromMessage() uses, so type information written here
        // can be read back on the receiving side.
        resolveClassMapper().fromClass(object.getClass(), messageProperties);
        return new Message(bytes, messageProperties);
    }

    /**
     * Decodes a JSON message body into an object of the class chosen by the class mapper.
     *
     * @param message incoming message
     * @return the decoded object; the raw body bytes when the message has no properties, its
     *         content type does not contain {@code "json"}, or decoding yields {@code null}
     * @throws MessageConversionException if the body cannot be decoded with the message encoding
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        Object content = null;
        MessageProperties properties = message.getMessageProperties();
        if (properties != null) {
            String contentType = properties.getContentType();
            if (contentType != null && contentType.contains("json")) {
                String encoding = properties.getContentEncoding();
                if (encoding == null) {
                    encoding = getDefaultCharset();
                }
                try {
                    Class<?> targetClass = resolveClassMapper().toClass(properties);
                    content = convertBytesToObject(message.getBody(),
                            encoding, targetClass);
                } catch (IOException e) {
                    throw new MessageConversionException(
                            "Failed to convert Message content", e);
                }
            } else {
                log.warn("Could not convert incoming message with content-type ["
                        + contentType + "]");
            }
        }
        if (content == null) {
            // Fall back to the undecoded payload rather than returning null.
            content = message.getBody();
        }
        return content;
    }

    /**
     * Returns the class mapper configured on this converter, or the shared fallback mapper
     * when {@link #getClassMapper()} yields {@code null}.
     */
    private ClassMapper resolveClassMapper() {
        ClassMapper configured = getClassMapper();
        return configured != null ? configured : FALLBACK_CLASS_MAPPER;
    }

    /** Decodes {@code body} with {@code encoding} and parses the text as JSON into {@code clazz}. */
    private Object convertBytesToObject(byte[] body, String encoding, Class<?> clazz) throws
            UnsupportedEncodingException {
        String contentAsString = new String(body, encoding);
        return gson.fromJson(contentAsString, clazz);
    }
}

5.生产者接口及接口调用

package top.tarencez.ssmdemo.common.component;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Message producer component that publishes through the injected {@link AmqpTemplate}.
 */
@Component
public class MQProducer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Converts {@code message} and sends it through the template, passing {@code queueKey}
     * as the first argument of {@code convertAndSend}.
     *
     * <p>Progress is reported on standard output. Any exception raised while sending is
     * printed there as well and is not rethrown.
     *
     * @param queueKey key handed to the template for this send
     * @param message  payload to convert and send
     */
    public void sendMessage(String queueKey, Object message) {
        final String templateBanner = "===== " + amqpTemplate;
        System.out.println(templateBanner);
        try {
            amqpTemplate.convertAndSend(queueKey, message);
            System.out.println("===== 消息发送成功 =====");
        } catch (Exception sendFailure) {
            // Best-effort demo behaviour: report the failure and carry on.
            System.out.println(sendFailure);
        }
    }
}
package top.tarencez.ssmdemo.rabbitmq.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import top.tarencez.ssmdemo.common.component.MQProducer;
import top.tarencez.ssmdemo.shiro.vo.TestVO;

/**
 * Demo controller, mapped under {@code /mq}, that publishes a sample message via {@link MQProducer}.
 */
@Controller
@RequestMapping("/mq")
public class MQController {

    @Autowired
    private MQProducer mqProducer;

    /**
     * Handler mapped to {@code /test}: prints a marker line, then sends a sample
     * {@link TestVO} with the key {@code rabbit.queue.test1.key}.
     */
    @RequestMapping("/test")
    public void test() {
        System.out.println("===== test mq send =====");
        mqProducer.sendMessage("rabbit.queue.test1.key", buildSamplePayload());
    }

    /** Builds the fixed sample payload (id 1, names "aaa" / "bbb") used by {@link #test()}. */
    private TestVO buildSamplePayload() {
        TestVO sample = new TestVO();
        sample.setId(1);
        sample.setName1("aaa");
        sample.setName2("bbb");
        return sample;
    }
}

6.消费者接口

package top.tarencez.ssmdemo.common.component;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

/**
 * Message listener that prints the body of every received message to standard output.
 */
@Component
public class MQListenter implements MessageListener {

    /**
     * Decodes the message body as UTF-8 text and prints it (without a trailing newline).
     * An {@link UnsupportedEncodingException} is reported via its stack trace.
     *
     * @param msg the received message
     */
    @Override
    public void onMessage(Message msg) {
        try {
            final String text = new String(msg.getBody(), "UTF-8");
            System.out.print("===== 接受到消息:" + text);
        } catch (UnsupportedEncodingException unsupported) {
            unsupported.printStackTrace();
        }
    }
}

7.测试验证

1.添加 maven 项目依赖

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>1.3.5.RELEASE</version>
</dependency>

2.添加 spring-rabbitmq.xml 配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Broker connection settings. If the RabbitMQ server runs on a remote host, create a
         dedicated user and use its credentials here: the guest account is not allowed to log in
         remotely by default. The host / username / password / port values below appear to be
         placeholders and must be replaced with real values. -->
    <rabbit:connection-factory id="connectionFactory"
                               host="host" username="user" password="pwd" port="port"
                               virtual-host="/" channel-cache-size="5"/>

    <!-- Admin: automatically declares the exchanges and queues defined in this file,
         so they do not have to be created manually on the broker. -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue declarations -->
    <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="rabbit.queue.test1"/>
    <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="rabbit.queue.test2"/>

    <!-- Direct exchange and its bindings: each binding maps a routing key to a queue -->
    <rabbit:direct-exchange name="rabbit.queue.exchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="rabbit.queue.test1" key="rabbit.queue.test1.key"/>
            <rabbit:binding queue="rabbit.queue.test2" key="rabbit.queue.test2.key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Spring AMQP's stock JSON converter is Jackson-based; it serializes the producer's payload
         to JSON before it is put on the queue. It is replaced here with a Gson-based implementation
         (the author's stated reason: Gson being faster than Jackson). -->
    <bean id="jsonMessageConverter" class="top.tarencez.ssmdemo.config.Gson2JsonMessageConverter"/>

    <!-- Template declaration: publishes to rabbit.queue.exchange using the Gson converter.
         NOTE(review): the default routing-key "spring.queue.tag.key" does not match either binding
         key declared above, so sends that rely on the default key would not be routed to
         rabbit.queue.test1 / rabbit.queue.test2. Confirm this is intended. -->
    <rabbit:template id="amqpTemplate" exchange="rabbit.queue.exchange" routing-key="spring.queue.tag.key"
                     connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>

    <!-- Listener bean referenced by the listener container below.
         NOTE(review): the class is also annotated with @Component; if its package is component-scanned
         a second instance would be registered. Verify against the scan configuration. -->
    <bean id="receiveMessageListener" class="top.tarencez.ssmdemo.common.component.MQListenter" />

    <!-- Queue listener (observer style): when a message arrives on a queue, the listener
         registered for that queue is notified. -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" >
        <rabbit:listener queues="rabbit.queue.test1" ref="receiveMessageListener" />
        <!--<rabbit:listener queues="rabbit.queue.test2" ref="receiveMessageListener" />-->
    </rabbit:listener-container>
</beans>

3.在 applicationContext.xml 文件中引入 spring-rabbitmq.xml

<import resource="classpath:conf/spring-rabbitmq.xml"/>

4.Gson配置

package top.tarencez.ssmdemo.config;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.MessageConversionException;
import com.google.gson.Gson;

/**
 * {@link AbstractJsonMessageConverter} implementation that uses Gson to turn payload objects
 * into JSON message bodies and back.
 *
 * <p>Outgoing messages are tagged with the JSON content type, the converter's default charset,
 * the body length and whatever type information the class mapper writes into the message
 * properties. Incoming messages are decoded only when their content type contains
 * {@code "json"}; otherwise the raw body bytes are returned.
 *
 * <p>Both directions obtain the class mapper through {@link #resolveClassMapper()}, so a mapper
 * configured on the converter is honoured when sending as well as when receiving, and an
 * unconfigured converter falls back to a {@link DefaultClassMapper} instead of failing.
 */
public class Gson2JsonMessageConverter extends AbstractJsonMessageConverter {
    private static final Log log = LogFactory.getLog(Gson2JsonMessageConverter.class);

    /** Mapper used when no class mapper is available from the converter itself. */
    private static final ClassMapper FALLBACK_CLASS_MAPPER = new DefaultClassMapper();

    private static final Gson gson = new Gson();

    public Gson2JsonMessageConverter() {
        super();
    }

    /**
     * Serializes {@code object} to JSON and wraps it in a {@link Message}.
     *
     * @param object            payload to serialize
     * @param messageProperties properties to populate (content type, encoding, length, type info)
     * @return the message carrying the encoded JSON body and the given properties
     * @throws MessageConversionException if the default charset cannot be used for encoding
     */
    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) {
        byte[] bytes;
        try {
            String jsonString = gson.toJson(object);
            bytes = jsonString.getBytes(getDefaultCharset());
        } catch (IOException e) {
            throw new MessageConversionException(
                    "Failed to convert Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(getDefaultCharset());
        messageProperties.setContentLength(bytes.length);
        // Use the same mapper that fromMessage() uses, so type information written here
        // can be read back on the receiving side.
        resolveClassMapper().fromClass(object.getClass(), messageProperties);
        return new Message(bytes, messageProperties);
    }

    /**
     * Decodes a JSON message body into an object of the class chosen by the class mapper.
     *
     * @param message incoming message
     * @return the decoded object; the raw body bytes when the message has no properties, its
     *         content type does not contain {@code "json"}, or decoding yields {@code null}
     * @throws MessageConversionException if the body cannot be decoded with the message encoding
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        Object content = null;
        MessageProperties properties = message.getMessageProperties();
        if (properties != null) {
            String contentType = properties.getContentType();
            if (contentType != null && contentType.contains("json")) {
                String encoding = properties.getContentEncoding();
                if (encoding == null) {
                    encoding = getDefaultCharset();
                }
                try {
                    Class<?> targetClass = resolveClassMapper().toClass(properties);
                    content = convertBytesToObject(message.getBody(),
                            encoding, targetClass);
                } catch (IOException e) {
                    throw new MessageConversionException(
                            "Failed to convert Message content", e);
                }
            } else {
                log.warn("Could not convert incoming message with content-type ["
                        + contentType + "]");
            }
        }
        if (content == null) {
            // Fall back to the undecoded payload rather than returning null.
            content = message.getBody();
        }
        return content;
    }

    /**
     * Returns the class mapper configured on this converter, or the shared fallback mapper
     * when {@link #getClassMapper()} yields {@code null}.
     */
    private ClassMapper resolveClassMapper() {
        ClassMapper configured = getClassMapper();
        return configured != null ? configured : FALLBACK_CLASS_MAPPER;
    }

    /** Decodes {@code body} with {@code encoding} and parses the text as JSON into {@code clazz}. */
    private Object convertBytesToObject(byte[] body, String encoding, Class<?> clazz) throws
            UnsupportedEncodingException {
        String contentAsString = new String(body, encoding);
        return gson.fromJson(contentAsString, clazz);
    }
}

5.生产者接口及接口调用

package top.tarencez.ssmdemo.common.component;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Message producer component that publishes through the injected {@link AmqpTemplate}.
 */
@Component
public class MQProducer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Converts {@code message} and sends it through the template, passing {@code queueKey}
     * as the first argument of {@code convertAndSend}.
     *
     * <p>Progress is reported on standard output. Any exception raised while sending is
     * printed there as well and is not rethrown.
     *
     * @param queueKey key handed to the template for this send
     * @param message  payload to convert and send
     */
    public void sendMessage(String queueKey, Object message) {
        final String templateBanner = "===== " + amqpTemplate;
        System.out.println(templateBanner);
        try {
            amqpTemplate.convertAndSend(queueKey, message);
            System.out.println("===== 消息发送成功 =====");
        } catch (Exception sendFailure) {
            // Best-effort demo behaviour: report the failure and carry on.
            System.out.println(sendFailure);
        }
    }
}
package top.tarencez.ssmdemo.rabbitmq.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import top.tarencez.ssmdemo.common.component.MQProducer;
import top.tarencez.ssmdemo.shiro.vo.TestVO;

/**
 * Demo controller, mapped under {@code /mq}, that publishes a sample message via {@link MQProducer}.
 */
@Controller
@RequestMapping("/mq")
public class MQController {

    @Autowired
    private MQProducer mqProducer;

    /**
     * Handler mapped to {@code /test}: prints a marker line, then sends a sample
     * {@link TestVO} with the key {@code rabbit.queue.test1.key}.
     */
    @RequestMapping("/test")
    public void test() {
        System.out.println("===== test mq send =====");
        mqProducer.sendMessage("rabbit.queue.test1.key", buildSamplePayload());
    }

    /** Builds the fixed sample payload (id 1, names "aaa" / "bbb") used by {@link #test()}. */
    private TestVO buildSamplePayload() {
        TestVO sample = new TestVO();
        sample.setId(1);
        sample.setName1("aaa");
        sample.setName2("bbb");
        return sample;
    }
}

6.消费者接口

package top.tarencez.ssmdemo.common.component;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

/**
 * Message listener that prints the body of every received message to standard output.
 */
@Component
public class MQListenter implements MessageListener {

    /**
     * Decodes the message body as UTF-8 text and prints it (without a trailing newline).
     * An {@link UnsupportedEncodingException} is reported via its stack trace.
     *
     * @param msg the received message
     */
    @Override
    public void onMessage(Message msg) {
        try {
            final String text = new String(msg.getBody(), "UTF-8");
            System.out.print("===== 接受到消息:" + text);
        } catch (UnsupportedEncodingException unsupported) {
            unsupported.printStackTrace();
        }
    }
}

7.测试验证

 

参考文章:

  https://www.cnblogs.com/tohxyblog/p/7256554.html

  https://blog.csdn.net/qq_37936542/article/details/80111555

  https://www.cnblogs.com/s648667069/p/6401463.html

原文地址:https://www.cnblogs.com/tarencez/p/10886938.html