背景
消息中间件有多种,rabbitmq,rocketmq,activemq,kafka等。
不同的消息中间件具体细节不一样。那么有没有一种新的技术诞生,让我们不再关注具体MQ细节,我们只需要用一种适配绑定的方式,自动给我们在各种MQ内切换。
一句话:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。Spring Cloud Stream 因此诞生。
官方定义 Spring Cloud Stream是一个构建消息驱动微服务的框架。
应用程序使用inputs或者outputs来与springcloud stream中binder交互。
通过我们配置来bingding(绑定),而stream的binder对象负责与消息中间件交互。
所以,我们只需要搞清楚如何与springcloud stream交互就可以方便使用消息驱动方式。
SpringCloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用发布-订阅、消费组、分区三个核心概念。
目前仅支持RabbitMQ和Kafka。
Stream处理架构:
通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件之间的隔离。
stream标准流程套路:
Binder:很方便的连接中间件,屏蔽差异。
channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过channel对队列进行配置。
source和sink:简单理解为消息的输入输出。
编码API和注解:
Stream消息生产者
pom依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
yml配置
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
bindings:
output:
destination: studyExchange
content-type: application/json
binder: defaultRabbit
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 192.168.10.132
port: 5672
username: guest
password: guest
eureka:
client:
#是否将自己注册到Eureka Server 默认为true
register-with-eureka: true
#是否从EurekaServer抓取已有的注册信息,默认为true,单节点无所谓,集群必须设置true才能配合ribbon做负载均衡
fetch-registry: true
service-url:
#设置eureka server交互的地址查询服务和注册服务都需要依赖这个地址
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2
lease-expiration-duration-in-seconds: 5
instance-id: send-8801.com
prefer-ip-address: true
发送消息的接口:
public interface IMessageProvider {
public String send();
}
发送消息的实现:
@EnableBinding(Source.class)//定义消息的推送管道
public class MessageProvider implements IMessageProvider {
@Autowired
private MessageChannel output;
@Override
public String send() {
String serial = IdUtil.simpleUUID();
System.out.println(serial+"============");
output.send(MessageBuilder.withPayload(serial).build());
return serial;
}
}
controller
@RestController
public class SendController {
@Autowired
private IMessageProvider messageProvider;
@GetMapping("sendMessage")
public String sendMessage(){
return messageProvider.send();
}
}
调用接口,观察效果
Stream消息消费者
配置yml
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
bindings:
input:
destination: studyExchange
content-type: application/json
binder: defaultRabbit
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 192.168.10.132
port: 5672
username: guest
password: guest
eureka:
client:
#是否将自己注册到Eureka Server 默认为true
register-with-eureka: true
#是否从EurekaServer抓取已有的注册信息,默认为true,单节点无所谓,集群必须设置true才能配合ribbon做负载均衡
fetch-registry: true
service-url:
#设置eureka server交互的地址查询服务和注册服务都需要依赖这个地址
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2
lease-expiration-duration-in-seconds: 5
instance-id: receive-8802.com
prefer-ip-address: true
接收消息:
@EnableBinding(Sink.class)
@RestController
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(value = Sink.INPUT)
public void input(Message<String> message){
System.out.println("消息:"+message.getPayload()+"serverPort:"+serverPort);
}
}
stream重复消费
下默认配置中,多个消费者存在,会存在重复消费问题
原因:默认分组group是不同的,组流水号不一样,被认为是不同组,可以消费,所以要自定义配置分组。
yml配置:
cloud:
stream:
bindings:
input:
destination: studyExchange
content-type: application/json
binder: defaultRabbit
group: wen.jie
通过配置后,两个消费者被分配到一组,就不存在重复消费的问题。