rabbitmq记录(2)spring整合rabbitmq--手动模式,测试消息的持久化

目录:

1、spring整合rabbitmq--手动模式
2、测试消息的持久化

1、spring整合rabbitmq--手动模式    <--返回目录

  依赖

<!-- Spring AMQP's RabbitMQ integration. It transitively pulls in the RabbitMQ
     Java client and the Spring Framework modules it was built against. -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.3.5</version>
</dependency>
<!-- Not declared explicitly: amqp-client arrives transitively through spring-rabbit.
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.5.0</version>
</dependency>-->

<!-- SLF4J binding that routes log output to log4j 1.2. -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.3.2</version>
</dependency>

<!-- Spring Framework modules, aligned with the 5.3.x line that spring-rabbit 2.3.5
     is built against. Pinning them to 4.3.7.RELEASE mixed two major Spring
     versions on the same classpath. -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-beans</artifactId>
    <version>5.3.4</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>5.3.4</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-test</artifactId>
    <version>5.3.4</version>
</dependency>

<!-- JUnit is declared exactly once. The original listed it twice (scope "test" and
     scope "compile"), which Maven reports as a duplicate declaration. The compile
     scope is kept so the test class also resolves JUnit if it lives under
     src/main/java; once tests are under src/test/java, switch this to "test". -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>compile</scope>
</dependency>

  applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
                        http://www.springframework.org/schema/rabbit/spring-rabbit-2.3.xsd
                        http://www.springframework.org/schema/context
                        http://www.springframework.org/schema/context/spring-context.xsd
                        http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Register the @Component classes under com.oy (publisher helper, callbacks, listener). -->
    <context:component-scan base-package="com.oy"></context:component-scan>

    <!-- RabbitMQ connection factory. confirm-type="CORRELATED" turns on publisher
         confirms so the template's confirm-callback gets invoked.
         NOTE(review): host and credentials are hard-coded here; externalize them
         before using this outside a local test setup. -->
    <rabbit:connection-factory id="connectionFactory" host="192.168.213.200" port="5672" username="oy"
                               password="123456" virtual-host="/test" confirm-type="CORRELATED"/>

    <!-- RabbitMQ template used for publishing.
         Optional attributes: "queue" when messages go straight to a queue,
         "exchange" when messages go to an exchange.
         confirm-callback / return-callback reference the callback beans found by
         the component scan; mandatory="true" is set so that unroutable messages
         are handed to the return callback.
    -->
    <rabbit:template id="template" connection-factory="connectionFactory"
        confirm-callback="confirmCallBackListener" return-callback="returnCallBackListener" mandatory="true"/>
    <!-- Admin bean for the connection factory; presumably what declares the queue,
         exchange and binding below on the broker - TODO confirm. -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Queue definition -->
    <rabbit:queue name="CONFIRM_TEST_QUEUE" />

    <!-- Exchange definition, with the queue bound to it.
         No binding key is given; TestMain publishes with the queue name as the
         routing key, so the binding key presumably defaults to the queue name
         (verify against the Spring AMQP namespace documentation). -->
    <rabbit:direct-exchange name="DIRECT_EXCHANGE" id="DIRECT_EXCHANGE">
        <rabbit:bindings>
            <rabbit:binding queue="CONFIRM_TEST_QUEUE" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Listener container: incoming messages on the listed queue are dispatched to
         the referenced listener bean. acknowledge="manual" means the listener
         itself must ack/nack each delivery on the channel. -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
        <rabbit:listener ref="receiveConfirmTestListener" queue-names="CONFIRM_TEST_QUEUE" />
    </rabbit:listener-container>

</beans>

  PublishUtil

package com.oy;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Small publishing helper that forwards messages to the injected
 * {@link AmqpTemplate}.
 */
@Component
public class PublishUtil {

    /** Template supplied by the Spring container; performs the actual publish. */
    @Autowired
    private AmqpTemplate template;

    /**
     * Converts {@code message} and publishes it through the template.
     *
     * @param exchange   name of the exchange to publish to
     * @param routingKey routing key attached to the message
     * @param message    payload object handed to the template for conversion
     */
    public void send(final String exchange, final String routingKey, final Object message) {
        this.template.convertAndSend(exchange, routingKey, message);
    }
}

  TestMain

package com.oy;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.Date;

