Spring Cloud Stream 构建消息驱动微服务使用

Spring Cloud Stream是什么?
Spring Cloud Stream是一个用来为微服务应用构建消息驱动能力的框架。它可以基于Spring Boot来创建独立的、可用于生产的Spring应用程序。它通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动的微服务应用。简单的说,Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。通过使用Spring Cloud Stream,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。由于Spring Cloud Stream基于Spring Boot实现,所以它秉承了Spring Boot的优点,实现了自动化配置的功能帮忙我们可以快速的上手使用,其主要包含以下核心概念和内容:

  • Spring Cloud Stream的应用模型
  • 绑定抽象
  • 持久化发布/订阅支持
  • 消费组
  • 消息分区
  • 可插拔绑定api 应用模型

spring-messaging模块为集成messaging api和消息协议提供支持。

其代码结构为:

 

其中base定义了消息Message(MessageHeader和body)、消息处理MessageHandler、发送消息MessageChannel。

其中:

message由两部分组成,

MessageHandler是一个处理消息的约定,spring messaging提供了丰富的消息处理方式。

MessageChannel表现为pipes-and-filters架构的管道。

 转换器converter模块

    对消息转换提供支持。其结构如下:

从上图可以看出,有消息到string、json、byte数组之间的相互转换。 

handler模块

HandlerMethod封装了一个bean的方法相关信息(getMethod()和getBean()方法),提供了访问方法参数的便利工具。HandlerMethod可以在bean factory中使用createWithResolvedBean获取bean实例时获取该实例。

MessageCondition是一个将conditions映射到message的约定。

HandlerMethodArgumentResolver 是一个解析方法参数到Context中指定Message的参数值的策略接口。

HandlerMethodReturnValueHandler是一个处理从触发一个Message的method Handling返回值的策略接口。

另外,也提供了部分注解:

@interface Header:Annotation which indicates that a method parameter should be bound to a message header.

@interface Headers:Annotation which indicates that a method parameter should be bound to the headers of a message. The annotated parameter must be   assignable to {@link java.util.Map} with String keys and Object values.

@interface MessageExceptionHandler: Annotation for handling exceptions thrown from message-handling methods within a  specific handler class.

@interface MessageMapping:Annotation for mapping a {@link Message} onto message-handling methods by matching to the message destination.

@interface Payload:Annotation that binds a method parameter to the payload of a message. The payload may be passed through a {@link   MessageConverter} to convert it from serialized form with specific MIME type to an Object matching the target method parameter.

@interface SendTo:Annotation that indicates a method's return value should be converted to a {@link Message} and sent to the specified destination.

5.Simp模块

包含诸如STOMP协议的简单消息协议的通用支持。

STOMP,Streaming Text Orientated Message Protocol,是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理(Broker)进行交互,类似于OpenWire(一种二进制协议)。由于其设计简单,很容易开发客户端,因此在多种语言和多种平台上得到广泛应用。其中最流行的STOMP消息代理是Apache ActiveMQ。
详细协议内容中文版本参见。另stomp架构如下:

应用模型
应用通过Spring Cloud Stream插入的input和output通道与外界交流。通道通过指定中间件的Binder实现与外部代理连接。 

Binder

Binder 是 Spring Cloud Stream 的一个抽象概念,是应用与消息中间件之间的粘合剂。目前 Spring Cloud Stream 实现了 Kafka 和 Rabbit MQ 的binder。

通过 binder ,可以很方便的连接中间件,可以动态的改变消息的
destinations(对应于 Kafka 的topic,Rabbit MQ 的 exchanges),这些都可以通过外部配置项来做到。

甚至可以任意的改变中间件的类型而不需要修改一行代码。

Publish-Subscribe

消息的发布(Publish)和订阅(Subscribe)是事件驱动的经典模式。Spring Cloud Stream 的数据交互也是基于这个思想。生产者把消息通过某个 topic 广播出去(Spring Cloud Stream 中的 destinations)。其他的微服务,通过订阅特定 topic 来获取广播出来的消息来触发业务的进行。

这种模式,极大的降低了生产者与消费者之间的耦合。即使有新的应用的引入,也不需要破坏当前系统的整体结构。

Consumer Groups

