SpringMVC集成RebbitMq(含源码)

安装RebbitMq可以参考:https://blog.csdn.net/qq_47588845/article/details/107986373

1.添加jar

 2.添加生产者配置文件:application-rabbitProduce.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- 定义连接工厂,用于创建连接等 -->
    <rabbit:connection-factory id="mqProduceConnectionFactory" username="guest" password="guest" host="127.0.0.1" port="5672"/>

    <!-- 定义admin,producer中的exchange,queue会自动的利用该admin自动在spring中生成 -->
    <rabbit:admin connection-factory="mqProduceConnectionFactory"/>

    <!-- 定义rabbitmq模板,用于接收以及发送消息 -->
    <rabbit:template id="amqpTemplate" connection-factory="mqProduceConnectionFactory" exchange="hjexchange" routing-key="wzh"/>

    <!-- 利用admin来定义队列,spring会自动根据下面的定义创建队列
        队列名    是否持久化    是否是排他队列    不使用的时候是否删除    -->
    <rabbit:queue name="com.lll.employee" auto-delete="false" durable="true" exclusive="false" auto-declare="true"/>

    <!-- 定义Exchange,并且将队列与Exchange绑定,设定Routing Key -->
    <!-- name为Echange的name -->
    <rabbit:direct-exchange name="hjexchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding key="com.lll.employee" queue="com.lll.employee"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

</beans>

3.添加消费者配置文件:application-rabbitConsume.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                        http://www.springframework.org/schema/context
                        http://www.springframework.org/schema/context/spring-context-4.0.xsd
                        http://www.springframework.org/schema/rabbit
                        http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">

    <!-- 定义连接工厂,用于创建连接等 -->
    <rabbit:connection-factory id="mqConsumerConnectionFactory" username="guest" password="guest" host="127.0.0.1" port="5672"/>

    <!-- 定义admin,producer中的exchange,queue会自动的利用该admin自动在spring中生成 -->
    <rabbit:admin connection-factory="mqConsumerConnectionFactory"/>

    <!-- 利用admin来定义队列,spring会自动根据下面的定义创建队列
        队列名    是否持久化    是否是排他队列    不使用的时候是否删除    -->
    <rabbit:queue name="com.lll.employee" auto-delete="false" durable="true" exclusive="false" auto-declare="true"/>

    <!-- 定义Exchange,并且将队列与Exchange绑定,设定Routing Key -->
    <!-- name为Echange的name -->
    <rabbit:direct-exchange name="hjexchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding key="com.lll.employee" queue="com.lll.employee"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 定义消费者,消费消息 -->
    <bean id="directConsumer" class="com.lll.rebbitmq.DirectConsumer"></bean>

    <!--开启监听,也可以理解为:
        将消费者与队列进行绑定,这样就会在,当队列有消息的时候,会由绑定的消费者来进行消费,
        也可以算是指定消费者来监听指定的队列.当有消息到达的时候,通知消费者进行消费 -->
    <rabbit:listener-container connection-factory="mqConsumerConnectionFactory">
        <!-- 注入bean,指定队列 -->
        <rabbit:listener ref="directConsumer" queues="com.lll.employee"/>
    </rabbit:listener-container>

</beans>

3.在applicationContext.xml中引入对应配置

    <!--    rabbitMq的生产者和消费者的配置-->
    <import resource="application-rabbitProduce.xml"/>
    <import resource="application-rabbitConsume.xml"/>

**注意:这里如果配置了其他的 connection-factory 命名必须不一样,否则不能识别会报错。

4.添加生产者服务

package com.lll.rebbitmq;


import javax.annotation.Resource;
import com.lll.utils.JacksonUtil;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;


/**
 * 通过调用rabbitmq AmqpTemplate对象发送消息
 * @author lll
 *
 */
@Service("messageProducer")
public class MessageProducer {

    private Logger log = Logger.getLogger(MessageProducer.class);

    @Resource
    private AmqpTemplate amqpTemplate;

    public void send(Object message) {
        log.info("发送消息为 : " + JacksonUtil.toJsonString(message));
        amqpTemplate.convertAndSend("com.lll.employee", JacksonUtil.toJsonString(message));
    }
}

5.添加消费者服务

package com.lll.rebbitmq;


import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;


/**
 * rabbitmq接收推送的数据
 * @author lll
 *
 */
@Service("directConsumer")
public class DirectConsumer implements MessageListener {

    private Logger log = Logger.getLogger(DirectConsumer.class);

    @Override
    public void onMessage(Message message) {
        log.info("------> 消息队列 接收到的消息为 : " + message);
    }

}

6.在接口中调用

    /**
     * @time 2018年7月31日下午4:45:41
     * @author lll
     * @describe 通过消息对接异步添加
     */
    @PostMapping("/addByRebbitmq")
    public String addByRebbitmq(@RequestBody Employee employee) {
        try{
            String message = JacksonUtil.toJsonString(employee);
            messageProducer.send(message);
            return  "success";
        }catch (Exception e){
            return  e.getMessage();
        }
    }

7.实际效果:

 

 8.资源

  源码地址:https://github.com/CodingPandaLLL/springlll.git

  源码压缩包地址:https://codeload.github.com/CodingPandaLLL/springlll/zip/refs/tags/1.0.3

作者:CodingPanda
座中铭:润物细无声,功到自然成
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利.
原文地址:https://www.cnblogs.com/LiLiliang/p/14624587.html