Springboot2.x整合RabbitMQ

1、RabbitMQ介绍

可参照RabbitMQ笔记

2、接入配置

pom依赖

<!--amqp依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件

server.port=8080

spring.application.name=springboot-rabbitmq
spring.rabbitmq.host=192.168.242.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启发送确认
spring.rabbitmq.publisher-confirms=true
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true
# 开启ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

3、一对一模式

  即一个生产者对一个消费者模式

配置类

@Configuration
public class RabbitMqConfig {

    @Bean
    public Queue kinsonQueue() {
        return new Queue("kinson");
    }

}

消费者

@Component
//监听队列kinson
@RabbitListener(queues = {"kinson"})
public class MyReceiver1 {

    @RabbitHandler
    public void receiver(String msg) {
        System.out.println("MyReceiver1 :" + msg);
    }
}

消息生产者测试接口

    /**
     * 单条消息发送给单个队列,该队列只有一个消费者
     *
     * @return
     */
    @GetMapping(value = "send")
    public String send() {
        String content = "Date:" + System.currentTimeMillis();
        //发送默认交换机对应的的队列kinson
        amqpTemplate.convertAndSend("kinson", content);
        return content;
    }

4、一对多模式

  即一个生产者对多个消费者,该模式下可以是一个生产者将消息投递到一个队列,该队列对应多个消费者,此时每条消息只会被消费一次,多个消费者循环处理。另外也可以是一个生产者将消息投递到多个队列里,此时消息是被复制处理。

模式一:

配置类

@Configuration
public class RabbitMqConfig {

    @Bean
    public Queue kinsonQueue() {
        return new Queue("kinson");
    }

}

消费者1

@Component
//监听队列kinson
@RabbitListener(queues = {"kinson"})
public class MyReceiver1 {

    @RabbitHandler
    public void receiver(String msg) {
        System.out.println("MyReceiver1 :" + msg);
    }
}

消费者2

@Component
//监听队列kinson
@RabbitListener(queues = {"kinson"})
public class MyReceiver2 {

    @RabbitHandler
    public void receiver(String msg) {
        System.out.println("MyReceiver2 :" + msg);
    }
}

消息生产者测试接口

    /**
     * 发送多条消息到一个队列,该队列有多个消费者
     *
     * @return
     */
    @GetMapping(value = "sendMore")
    public String sendMore() {
        List<String> result = new ArrayList<String>();
        //发送10条数据
        for (int i = 0; i < 10; i++) {
            String content = "第" + (i + 1) + "次发送 Date:" + System.currentTimeMillis();
            //发送默认交换机对应的的队列kinson,此时有两个消费者MyReceiver1和MyReceiver2,每条消息只会被消费一次
            amqpTemplate.convertAndSend("kinson", content);
            result.add(content);
        }
        return String.join("<br/>", result);
    }

模式二:

配置类

@Configuration
public class RabbitMqConfig {

    @Bean
    public Queue kinsonQueue() {
        return new Queue("kinson");
    }

    @Bean
    public Queue kinsonQueue2() {
        return new Queue("kinson2");
    }
}

kinson队列消费者

@Component
//监听队列kinson
@RabbitListener(queues = {"kinson"})
public class MyReceiver1 {

    @RabbitHandler
    public void receiver(String msg) {
        System.out.println("MyReceiver1 :" + msg);
    }
}

kinson2队列消费者

@Component
//监听队列kinson2
@RabbitListener(queues = {"kinson2"})
public class MyReceiver3 {

    @RabbitHandler
    public void receiver(String msg) {
        System.out.println("MyReceiver3 :" + msg);
    }
}

消息生产者测试接口

  /**
     * 发送多条消息到多个队列
     *
     * @return
     */
    @GetMapping(value = "sendMoreQueue")
    public String sendMoreQueue() {
        List<String> result = new ArrayList<String>();
        //发送10条数据
        for (int i = 0; i < 10; i++) {
            String content = "第" + (i + 1) + "次发送 Date:" + System.currentTimeMillis();
            //发送默认交换机对应的的队列kinson
            amqpTemplate.convertAndSend("kinson", content);
            //发送默认交换机对应的的队列kinson2
            amqpTemplate.convertAndSend("kinson2", content);
            result.add(content);
        }
        return String.join("<br/>", result);
    }

相应测试结果请自测

5、ACK消息确认

配置文件加入相应配置

# 开启发送确认
spring.rabbitmq.publisher-confirms=true
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true
# 开启ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

配置类,使用Fanout类型的Exchange,主要是设置队列,交换机及绑定

@Configuration
public class RabbitMqFanoutACKConfig {