“Group”,如果使用过 Kafka 的童鞋并不会陌生。Spring Cloud Stream 的这个分组概念的意思基本和 Kafka 一致。

微服务中动态的缩放同一个应用的数量以此来达到更高的处理能力是非常必须的。对于这种情况,同一个事件防止被重复消费,只要把这些应用放置于同一个 “group” 中,就能够保证消息只会被其中一个应用消费一次。

Durability

消息事件的持久化是必不可少的。Spring Cloud Stream 可以动态的选择一个消息队列是持久化,还是 present。

Bindings

bindings 是我们通过配置把应用和spring cloud stream 的 binder 绑定在一起,之后我们只需要修改 binding 的配置来达到动态修改topic、exchange、type等一系列信息而不需要修改一行代码。 

绑定抽象
Binder绑定器是Spring Cloud Stream中一个非常重要的概念。在没有绑定器这个概念的情况下,我们的Spring Boot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,这使得我们实现的消息交互逻辑就会非常笨重,因为对具体的中间件实现细节有太重的依赖,当中间件有较大的变动升级、或是更换中间件的时候,我们就需要付出非常大的代价来实施。
通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。当我们需要升级消息中间件,或是更换其他消息中间件产品时,我们要做的就是更换它们对应的Binder绑定器而不需要修改任何Spring Boot的应用逻辑。这一点在上一章实现消息总线时,从RabbitMQ切换到Kafka的过程中,已经能够让我们体验到这一好处。
目前版本的Spring Cloud Stream为主流的消息中间件产品RabbitMQ和Kafka提供了默认的Binder实现。
持久化发布/订阅支持
应用间通信遵照发布-订阅模型,消息通过共享主题进行广播。下图所示,显示了交互的Spring Cloud Stream 应用的典型布局。

在Spring Cloud Stream中的消息通信方式遵循了发布-订阅模式,当一条消息被投递到消息中间件之后,它会通过共享的Topic主题进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。这里所提到的Topic主题是Spring Cloud Stream中的一个抽象概念,用来代表发布共享消息给消费者的地方。在不同的消息中间件中,Topic可能对应着不同的概念,比如:在RabbitMQ中的它对应了Exchange、而在Kakfa中则对应了Kafka中的Topic。
消费组
由于发布-订阅模型使得共享主题的应用之间连接更简便,创建给定应用的不同实例来进行弹性扩张的能力也同样重要。如果存在多个应用实例,那么同一应用的不同实例便会成为相互竞争的消费者,其中应该只有一个实例处理给定消息。
Spring Cloud Stream通过消费组的概念给这种情况进行建模。每一个单独的消费者可以使用spring.cloud.stream.bindings.input.group属性来指定一个组名字。下图中展示的消费者们,这一属性被设置为spring.cloud.stream.bindings.input.group=hdfsWrite或者spring.cloud.stream.bindings.input.group=average。 

所有订阅给定目标的组都会收到发布消息的一个拷贝,但是每一个组内只有一个成员会收到该消息。默认情况下,当我们没有为应用指定消费组的时候,Spring Cloud Stream会为其分配一个独立的匿名消费组。所以,如果同一主题下所有的应用都没有指定消费组的时候,当有消息被发布之后,所有的应用都会对其进行消费,因为它们各自都属于一个独立的组中。 一般来说,将应用绑定到给定目标的时候,最好指定一个消费,并且每一个通道均需要独立设定一个消费组。 这样可以防止应用实例收到重复的消息。(除非存在重复收到的需求,如刷新所有实例的配置)。
持久性 
与Spring Cloud Stream中的可选应用模型一样,消费者组订阅是持久的。也就是说,一个绑定的实现确保组的订阅者是持久的,一旦组中至少有一个成员创建了订阅,这个组就会收到消息,即使组中所有的应用都被停止了,组仍然会收到消息。 
注:自然情况下,匿名订阅者是非持久化的。对于某些绑定实现(如rabbitmq),可以创建非持久化(non-durable)组订阅。 
消息分区
Spring Cloud Stream支持在给定应用程序的多个实例之间对数据进行分区。在分区方案中,每个topic均可以被构建为多个分区。一个或者多个生产者应用实例给多个消费者应用实例发送消息并确保相同特征的数据被同一消费者实例处理。 
Spring Cloud Stream为分区提供了通用的抽象实现,用来在消息中间件的上层实现分区处理,所以它对于消息中间件自身是否实现了消息分区并不关心,这使得Spring Cloud Stream为不具备分区功能的消息中间件也增加了分区功能扩展。(如具备分区特性的kafka或者不带分区的特性的rabbitmq)。 

