SpringCloud学习(七):stream 消息驱动

菜鸟学渣接触spring cloud 系列...

公司也上微服务了,再不学习下就凉了,所以来踩坑吧...

版本:

  spring-boot:  2.0

  spring-cloud: Finchley.SR1

已有项目:

  [eureka-server]              # 注册中心   port 8761

  [eureka-client-one]       #  微服务1    port 8501

  [eureka-client-two]       #  微服务2    port 8502

  [eureka-client-turbine] #  断路监控   port 8503

  [eureka-client-zuul]      #  网关服务   port 8601

  [eureka-client-sleuth]      #  链路追踪 port 8602

能上图绝不BB

  

  spring-cloud-stream 支持RabbitMQ、Kafka 组件的消息系统,这里选RabbitMQ

  大致这样理解: 微服务ABCD(吃货)不断发(吃披萨的)消息到RabbitMQ(饿了吗),微服务F(卖披萨的)一直监听着RabbitMQ,收到ABCD的消息后,立马打包披萨送到ABCD家里,地址从消息里来的。

零、 安装RabbitMQ-server端

  这里使用stream-rabbitmq基于rabbitMQ实现,需要先在电脑安装RabbitMQ-server

  Windows 下安装RabbitMQ:

    1. 安装erlang 语言环境

    2. 安装RabbitMQ-server

    3. 启用plugin

一、stream 消息产生和接收

  这里就把产生和接收放到一个微服务里面了,不分开写了

  新建 [eureka-client-stream]  

  引入依赖   spring-cloud-stream、spring-cloud-starter-stream-rabbit

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.renzku</groupId>
    <artifactId>eureka-client-stream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>eureka-client-stream</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spring-cloud.version>Finchley.SR1</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId> <!-- or '*-stream-kafka' -->
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>
View Code

  配置文件   application.yml

server:
  port: 8603

spring:
  application:
    name: eureka-client-stream
  cloud:
    stream:
      bindings:
        input:        # 接收
          destination:  eureka-client-stream-des    # exchange名称
        output:     # 产生
          destination:  eureka-client-stream-des    # input和output一致便可沟通
  rabbitmq:   # rabbitMQ-server 信息
    host: localhost 
    port: 5672
    username: guest
    password: guest

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/

  启动类   EurekaClientStreamApplication.java

@SpringBootApplication
public class EurekaClientStreamApplication {

    public static void main(String[] args) {

        SpringApplication.run(EurekaClientStreamApplication.class, args);
    }
}

  产生消息的类   HelloStreamSource.java

@EnableBinding(Source.class)
public class HelloStreamSource {

    @Resource
    private MessageChannel output;

    public void sendTestData() {
        String s = "source msg";
        this.output.send(MessageBuilder.withPayload(s).build());  // 发出消息
    }
}

  接收消息的类   HelloStreamSink.java

@EnableBinding(Sink.class)
public class HelloStreamSink {

    @StreamListener(Sink.INPUT)
    public void input(String s){

        System.out.println("input:" + s);
    }
}

  Rest服务  HelloStream.java

@RestController
public class HelloStream {

    @Autowired
    private HelloStreamSource helloStreamSource;

    @RequestMapping("/stream")
    public String HelloStream(){
        // 发出消息
        helloStreamSource.sendTestData();
        return "hello stream";
    }
}

  目录结构

  

  启动 [eureka-client-stream]

  访问  http://localhost:8603/stream, rabbitMQ中Exchanges添加了 eureka-client-stream-des 

  

  控制台也打印了接收到的信息:

  

二、自定义消息通道

  新建通道接口类  CustomProcessor.java

/**
 * 自定义消息通道
 */
public interface CustomProcessor {
    String INPUT = "customInput";
    String OUTPUT = "customOutput";

    @Input(CustomProcessor.INPUT)
    MessageChannel customInput();

    @Output(CustomProcessor.OUTPUT)
    MessageChannel customOutput();
}

  配置文件  application.yml

server:
  port: 8603

spring:
  application:
    name: eureka-client-stream
  cloud:
    stream:
      bindings:
        input:        # 接收
          destination:  eureka-client-stream-des    # exchange名称
        output:     # 产生
          destination:  eureka-client-stream-des    # input和output一致便可沟通
        customInput:
          destination:  eureka-client-stream-cust-des
        customOutput:
          destination:  eureka-client-stream-cust-des  # 如果用上面output值,那上面input也会接收到这里发的消息
  rabbitmq:   # rabbitMQ-server 信息
    host: localhost 
    port: 5672
    username: guest
    password: guest

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/

  新建消息产生类  HelloStreamCustomProcessorSource.java

@EnableBinding(CustomProcessor.class)
public class HelloStreamCustomProcessorSource {

    @Resource
    private MessageChannel customOutput;

    public void sendTestData() {
        String s = "custom source msg";
        this.customOutput.send(MessageBuilder.withPayload(s).build());
    }
}

  新建消息接收类   HelloStreamCustomProcessorSink.java

@EnableBinding(CustomProcessor.class)
public class HelloStreamCustomProcessorSink {

    @StreamListener(CustomProcessor.INPUT)
    public void input(String s){

        System.out.println("custom input:" + s);
    }
}

  Rest服务   HelloStream.java

@RestController
public class HelloStream {

    @Autowired
    private HelloStreamSource helloStreamSource;

    @Autowired
    private HelloStreamCustomProcessorSource helloStreamCustomProcessorSource;

    @RequestMapping("/stream")
    public String HelloStream(){
        helloStreamSource.sendTestData();
        return "hello stream";
    }

    @RequestMapping("/stream/cust")
    public String HelloStreamCust(){
        helloStreamCustomProcessorSource.sendTestData();
        return "hello stream cust";
    }
}

  启动并访问  http://localhost:8603/stream/cust , rabbitMQ中Exchanges添加了 eureka-client-stream-cust-des

  

  控制台也打印了新通道接收到的信息:

  

这个可以用来和spring cloud config 结合,更新微服务配置信息

spring-cloud-bus.jar!orgspringframworkcloudusSpringCloudBusClient.java  了解一下

原文地址:https://www.cnblogs.com/renzku/p/9615449.html