    @Bean
    public Queue ackQueue() {
        return new Queue("ackQueue");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingAckQueue2Exchange(Queue ackQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(ackQueue).to(fanoutExchange);
    }

}

消息发送服务

@Service
public class AckSenderService implements RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        System.out.println("AckSender returnedMessage " + message.toString() + " === " + i + " === " + s1 + " === " + s2);
} /** * 消息发送 */ public void send() { final String content = "现在时间是" + LocalDateTime.now(ZoneId.systemDefault()); //设置返回回调 rabbitTemplate.setReturnCallback(this); //设置确认回调 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { System.out.println("消息发送成功!"); } else { System.out.println("消息发送失败," + cause + correlationData.toString()); } }); rabbitTemplate.convertAndSend("ackQueue", content); } }

消息消费者

@Component
@RabbitListener(queues = {"ackQueue"})
public class MyAckReceiver {

    @RabbitHandler
    public void process(String sendMsg, Channel channel, Message message) {

        System.out.println("AckReceiver  : 收到发送消息 " + sendMsg + ",收到消息时间"
                + LocalDateTime.now(ZoneId.systemDefault()));

        try {
            //告诉服务器收到这条消息已经被当前消费者消费了,可以在队列安全删除,这样后面就不会再重发了,
            //否则消息服务器以为这条消息没处理掉,后续还会再发
            //第二个参数是消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            System.out.println("process success");
        } catch (Exception e) {
            System.out.println("process fail");
            e.printStackTrace();
        }

    }
}

测试访问接口

   /**
     * @return
     */
    @GetMapping(value = "ackSend")
    public String ackSend() {
        senderService.send();

        return "ok";
    }

测试将Consumer确认代码注释掉,即

@Component
@RabbitListener(queues = {"ackQueue"})
public class MyAckReceiver {

    @RabbitHandler
    public void process(String sendMsg, Channel channel, Message message) {

        System.out.println("AckReceiver  : 收到发送消息 " + sendMsg + ",收到消息时间"
                + LocalDateTime.now(ZoneId.systemDefault()));

        try {
            //告诉服务器收到这条消息已经被当前消费者消费了,可以在队列安全删除,这样后面就不会再重发了,
            //否则消息服务器以为这条消息没处理掉,后续还会再发
            //第二个参数是消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
            //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            System.out.println("process success");
        } catch (Exception e) {
            System.out.println("process fail");
            e.printStackTrace();
        }

    }
}

此时访问测试接口,可以看到当消息发送完被消费掉之后,队列的状态变为unacked。

当停掉服务时,unacked状态变为Ready

再重新启动服务时会重新发送消息

6、事务机制

事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
//声明启动事务模式
channel.txSelect();
//提交事务
channel.txComment();
//回滚事务
channel.txRollback();

消息发送示例

public void publish()
            throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost("/");
        factory.setHost(host);
        factory.setPort(port);
        Connection conn = factory.newConnection();
        // 创建信道
        Channel channel = conn.createChannel();
        // 声明队列
        channel.queueDeclare(TX_QUEUE, true, false, false, null);

        try {

            long startTime = System.currentTimeMillis();

            for (int i = 0; i < 10; i++) {
                // 声明事务
                channel.txSelect();
                String message = String.format("时间 => %s", System.currentTimeMillis());
                // 发送消息
                channel.basicPublish("", TX_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes("UTF-8"));
                // 提交事务
                channel.txCommit();
            }

            long endTime = System.currentTimeMillis();

            System.out.println("事务模式,发送10条数据,执行花费时间:" + (endTime - startTime) + "s");

        } catch (Exception e) {
            channel.txRollback();
        } finally {
            channel.close();
            conn.close();
        }
    }

消息消费示例

public void consume() throws IOException, TimeoutException, InterruptedException {

        Connection conn = RabbitMqConnFactoryUtil.getRabbitConn();
        Channel channel = conn.createChannel();
        channel.queueDeclare(TX_QUEUE, true, false, false, null);
        // 声明事务
        channel.txSelect();
        try {
            //单条消息获取进行消费
            GetResponse resp = channel.basicGet(TX_QUEUE, false);
            String message = new String(resp.getBody(), "UTF-8");
            System.out.println("收到消息:" + message);
            //消息拒绝
            // channel.basicReject(resp.getEnvelope().getDeliveryTag(), true);
            // 消息确认
            channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);
            // 提交事务
            channel.txCommit();
        } catch (Exception e) {
            // 回滚事务
            channel.txRollback();
        } finally {
            //关闭通道、连接
            channel.close();
            conn.close();
        }
    }

7、Confirm消息确认

Confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的,Confirm的三种实现方式:
//方式一:普通发送方确认模式
channel.waitForConfirms();
//方式二:批量确认模式
channel.waitForConfirmsOrDie();
//方式三:异步监听发送方确认模式
channel.addConfirmListener();

消息发布示例

public void publish() throws IOException, TimeoutException, InterruptedException {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost("/");
        factory.setHost(host);
        factory.setPort(port);
        Connection conn = factory.newConnection();
        // 创建信道
        Channel channel = conn.createChannel();
        // 声明队列
        channel.queueDeclare(CONFIRM_QUEUE, false, false, false, null);

        long startTime = System.currentTimeMillis();

        for (int i = 0; i < 10; i++) {
            // 开启发送方确认模式
            channel.confirmSelect();
            String message = String.format("时间 => %s", System.currentTimeMillis());
            channel.basicPublish("", CONFIRM_QUEUE, null, message.getBytes("UTF-8"));
        }

        //添加确认监听器
        channel.addConfirmListener(new ConfirmListener() {

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("未确认消息,标识:" + deliveryTag);
            }

            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
            }
        });

        long endTime = System.currentTimeMillis();

        System.out.println("执行花费时间:" + (endTime - startTime) + "s");

    }

RabbitMQ简单示例源码参照Github

原文地址:https://www.cnblogs.com/kingsonfu/p/10599608.html