SpringCloud Stream消息驱动

一、概述

1、简介

​ Spring Cloud Stream是一个框架,用于构建与共享消息系统连接的高度可扩展的事件驱动型微服务。该框架提供了一个灵活的编程模型,该模型建立在已经建立并熟悉的Spring习惯用法和最佳实践的基础上,包括对持久性pub / sub语义,使用者组和有状态分区的支持。可以屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。官方文档地址:https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.3.RELEASE/reference/html/

2、设计思想

​ 通过定义绑定器作为中间件,实现应用程序与消息中间件细节之间的隔离,遵循发布/订阅模式。架构如下:

说明:

组成 说明
Middleware 中间件,目前只支持RibbitMQ和Kafaka
Binder Binder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。
@Input 注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列的消息接收
@EnableBinding 指信道channel和exchange绑定在一起

二、实现步骤

1、生产者配置

  • 新建一个模块,在pom中添加以下依赖;

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--热部署-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    
  • 配置application.yml文件

    server:
      port: 8801
    
    spring:
      application:
        name: cloud-stream-provider
      cloud:
        stream:
          binders: #配置需要绑定的rabbitmq 的服务信息
            defaultRabbit: #表示定义的名称,用于与binding整合
              type: rabbit #消息组件类型
              environment: #设置rabbitmq的相关环境
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
          bindings: #服务的整合处理
            output: #通道名称
              destination: studyExchange #要使用的Exchange名称定义
              content-type: application/json #设置消息类型
              binder: defaultRabbit #设置要绑定的消息服务的具体设置
    
    eureka:
      client:
        service-url:
          defaultZone: http://localhost:7001/eureka
      instance:
        lease-renewal-interval-in-seconds: 2 #设置心跳的间隔时间(默认30秒)
        lease-expiration-duration-in-seconds: 5 #间隔时间
        instance-id: sned-8801.com #信息列表显示主机名称
        prefer-ip-address: true #显示ip地址
    
  • 定义一个消息推送类;

    @EnableBinding(Source.class)//定义消息的推送管道
    public class MessageProviderImpl implements IMessageProvider {
    
        @Resource
        private MessageChannel output;//消息发送管道
        public String send() {
            String serial = UUID.randomUUID().toString();
            output.send(MessageBuilder.withPayload(serial).build());
            System.out.println("************"+serial);
            return serial;
        }
    }
    
  • 控制类调用上面的消息推送类;

    @Resource
    private IMessageProvider messageProvider;
    
    @GetMapping("/sendMessage")
    public String sendMessage(){
        return messageProvider.send();
    }
    

2、消费者配置

  • 新建一个模块,在pom中添加依赖(同生产者一样);

  • 配置application.yml文件,只需修改 output 为 input;

    server:
      port: 8802
    
    spring:
      application:
        name: cloud-stream-consumer
      cloud:
        stream:
          binders: #配置需要绑定的rabbitmq 的服务信息
            defaultRabbit: #表示定义的名称,用于与binding整合
              type: rabbit #消息组件类型
              environment: #设置rabbitmq的相关环境
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
          bindings: #服务的整合处理
            input: #通道名称
              destination: studyExchange #要使用的Exchange名称定义
              content-type: application/json #设置消息类型
              binder: defaultRabbit #设置要绑定的消息服务的具体设置
    
    eureka:
      client:
        service-url:
          defaultZone: http://localhost:7001/eureka
      instance:
        lease-renewal-interval-in-seconds: 2 #设置心跳的间隔时间(默认30秒)
        lease-expiration-duration-in-seconds: 5 #间隔时间
        instance-id: receive-8802.com #信息列表显示主机名称
        prefer-ip-address: true #显示ip地址
    
  • 消费者业务类编写,需要开启绑定,定义消息接收管道 @EnableBinding(Sink.class),使用@StreamListener(Sink.INPUT) 监听队列,用于消费者的队列的消息接收。

    @Component
    @EnableBinding(Sink.class)
    public class ReceiveMessageListenerController {
    
        @Value("${server.port}")
        private String serverPort;
    
        @StreamListener(Sink.INPUT)
        public void input(Message<String> message){
            System.out.println("消费者1号,--------------接收到的消息:"+message.getPayload()+"	 port"+serverPort);
        }
    }
    

3、分组消费与持久化

​ 在集群服务中,一个消息可能会被消费多次,将其放入同一个group中,就能够保证消息只会被其中一个应用消费一次。不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。只需要自定义分组就可以完成分组与持久化,在客户端加上配置 group。

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      bindings: #服务的整合处理
        input: #通道名称
          destination: studyExchange #要使用的Exchange名称定义
          content-type: application/json #设置消息类型
          binder: defaultRabbit #设置要绑定的消息服务的具体设置
          group: groupA #自定义分组

案例代码地址:https://github.com/xhanglog/springcloud-learning

原文地址:https://www.cnblogs.com/Mhang/p/12587539.html