spring cloud Stream消息驱动

spring cloud Stream

Cloud Stream 是什么?

 屏蔽底层消息中间件的差异,降低切换成本,统一消息模型。

通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

Stream中的通信方式遵循了发布-订阅模式。Topic主题进行广播,在RabbitMQ就是Exchange,在Kafka中就是Topic。

官网:https://spring.io/projects/spring-cloud-stream

目前只支持RabbitMQApache Kafka

版本对应:

       Spring Cloud Stream应用程序由与中间件无关的核心组成。该应用程序通过在外部代理暴露的目标与代码中的输入/输出参数之间建立绑定来与外界通信。建立绑定所需的特定于代理的详细信息由特定于中间件的Binder实现处理

API与常用注解:

Middleware: 中间件目前只支持RabbitMQ和Kafka

Binder:是应用与消息中间件的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应kafka的topic和rabbitMq的exchange),这些都可通过配置文件来实现。

@Input: 注解标识输入通道,通过输入通道接受到的消息进入应用程序。

@Output: 注解标识输出通道,发布的消息通过该通道离开应用程序。

@StreamListener: 监听队列,用于消费者的消息接受。

@EnableBinding: 指通过channel和exchange绑定在一起。

使用:

1:消息的生产者:

maven依赖:

<!-- spring-cloud-starter-stream-rabbit -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

Yml配置:

server:
  port: 8801


spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 表示定义的名称,用于binding的整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关环境设置
             spring:
                rabbitmq:
                   host: localhost
                   port: 5672
                   username: guest
                   password: guest
      bindings: # 服务的整合
        output: # 这个名字是一个通道的名称
          destination: studyExchange #表示要使用的Exchange名称定义
          content-type: application/json # 设置消息的类型,本次为json,文本设置:"text/plain"
          binder: defaultRabbit # 设置要绑定消息服务的具体设置

eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    # 设置心跳的时间间隔(默认是30秒)
    lease-renewal-interval-in-seconds: 2
    #  Eureka服务端在收到最后一次心跳后的等待时间上限,单位为秒(默认是90秒),超时将剔除服务
    lease-expiration-duration-in-seconds: 5
    # 在信息列表显示的主机名称
    instance-id: send-8801.com
    # 访问的路径变成IP地址
    prefer-ip-address: true

定义消息的发送接口:

public interface IMessageProvider {
    public String sendMessage();
}

定义消息的实现:

/ 定义消息的推送管道
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {

    @Resource
    private MessageChannel output;

    @Override
    public String sendMessage() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("******serial: " + serial);
        return null;
    }
}

定义controller:

@RestController
public class SendMessageController {

    @Resource
    private IMessageProvider messageProvider;

    @GetMapping("sendMessage")
    public String sendMessage(){
        return messageProvider.sendMessage();
    }
}

启动rabbitmq:

2: 消息驱动之消费者:

Pom依赖与上面的消息生产者一样

Yml: 唯一不一样的是生产者是bindings.output,消费者是:bindings.input

server:
  port: 8802


spring:
  application:
    name: cloud-stream-rabbitmq-consumer
  cloud:
      stream:
        binders: # 此处配置要绑定的rabbitmq的服务信息
          defaultRabbit: # 表示定义的名称,用于binding的整合
            type: rabbit # 消息组件类型
            environment: # 设置rabbitmq的相关环境设置
               spring:
                  rabbitmq:
                     host: localhost
                     port: 5672
                     username: guest
                     password: guest
        bindings: # 服务的整合
          input: # 这个名字是一个通道的名称
            destination: studyExchange #表示要使用的Exchange名称定义
            content-type: application/json # 设置消息的类型,本次为json,文本设置:"text/plain"
            binder: defaultRabbit # 设置要绑定消息服务的具体设置

eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    # 设置心跳的时间间隔(默认是30秒)
    lease-renewal-interval-in-seconds: 2
    #  Eureka服务端在收到最后一次心跳后的等待时间上限,单位为秒(默认是90秒),超时将剔除服务
    lease-expiration-duration-in-seconds: 5
    # 在信息列表显示的主机名称
    instance-id: receive-8802.com
    # 访问的路径变成IP地址
    prefer-ip-address: true

定义消息接受类,接受消息:

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {

    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("我是消费者一号,---》接受到的消息:" + message.getPayload()+ "	  port:" + serverPort);
    }
    
}

消息被重复消费的问题(默认分组是不同的):

注意在Stream中处于同一个Group中的多个消费者是竞争关系,就能保证消息只会被其中的一个应用消费一次。

不同组的消息是可以被全面消费的(重复消费)

 同一个组内会发生竞争关系,只有其中一个可以消费。

解决:

自定义配置分成一个组:

server:
  port: 8803


spring:
  application:
    name: cloud-stream-rabbitmq-consumer
  cloud:
      stream:
        binders: # 此处配置要绑定的rabbitmq的服务信息
          defaultRabbit: # 表示定义的名称,用于binding的整合
            type: rabbit # 消息组件类型
            environment: # 设置rabbitmq的相关环境设置
               spring:
                  rabbitmq:
                     host: localhost
                     port: 5672
                     username: guest
                     password: guest
        bindings: # 服务的整合
          input: # 这个名字是一个通道的名称
            destination: studyExchange #表示要使用的Exchange名称定义
            content-type: application/json # 设置消息的类型,本次为json,文本设置:"text/plain"
            binder: defaultRabbit # 设置要绑定消息服务的具体设置
            # 配置消息分组,如果配置的消息接受者(消费者)都配置在同一个组下,多个消费者不会重复消费,否者分组名字不一样,会导致重复消费,不配置的情况下,默认在不同的分组
            group: myGroup

eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    # 设置心跳的时间间隔(默认是30秒)
    lease-renewal-interval-in-seconds: 2
    #  Eureka服务端在收到最后一次心跳后的等待时间上限,单位为秒(默认是90秒),超时将剔除服务
    lease-expiration-duration-in-seconds: 5
    # 在信息列表显示的主机名称
    instance-id: receive-8803.com
    # 访问的路径变成IP地址
    prefer-ip-address: true

必须使用group属性,否则的话,当消费者挂掉后,再重启消费者消息可能被丢弃,导致之前生产者发送的消息,无法被消费

原文地址:https://www.cnblogs.com/dw3306/p/12730480.html