/**
 * Spring-backed JUnit test that publishes one timestamped message through
 * {@link PublishUtil} using the context defined in {@code applicationContext.xml}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext.xml")
public class TestMain {

    /** Exchange the test publishes to. */
    private static final String EXCHANGE_NAME = "DIRECT_EXCHANGE";

    /** Queue name; also used as the routing key when publishing. */
    private static final String QUEUE_NAME = "CONFIRM_TEST_QUEUE";

    /** How long the test thread pauses after publishing, in milliseconds. */
    private static final long PAUSE_AFTER_SEND_MILLIS = 2000L;

    @Autowired
    private PublishUtil publishUtil;

    /**
     * Sends a single message whose body carries the current epoch milliseconds,
     * then pauses before the test ends.
     *
     * @throws Exception if publishing fails or the pause is interrupted
     */
    @Test
    public void test1() throws Exception {
        final String payload = "now: " + new Date().getTime();
        publishUtil.send(EXCHANGE_NAME, QUEUE_NAME, payload);

        // Presumably gives the asynchronous confirm/return callbacks and the
        // listener time to run before the context shuts down.
        Thread.sleep(PAUSE_AFTER_SEND_MILLIS);
    }
}

  ReturnCallBackListener

package com.oy;

import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

/**
 * Returns callback registered on the RabbitTemplate; prints every returned
 * message to standard output.
 */
@Component
public class ReturnCallBackListener implements RabbitTemplate.ReturnsCallback {

    /** Prefix placed in front of the printed message description. */
    private static final String OUTPUT_PREFIX = "ReturnCallBackListener returnedMessage: ";

