springboot使用配置文件的方式集成RabbitMQ

RabbitMQ可以使用编程式和申明式的方式,网上很多都是直接使用编程的方式来使用,这里主要讲如何使用springboot配置文件的方式来使用RabbitMQ。

demo可以参考(https://github.com/Little-Orange7/rabbitMQ 和 https://github.com/Little-Orange7/rabbitMQ-async)

一.搭建环境:

由于RabbitMQ是使用Erlang语言编写的,所以要先安装Erlang;

1.下载Erlang:http://www.erlang.org/downloads,注意 rabbitMq 和erlang的版本要对应,可以在http://www.rabbitmq.com/which-erlang.html页面查看对应版本。

2.下载RabbitMQ:RabbitMq Server http://www.rabbitmq.com/releases/rabbitmq-server/

下载完之后,安装;

二.RabbitMQ配置

rabbitMQ配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <description>rabbitmq 服务配置</description>
    <!-- 公共部分 -->
    <!-- 创建连接类 连接安装好的 rabbitmq -->
    <bean id="connectionFactory"  class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="localhost" />
        <property name="virtualHost" value="${rabbitmq.virtualHost}" />
        <!-- host,RabbitMQ服务器地址,默认值"localhost" -->
        <property name="host" value="${rabbitmq.ip}" />
        <!-- port,RabbitMQ服务端口,默认值为5672 -->
        <property name="port" value="${rabbitmq.port}" />
        <!-- username,访问RabbitMQ服务器的账户,默认是guest -->
        <property name="username" value="${rabbitmq.username}" />
        <!-- username,访问RabbitMQ服务器的密码,默认是guest -->
        <property name="password" value="${rabbitmq.password}" />
        <!-- channel-cache-size,channel的缓存数量,默认值为25 -->
        <property name="channelCacheSize" value="${rabbitmq.channelCacheSize}" />
        <!-- cache-mode,缓存连接模式,默认值为CHANNEL(单个connection连接,连接之后关闭,自动销毁) -->
        <property name="cacheMode" value="CHANNEL" />
    </bean>
    <!--或者这样配置,connection-factory元素实际就是注册一个org.springframework.amqp.rabbit.connection.CachingConnectionFactory实例-->
    <!--<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.ip}" port="${rabbitmq.port}"
    username="${rabbitmq.manager.user}" password="${rabbitmq.manager.password}" />-->

    <!-- RabbitAdmin主要用于在Java代码中对用于创建、绑定、管理队列与交换机,可以直接通过RabbitAdmin接口来管理 -->