分区是有状态处理中的一个关键概念,无论是性能还是一致性的原因,分区都是至关重要的,当生产者将消息数据发送给多个消费者实例时,保证拥有共同特征的消息数据始终是由同一个消费者实例接收和处理。。例如,在时间窗平均计算示例中,来自任何给定传感器的所有测量值都由相同应用程序实例处理是很重要的。
注:要设置分区处理方案,您必须配置数据生成和数据消耗两端。

示例:
1、工程结构说明;
2、构建消费者工程实例;
3、自定义Channel;
4、生产者另一种实现;

工程结构说明
1、主要构建如下两个模块:

其中receiver也就是我们常说的消费者,sender自然就是生产者。。本小节主要例子将集中在receiver工程实现。

2、在父pom.xml中添加如下主要依赖:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
</properties>
 
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.6.RELEASE</version>
</parent>
 
<dependencies>
 
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
 
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>
 
</dependencies>
 
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Dalston.SR5</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

构建消费者工程实例
stream-receiver工程实现如下:

1、在pom.xml中添加如下依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>


其等价使用spring-cloud-stream-binder-rabbit依赖。

2、新建一个启动类,作为一个普通的springboot项目:

package com.cloud.shf.stream;
@SpringBootApplication
public class ReceiverApp {
    public static void main(String[] args) {
        SpringApplication.run(ReceiverApp.class, args);
    }
}


3、SpringCloudStream已经预定义了Sink、Source、Processor,具体如下

{@link org.springframework.cloud.stream.messaging.Processor},
{@link org.springframework.cloud.stream.messaging.Source},
{@link org.springframework.cloud.stream.messaging.Sink}


采用Sink作为默认的消息订阅通道,定义如下:

package com.cloud.shf.stream.sink;
@EnableBinding(value = {Sink.class})
public class SinkReceiver {
    @StreamListener(Sink.INPUT)
    public void receive(Object payload) {
        LOGGER.info("Received from default channel : {}", payload.toString());
    }
}


Note:

将@EnableBinding注解至spring应用的一个配置类中,即可将spring应用变成Spring Cloud Stream应用。
@EnableBinding注解本身就包含@Configuration注解,并且会触发Spring Cloud Stream基本配置;
将Sink.class作为@EnableBinding注解的参数,其指定了需要绑定的目标接口;
@StreamListener注解中描述具体监听的通道名称;

4、在application.properties文件中添加如下rabbitmq配置:

#configure rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=soul
spring.rabbitmq.password=123456

5、通过单元测试,实现消息的生产者:

package com.cloud.shf.stream;
@RunWith(SpringRunner.class)
@EnableBinding(value = {ReceiverAppTest.SinkSender.class})
public class ReceiverAppTest {
    @Autowired
    private SinkSender sinkSender;
 
    @Test
    public void sinkSenderTester() {
        sinkSender.output().send(MessageBuilder.withPayload("produce a message to " + Sink.INPUT + " channel").build());
    }
 
    public interface SinkSender {
        @Output(Sink.INPUT)
        MessageChannel output();
    }
}


Note:
如上生产者定义主要参考org.springframework.cloud.stream.messaging.Source定义:

public interface Source {
   String OUTPUT = "output";
   @Output(Source.OUTPUT)
   MessageChannel output();
}


为了与接收消息的通道一致,故修改@Output注解参数为Sink.INPUT;

6、启动服务,可以看到如下log,声明了一个queue,并添加了订阅者:

7、查看rabbitmq控制台的Queus页签中存在上述队列:
8、并执行单元测试可以看到如下log信息:


小节,以上即实现了一个最简单的demo示例。

自定义Channel
上一小节采用默认预定义的Sink已经实现了消息的消费,并模拟Source定义了消息的生产者,同理,本小节将参考源代码SInk设计,实现自定义Channel的消息处理。

