Spring Cloud:Stream基础知识

背景

  消息中间件有多种,rabbitmq,rocketmq,activemq,kafka等。
  不同的消息中间件具体细节不一样。那么有没有一种新的技术诞生,让我们不再关注具体MQ细节,我们只需要用一种适配绑定的方式,自动给我们在各种MQ内切换。
  一句话:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。Spring Cloud Stream 因此诞生。
  官方定义 Spring Cloud Stream是一个构建消息驱动微服务的框架。
  应用程序使用inputs或者outputs来与springcloud stream中binder交互。
  通过我们配置来bingding(绑定),而stream的binder对象负责与消息中间件交互。
  所以,我们只需要搞清楚如何与springcloud stream交互就可以方便使用消息驱动方式。
  SpringCloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用发布-订阅、消费组、分区三个核心概念。
  目前仅支持RabbitMQ和Kafka。
Stream处理架构:

通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件之间的隔离。
stream标准流程套路:

Binder:很方便的连接中间件,屏蔽差异。
channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过channel对队列进行配置。
source和sink:简单理解为消息的输入输出。
编码API和注解:

Stream消息生产者

pom依赖

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

yml配置

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      bindings:
        output:
          destination: studyExchange
          content-type: application/json
          binder: defaultRabbit
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.10.132
                port: 5672
                username: guest
                password: guest

eureka:
  client:
    #是否将自己注册到Eureka Server 默认为true
    register-with-eureka: true
    #是否从EurekaServer抓取已有的注册信息,默认为true,单节点无所谓,集群必须设置true才能配合ribbon做负载均衡
    fetch-registry: true
    service-url:
      #设置eureka server交互的地址查询服务和注册服务都需要依赖这个地址
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2
    lease-expiration-duration-in-seconds: 5
    instance-id: send-8801.com
    prefer-ip-address: true

发送消息的接口:

public interface IMessageProvider {
    public String send();
}

发送消息的实现:

@EnableBinding(Source.class)//定义消息的推送管道
public class MessageProvider implements IMessageProvider {

    @Autowired
    private MessageChannel output;

    @Override
    public String send() {
        String serial = IdUtil.simpleUUID();
        System.out.println(serial+"============");
        output.send(MessageBuilder.withPayload(serial).build());
        return serial;
    }
}

controller

@RestController
public class SendController {

    @Autowired
    private IMessageProvider messageProvider;

    @GetMapping("sendMessage")
    public String sendMessage(){
        return messageProvider.send();
    }
}

调用接口,观察效果

Stream消息消费者

配置yml

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      bindings:
        input:
          destination: studyExchange
          content-type: application/json
          binder: defaultRabbit
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.10.132
                port: 5672
                username: guest
                password: guest

eureka:
  client:
    #是否将自己注册到Eureka Server 默认为true
    register-with-eureka: true
    #是否从EurekaServer抓取已有的注册信息,默认为true,单节点无所谓,集群必须设置true才能配合ribbon做负载均衡
    fetch-registry: true
    service-url:
      #设置eureka server交互的地址查询服务和注册服务都需要依赖这个地址
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2
    lease-expiration-duration-in-seconds: 5
    instance-id: receive-8802.com
    prefer-ip-address: true

接收消息:

@EnableBinding(Sink.class)
@RestController
public class ReceiveMessageListenerController {

    @Value("${server.port}")
    private String serverPort;

    @StreamListener(value = Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("消息:"+message.getPayload()+"serverPort:"+serverPort);
    }

}

stream重复消费

下默认配置中,多个消费者存在,会存在重复消费问题
原因:默认分组group是不同的,组流水号不一样,被认为是不同组,可以消费,所以要自定义配置分组。

yml配置:

  cloud:
    stream:
      bindings:
        input:
          destination: studyExchange
          content-type: application/json
          binder: defaultRabbit
          group: wen.jie

通过配置后,两个消费者被分配到一组,就不存在重复消费的问题。

原文地址:https://www.cnblogs.com/wwjj4811/p/13628099.html