<!--    <rabbit:admin connection-factory="connectionFactory"/>-->

    <!--定义消息队列,durable:是否持久化,如果想在RabbitMQ退出或崩溃的时候,不会失去所有的queue和消息,需要同时标志队列(queue)和交换机(exchange)是持久化的,即rabbit:queue标签和rabbit:direct-exchange中的durable=true,而消息(message)默认是持久化的可以看类org.springframework.amqp.core.MessageProperties中的属性public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;exclusive: 仅创建者可以使用的私有队列,断开后自动删除;auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 -->
    <!--管理平台中显示是name,消费者监听的也是name,此处定义的id在绑定生产者时使用-->
    <rabbit:queue id="queue_one" name="${test_queue_one}" durable="true" auto-delete="false" exclusive="false" />
    <rabbit:queue id="queue_two" name="${test_queue_two}" durable="true" auto-delete="false" exclusive="false" />

    <!--定义交换机,绑定队列,rabbitmq的exchangeType常用的三种模式:direct,fanout,topic三种,我们用direct模式,即rabbit:direct-exchange标签,Direct交换器很简单,如果是Direct类型,就会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。有一个需要注意的地方:如果找不到指定的exchange,就会报错。但routing key找不到的话,不会报错,这条消息会直接丢失,所以此处要小心,auto-delete:自动删除,如果为Yes,则该交换机所有队列queue删除后,自动删除交换机,默认为false -->
    <!--管理平台中显示的是name,绑定amqpTemplate也是使用name-->
    <rabbit:direct-exchange id="direct_Exchange" name="directExchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <!--queue对应上面定义的queue的id,此处的key是routingKey,调用生产者发送消息到对应队列需要传入相应的routringKey-->
            <rabbit:binding queue="queue_one" key="${routingKey.one}"></rabbit:binding>
            <rabbit:binding queue="queue_two" key="${routingKey.two}"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!--定义 amqpTemplate 将交换机和amqp绑定,其中exchange对应的是上面定义的交换机的name-->
    <rabbit:template id="amqpTemplate" exchange="directExchange" connection-factory="connectionFactory" channel-transacted="true"/>


    <!-- 生产者部分 -->
    <!-- 发送消息的producer类,也就是生产者 -->
    <bean id="testProducer" class="com.example.rabbitdemo.producer.impl.TestProducerImpl">
        <!-- spring注入 -->
        <property name="amqpTemplate" ref="amqpTemplate"/>
    </bean>


    <!-- 消费者部分 -->
    <!-- 自定义消费者类 -->
    <bean id="testConsumerOne" class="com.example.rabbitdemo.consumer.TestConsumerOne"></bean>
    <bean id="testConsumerTwo" class="com.example.rabbitdemo.consumer.TestConsumerTwo"></bean>
    <!-- 配置监听,默认是acknowledge="auto",可以设置手动应答acknowledge="manual"-->
    <rabbit:listener-container connection-factory="connectionFactory">
        <!--此处queue-names对应的是定义queue的name而不是id-->
        <rabbit:listener queue-names="${test_queue_one}" ref="testConsumerOne" />
        <rabbit:listener queue-names="${test_queue_two}" ref="testConsumerTwo" />
    </rabbit:listener-container>


</beans>

上述的这个配置文件包含生产者和消费者的配置,这里是同一个项目用作异步调用使用,所以将生产者和消费者配置文件合并到一起,实际生产者和消费者分开的情况是最常见的,所以,可以根据项目需要,将这个配置文件拆分成生产者和消费者两个配置文件(可以参考这个项目:https://github.com/Little-Orange7/rabbitMQ);

使用:

生产者:

@Service
public class TestProducerImpl implements TestProducer {

    //由配置文件配置方式注入
    private AmqpTemplate amqpTemplate;

    public AmqpTemplate getAmqpTemplate() {
        return amqpTemplate;
    }

    public void setAmqpTemplate(AmqpTemplate amqpTemplate) {
        this.amqpTemplate = amqpTemplate;
    }

    private void send(String routingKey, String json){
        System.out.println("--------------Producer-----start-------");
        System.out.println("routingKey:"+routingKey+",json:"+json);
        amqpTemplate.convertAndSend(routingKey,json);
        System.out.println("--------------Producer------end------");
    }

    @Override
    public void sendMessage(String routingKey, Object obj) {
        String json= JSON.toJSONString(obj);
        send(routingKey,json);
    }
}

生产者通过AmqpTemplate接口来发送消息,其中需要传入routingKey,这个routingKey是定义交换机的时候绑定的queue的key值,生产者通过这个routingKey来确定要将发送的消息放入到哪个queue中;

消费者:

public class TestConsumerOne implements MessageListener {

    @Override
    public void onMessage(Message message) {
        String messageJson=new String(message.getBody());
        System.out.println("ConsumerOne  messageJson:"+messageJson);
        TestPojo tj =JSON.parseObject(messageJson, new TypeReference<TestPojo>() {});
    }
}

消费者实现MessageListener这个接口,可以监听到queue中的消息,如果有待处理的message,则执行onMessage方法,可以从Message中获取到数据;这个消费者监听的queue在配置文件中就已经声明过了,绑定要监听的queue的name,只要这个监听的queue中存在待处理的消息,则会执行onMessage方法;

三.Springboot集成RabbitMQ配置文件

要读取此配置文件,必须加此注解,可以在启动类上加,也可以在放配置类上;

@SpringBootApplication
@ImportResource(locations = {"classpath:spring-rabbitmq.xml"})
public class RabbitDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitDemoApplication.class, args);
    }

}
原文地址:https://www.cnblogs.com/littleorange7/p/13536125.html