1、首先简单看下Sink的定义:

public interface Sink {
   String INPUT = "input";
   @Input(Sink.INPUT)
   SubscribableChannel input();
}


Note:
其通道名称定义为input;
@Input注解表示输入通道,当前应用将接收消息;对应的@Output注解表示输出通道,当前应用将发送消息;注解参数表示通道的名称,如果不提供名称,则当前注解方法名即作为通道名称;
必须以SubscribableChannel作为返回类型;

2、参考Sink的定义,自定义如下,通道名称为myInput:

package com.cloud.shf.stream.sink;
public interface MySink {
    String CHANNEL = "myInput";
    @Input(MySink.CHANNEL)
    SubscribableChannel input();
}
3、在@EnableBinding注解中加入当前定义的MySink接口:
package com.cloud.shf.stream.sink;
@EnableBinding(value = {Sink.class, MySink.class})
public class SinkReceiver {
    @StreamListener(MySink.CHANNEL)
    public void myReceive(Object payload) {
        LOGGER.info("Received from {} channel : {}", MySink.CHANNEL, payload.toString());
    }
}


Note:
必须在@EnableBinding注解加入新定义的消费消息接口,否则无法被注册;

4、添加单元测试如下:

package com.cloud.shf.stream;
@RunWith(SpringRunner.class)
@EnableBinding(value = {ReceiverAppTest.SinkSender.class,
        ReceiverAppTest.MySinkSender.class})
public class ReceiverAppTest {
    @Autowired
    private MySinkSender mySinkSender;
    @Test
    public void mySinkSenderTester() {
        mySinkSender.output().send(MessageBuilder.withPayload("produce a message to " + MySink.CHANNEL + " channel").build());
    }
    public interface MySinkSender {
        @Output(MySink.CHANNEL)
        MessageChannel output();
    }
}


Note:
必须在@EnableBinding注解加入新定义的生产消息接口,否则无法被注册;
返回类型必须为MessageChannel;

5、启动服务,可以看到如下log:

6、队列中新增消息队列:

7、执行单元测试后,可以看到接收到如下消息:


小节:以上即实现了自定义消息通道,比较简单。。

生产者另一种实现
上述两个小节中的生产者均通过直接注入定义的生产接口获取MessageChannel实例,然后发送消息,其实也可以直接注入MessageChannel实例来完成消息的发送,本小节将实现模拟。

1、在MySink继续添加如下两个通道定义:
package com.cloud.shf.stream.sink;
public interface MySink {
    String OUTPUT1_CHANNEL="OutPut-1";
    String OUTPUT2_CHANNEL="OutPut-2";
 
    @Input(OUTPUT1_CHANNEL)
    SubscribableChannel input1();
 
    @Input(OUTPUT2_CHANNEL)
    SubscribableChannel input2();
}

2、在SinkReceiver添加如下对两个通道的监听实现:

@StreamListener(MySink.OUTPUT1_CHANNEL)
public void myReceive1(Object payload) {
    LOGGER.info("Received from {} channel : {}", MySink.OUTPUT1_CHANNEL, payload.toString());
}
 
@StreamListener(MySink.OUTPUT2_CHANNEL)
public void myReceive2(Object payload) {
    LOGGER.info("Received from {} channel : {}", MySink.OUTPUT2_CHANNEL, payload.toString());
}

3、在单元测试中继续添加新增通道生产逻辑:

@RunWith(SpringRunner.class)
@EnableBinding(value = {ReceiverAppTest.SinkSender.class,
        ReceiverAppTest.MySinkSender.class,
        ReceiverAppTest.MyOutputSource.class})
public class ReceiverAppTest {
    @Resource(name = MySink.OUTPUT1_CHANNEL)
    private MessageChannel send1;
 
    @Resource(name = MySink.OUTPUT2_CHANNEL)
    private MessageChannel send2;
 
    @Test
    public void myOutputSourceTester() {
        send1.send(MessageBuilder.withPayload("produce a message to " + MySink.OUTPUT1_CHANNEL + " channel").build());
        send2.send(MessageBuilder.withPayload("produce a message to " + MySink.OUTPUT2_CHANNEL + " channel").build());
    }
 
