spring-cloud-stream-rabbit的一个topic对应多组消费者实例

一、概述

   “Spring Cloud Stream is a framework for building message-driven microservice applications.”这是来自官方文档对spring cloud sream的介绍,大致可以理解为Spring Cloud Stream 是一个构建消息驱动微服务的框架,该项目用于代理消息队列的集成过程。避免业务与具体的mq产品有深刻的绑定关系,易于后期的服务切换。

二、课题

  如何通过spring-cloud-starter-stream-rabbit实现一条消息,可以被多个群组同时消费?

三、过程

1、创建消息生产者项目

  •  pom引用
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>
  • spring-cloud-stream的生产队列配置
spring:
  rabbitmq:
    addresses: 192.168.0.114:5672
    username: simm
    password: test
  cloud:
    stream:
      default-binder: rabbit
      bindings:
        # 生产-消费者 指向同一个topic
        bizunit-install-producter:
          destination: yysc-bizunit-install
#          group: bizunit-queue
          consumer:
            concurrency: 5
  • yysc-bizunit-install消息的生产,即rabbit的topic-exchange
 1 /**
 2  * 生产者通道
 3  * @author miscr
 4  */
 5 public interface InstallCallbackOutputChannel {
 6     /**
 7      * 定义通道的名称
 8      */
 9     String OUTPUT = "bizunit-install-producter";
10 
11     /**
12      * 定义为输入通道
13      * @return
14      */
15     @Output(OUTPUT)
16     MessageChannel output();
17 }
18 
19 
20 /**
21  * 生产消息
22  * @author miscr
23  */
24 @EnableBinding(InstallCallbackOutputChannel.class)
25 public class InstallCallbackSender {
26     @Bean
27     @InboundChannelAdapter(value = InstallCallbackOutputChannel.OUTPUT,poller = @Poller(fixedDelay = "2000"))
28     public MessageSource<Date> timerMessagaSource(){
29         return ()->new GenericMessage<>(new Date());
30     }
31 }

2、创建两个消费者群组,群组分别命名为 main 和 template,各设置5个消费线程

  •  application.yml中配置消费者相关信息
spring:
  rabbitmq:
    addresses: 192.168.0.114:5672
    username: simm
    password: test
  cloud:
    stream:
      default-binder: rabbit
      bindings:
        install-consumer:
          destination: yysc-bizunit-install
          group: template
          consumer:
            concurrency: 5
        bizunit-install-consumer:
          destination: yysc-bizunit-install
          group: main
          consumer:
            concurrency: 5
  • 消费者 install-consumer 的绑定源码示例
/**
 * 安装状态回调接收通道
 * @author miscr
 */
public interface InstallCallbackInputChannel {
    /**
     * 定义通道的名称
     */
    String INPUT = "install-consumer";

    /**
     * 定义为输入通道
     * @return
     */
    @Input(INPUT)
    SubscribableChannel input();
}

/**
 * 消费者服务
 *
 * @author miscr
 */
@EnableBinding(InstallCallbackInputChannel.class)
public class InstallCallbackReceiver {
    /**
     * 消息监听
     *
     * @param message
     */
    @StreamListener(InstallCallbackInputChannel.INPUT)
    private void receiver(Object message) {
        System.out.println("template" + message.toString());
    }
}
  • 消费者 bizunit-install-consumer 的绑定源码示例
/**
 * 安装状态回调接收通道
 * @author miscr
 */
public interface BizUnitInstallCallbackInputChannel {
    /**
     * 定义通道的名称
     */
    String INPUT = "bizunit-install-consumer";

    /**
     * 定义为输入通道
     * @return
     */
    @Input(INPUT)
    SubscribableChannel input();
}

/**
 * 消费者服务
 *
 * @author miscr
 */
@EnableBinding(BizUnitInstallCallbackInputChannel.class)
public class BizUnitInstallCallbackReceiver {
    /**
     * 消息监听
     *
     * @param message
     */
    @StreamListener(BizUnitInstallCallbackInputChannel.INPUT)
    private void receiver(Object message) {
        System.out.println("bizunit-install" + message.toString());
    }
}

四、启动后监看mq的队列生产与消费情况

  • 生产者项目与消费者项目启动后,查看mq的控制台,结果如下

 

  • 查看java控制台的消费日志,两个群组确实都在消费同一条消息

原文地址:https://www.cnblogs.com/MrSi/p/14214141.html