    /**
     * Prints the string form of the returned message.
     *
     * @param returnedMessage the message handed back to the publisher
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        final String description = returnedMessage.toString();
        System.out.println(OUTPUT_PREFIX + description);
    }
}

  ConfirmCallBackListener

package com.oy;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

/**
 * Confirm callback registered on the RabbitTemplate; prints the outcome of each
 * publisher confirm to standard output.
 */
@Component
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {

    /**
     * Prints the ack flag and the cause reported with the confirm.
     *
     * @param correlationData correlation data passed with the confirm (not used here)
     * @param ack             the ack flag reported with the confirm
     * @param cause           the cause text reported with the confirm; may be {@code null}
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        final StringBuilder line = new StringBuilder("ConfirmCallBackListener ack: ");
        line.append(ack);
        line.append(", cause: ");
        line.append(cause);
        System.out.println(line.toString());
    }
}

  ReceiveConfirmTestListener

package com.oy;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

/**
 * Listener for the manually acknowledged queue: prints each delivery and then
 * acknowledges it on the channel it arrived on.
 *
 * <p>Processing and acknowledgement are kept separate. The original acknowledged
 * inside the {@code try} and acknowledged again in the {@code catch}, which
 * reported failed messages as handled and could acknowledge the same delivery tag
 * twice when the first acknowledgement itself failed.
 */
@Component
public class ReceiveConfirmTestListener implements ChannelAwareMessageListener {

    /**
     * Handles one delivery.
     *
     * @param message the delivered message
     * @param channel the channel the message was delivered on; used for ack/nack
     * @throws Exception if the acknowledgement (or negative acknowledgement) fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            System.out.println("消费者接收到了消息:" + message);
        } catch (Exception e) {
            e.printStackTrace();
            // Processing failed: reject this single delivery without requeueing.
            // Like the original ack-on-failure it avoids an endless redelivery loop,
            // but it tells the broker the truth about the outcome.
            channel.basicNack(deliveryTag, false, false);
            return;
        }

        // Manual acknowledgement of this delivery only (multiple = false). A failure
        // here propagates to the caller instead of triggering a second ack.
        channel.basicAck(deliveryTag, false);
    }
}

2、测试消息的持久化    <--返回目录

  持久化三步:创建持久化的交换机;创建持久化的消息;创建持久化的队列;

   依赖

<!-- Plain RabbitMQ Java client (com.rabbitmq.client.*), used directly by
     Sender, Receiver1 and ConnectUtil in this section. -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.5.0</version>
</dependency>

<!-- SLF4J binding for log4j 1.2; configured through log4j.properties. -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>

  log4j.properties

# Root logger: DEBUG level, writing to appender A1.
log4j.rootLogger=DEBUG,A1
# Loggers under the com.oy package: DEBUG level.
log4j.logger.com.oy=DEBUG

# A1 is a console appender using a pattern layout.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
# Line format: timestamp [thread] [logger] - [level] message, then a newline.
log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] - [%p] %m%n

  Sender

package topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;

import java.util.Date;

/**
 * Publishes 100 persistent text messages to a durable topic exchange, one every
 * 200 ms, using routing key {@code key.1.1}.
 */
public class Sender {
    private final static String TEST_EXCHANGE_TOPIC = "test-exchange-topic";

    /**
     * Entry point: declares the exchange, publishes the messages and closes the
     * channel and connection.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, publishing or closing fails,
     *                   or the pause between messages is interrupted
     */
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectUtil.getConnection();
        try {
            Channel channel = connection.createChannel();

            // Declare a durable exchange (third argument) of type "topic".
            channel.exchangeDeclare(TEST_EXCHANGE_TOPIC, "topic", true);

            for (int i = 0; i < 100; i++) {
                System.out.println("i=" + i);
                // PERSISTENT_TEXT_PLAIN marks the message itself as persistent.
                // NOTE(review): getBytes() uses the platform default charset; the
                // consumer has to decode with the same charset.
                byte[] body = ("路由key.1.1消息. i= " + i).getBytes();
                channel.basicPublish(TEST_EXCHANGE_TOPIC, "key.1.1", MessageProperties.PERSISTENT_TEXT_PLAIN, body);
                Thread.sleep(200);
            }

            channel.close();
        } finally {
            // Always release the connection, even when declaring or publishing
            // failed part-way through; the original only closed on success.
            connection.close();
        }
    }
}

  Receiver1

package topic;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Consumes messages from a durable queue bound to the topic exchange, printing
 * each body and acknowledging it manually.
 */
public class Receiver1 {
    private final static String TEST_EXCHANGE_TOPIC = "test-exchange-topic";
    private final static String TOPIC_QUEUE_1 = "topic_queue_1";
    // Topic binding pattern. "#" matches zero or more dot-separated words, so
    // "key.#" covers the sender's "key.1.1"; "key.*" would match exactly one word
    // after "key." and therefore would not.
    private final static String KEY_1 = "key.#";

    /**
     * Entry point: declares the exchange and queue, binds them, and starts a
     * consumer with manual acknowledgement.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectUtil.getConnection();
        final Channel channel = connection.createChannel();

        // Declare the durable topic exchange here as well, with the same arguments
        // the sender uses, so that binding below does not fail when the receiver is
        // started before the sender has ever run.
        channel.exchangeDeclare(TEST_EXCHANGE_TOPIC, "topic", true);

        // Declare a durable queue (second argument).
        channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null);

        // Bind the queue to the exchange.
        channel.queueBind(TOPIC_QUEUE_1, TEST_EXCHANGE_TOPIC, KEY_1);

        // Ask the broker not to deliver a new message until the current one has
        // been acknowledged.
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // NOTE(review): decodes with the platform default charset; this has
                // to match the charset the sender encoded with.
                System.out.println(new String(body));
                // Acknowledge this delivery. The second argument is "multiple":
                // false acknowledges only this delivery tag, true would acknowledge
                // every delivery up to and including it.
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // Second argument false: no automatic acknowledgement; the consumer acks manually.
        channel.basicConsume(TOPIC_QUEUE_1, false, consumer);
    }
}

  ConnectUtil

package topic;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Creates connections to the RabbitMQ broker used by the examples.
 *
 * <p>NOTE(review): broker address and credentials are hard-coded; move them to
 * external configuration before using this outside a local test setup.
 */
public class ConnectUtil {
    private static final String BROKER_HOST = "192.168.213.200";
    private static final int BROKER_PORT = 5672;
    private static final String BROKER_USER = "oy";
    private static final String BROKER_PASSWORD = "123456";
    private static final String BROKER_VIRTUAL_HOST = "/test";

    /**
     * Opens a new connection using the fixed broker settings above.
     *
     * @return a newly opened connection
     * @throws Exception if the connection cannot be established
     */
    public static Connection getConnection() throws Exception {
        final ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost(BROKER_VIRTUAL_HOST);
        connectionFactory.setUsername(BROKER_USER);
        connectionFactory.setPassword(BROKER_PASSWORD);
        connectionFactory.setHost(BROKER_HOST);
        connectionFactory.setPort(BROKER_PORT);
        return connectionFactory.newConnection();
    }
}

   重启 RabbitMQ 服务后,队列里面仍然是 100 条消息(前提:先运行一次 Receiver1 完成队列的声明与绑定后将其停止,再运行 Sender 发送消息;否则消息发布时没有已绑定的队列,会被直接丢弃)。

---

原文地址:https://www.cnblogs.com/xy-ouyang/p/14466643.html