    public interface MyOutputSource {
        @Output(MySink.OUTPUT1_CHANNEL)
        MessageChannel output1();
 
        @Output(MySink.OUTPUT2_CHANNEL)
        MessageChannel output2();
    }
}


Note:
直接注入MessageChannel实例,并采用通道名称;

4、执行单元测试:
小节:通过注入生产接口然后获取MessageChannel实例,其实与直接注入MessageChannel是一样的,特别注意直接注入MessageChannel时,必须设定@Resource的name与生产接口定义中@Output中的名称保持一致。

示例二:

获取header信息
通过官方,可以看到有除了@Payload、@Headers之前还有一个@Header注解,其能够获取指定头信息,下面是一个简单的示例:

1、在监听方法中添加@Header注解应用,如下:

@StreamListener(value = MySink.USER_CHANNEL)
public void userReceive(@Payload User user, @Headers Map headers, @Header(name = "name") Object name) {
    LOGGER.info(headers.get("contentType").toString());
    LOGGER.info("name : {}", name.toString());
    LOGGER.info("Received from {} channel username: {}", MySink.USER_CHANNEL, user.getUsername());
}
2、单元测试修改如下,在Header中添加了一个name属性:
@Test
public void myUserSenderTester() {
    User user = new User().setUsername("shuaishuai").setAge(12);
    userSender.userChannelSender().send(MessageBuilder.withPayload(ReflectionToStringBuilder.toString(user, ToStringStyle.JSON_STYLE))
            .setHeader("name", "song").build());
}


3、执行单元测试:


小节:通过上述示例,可以看到@Payload注解内容为消息体,@Headers注解获取所有的Header头信息,@Header注解获取指定name的头信息。

多监听方法调度
在消费实例方,可以定义多个监听方法对同一通道进行监听处理,但其需要在某些指定的条件下方可触发。恰好@StreamListener注解提供了condition参数作为条件设定,其支持SpEL表达式,通过条件约束即可满足我们的需求。具体示例如下:

1、添加如下两个监听方法,其中userReceiveByHeader1方法监听头信息中flag为aa的消息,userReceiveByHeader2方法监听头信息中flag为bb的消息:

/*********************************实现多个监听者应用******************************/
@StreamListener(value = MySink.USER_CHANNEL, condition = "headers['flag']=='aa'")
public void userReceiveByHeader1(@Payload User user) {
    LOGGER.info("Received from {} channel : {} with head (flag:aa)", MySink.USER_CHANNEL, user.getUsername());
}
 
@StreamListener(value = MySink.USER_CHANNEL, condition = "headers['flag']=='bb'")
public void userReceiveByHeader2(@Payload User user) {
    LOGGER.info("Received from {} channel : {} with head (flag:bb)", MySink.USER_CHANNEL, user.getUsername());
}


2、在单元测试消息发送时添加flag头信息,如下值aa:

@Test
public void myUserSenderTester() {
    User user = new User().setUsername("shuaishuai").setAge(12);
    userSender.userChannelSender().send(MessageBuilder.withPayload(ReflectionToStringBuilder.toString(user, ToStringStyle.JSON_STYLE))
            .setHeader("name", "song")
            .setHeader("flag","aa")
            .build());
}


3、执行单元测试打印如下:


4、继续修改单元测试,设置flag值为bb:

@Test
public void myUserSenderTester() {
    User user = new User().setUsername("shuaishuai").setAge(12);
    userSender.userChannelSender().send(MessageBuilder.withPayload(ReflectionToStringBuilder.toString(user, ToStringStyle.JSON_STYLE))
            .setHeader("name", "song")
            .setHeader("flag","bb")
            .build());
}


5、执行单元测试打印如下:

 

参考相关文档

spring cloud stream 文档

spring cloud stream 项目

spring cloud stream 样例

更多详细参照:

https://blog.csdn.net/qq_32734365/article/details/81413218

正因为当初对未来做了太多的憧憬,所以对现在的自己尤其失望。生命中曾经有过的所有灿烂,终究都需要用寂寞来偿还。
原文地址:https://www.cnblogs.com/candlia/p/11919911.html