第十一章 SpringCloud之stream

####################发送消息(producer)#####################

1、在pom.xml文件中添加依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<!--            <version>2.0.1.RELEASE</version>-->
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

2、创建发送消息管道

package com.test.eurekaclientcomsumerstream.config;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;

/**
 * Declares the message channel used for sending (publishing) messages.
 */
public interface SendMessageInterface {

    /** Name under which the output channel is declared. */
    String CHANNEL_NAME = "mymsg";

    /**
     * Output channel used for sending messages.
     *
     * @return the channel declared under the name {@code "mymsg"}
     */
    @Output(CHANNEL_NAME)
    SubscribableChannel sendMsg();
}

3、启动类添加注解

package com.test.eurekaclientcomsumerstream;

import com.test.eurekaclientcomsumerstream.config.SendMessageInterface;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;

/**
 * Entry point of the stream producer application.
 * Enables binding for {@link SendMessageInterface}.
 */
@EnableBinding({SendMessageInterface.class}) // Spring Cloud Stream producer
@EnableEurekaClient
@SpringBootApplication
public class EurekaClientComsumerStreamProducerApplication {

    /**
     * Starts the Spring application with the given command-line arguments.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        final SpringApplication application =
                new SpringApplication(EurekaClientComsumerStreamProducerApplication.class);
        application.run(args);
    }
}

4、使用消息管道发送消息

package com.test.eurekaclientcomsumerstream.controller;

import com.test.eurekaclientcomsumerstream.config.SendMessageInterface;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * REST controller that publishes a test message through the channel declared by
 * {@link SendMessageInterface}.
 */
@RestController
public class SendMsgController {
    @Autowired
    private SendMessageInterface sendMessageInterface;

    /**
     * Generates a random UUID string, prints it, and sends it as a byte payload over the
     * channel returned by {@link SendMessageInterface#sendMsg()}.
     *
     * @return the literal {@code "success"} after the send call has returned
     */
    @RequestMapping("/sendMsg")
    public String sendMsg() {
        String msg = UUID.randomUUID().toString();
        System.out.println("生产者发送内容msg:" + msg);
        // Encode explicitly as UTF-8 so the payload bytes never depend on the JVM's
        // platform default charset; the typed Message<byte[]> avoids a raw-type warning.
        Message<byte[]> build = MessageBuilder.withPayload(msg.getBytes(StandardCharsets.UTF_8)).build();
        // NOTE(review): the boolean result of send() is ignored, as in the original.
        sendMessageInterface.sendMsg().send(build);
        return "success";
    }

}

5、application.yml文件添加配置

server:
  port: 7016
user:
  httpAddress: http://eureka-provider/getUser/  #使用虚拟主机名

spring:
  application:
    name: eureka-client-stream-producer
  thymeleaf:
    suffix:
  rabbitmq:
    publisher-returns: true
    host: 132.232.44.82
    port: 5672
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 1
        max-concurrency: 1
        retry:
          enabled: true
  cloud:
    stream:
      bindings:
        mymsg:
          destination: test
          group: stream


eureka:
  instance:
    hostname: localhost
    prefer-ip-address: true
    instance-id: ${spring.application.name}:${spring.application.instance_id:${server.port}}
  client:
    serviceUrl:
      defaultZone: http://${eureka.instance.hostname}:8761/eureka

#  cloud:
#    stream:
#      bindings:
#        mymsg: ###指定 管道名称
#          #指定该应用实例属于 stream 消费组
#          group: stream
#  spring.rabbitmq.publisher-returns=true
#  #采用手动应答
#  #spring.rabbitmq.listener.simple.acknowledge-mode=manual
#  #指定最小的消费者数量
#  spring.rabbitmq.listener.simple.concurrency=1
#  #指定最大的消费者数量
#  spring.rabbitmq.listener.simple.max-concurrency=1
#  #是否支持重试
#  spring.rabbitmq.listener.simple.retry.enabled=true 

####################接收消息(consumer)#####################

1、在pom.xml文件中添加依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <!--            <version>2.0.1.RELEASE</version>-->
        </dependency>

2、创建接收消息管道

package com.test.eurekaclientcomsumerstreamconsumer.config;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * Declares the message channel used for receiving messages.
 */
public interface ReadMsgInterface {

    /** Name under which the input channel is declared. */
    String CHANNEL_NAME = "mymsg";

    /**
     * Input channel from which messages are read.
     *
     * @return the channel declared under the name {@code "mymsg"}
     */
    @Input(CHANNEL_NAME)
    SubscribableChannel redMsg();
}

3、在启动类添加注解

package com.test.eurekaclientcomsumerstreamconsumer;

import com.test.eurekaclientcomsumerstreamconsumer.config.ReadMsgInterface;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;

/**
 * Entry point of the stream consumer application.
 * Enables binding for {@link ReadMsgInterface}.
 */
@EnableBinding({ReadMsgInterface.class}) // Spring Cloud Stream consumer
@EnableEurekaClient
@SpringBootApplication
public class EurekaClientComsumerStreamConsumerApplication {

    /**
     * Starts the Spring application with the given command-line arguments.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        final SpringApplication application =
                new SpringApplication(EurekaClientComsumerStreamConsumerApplication.class);
        application.run(args);
    }
}

4、使用管道接收消息

package com.test.eurekaclientcomsumerstreamconsumer.controller;

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

/**
 * Component that listens on the {@code "mymsg"} channel and prints each message it receives.
 */
@Component
public class ReadMsgController {

    /**
     * Handles one incoming message by writing it to standard output.
     *
     * @param msg the message body
     */
    @StreamListener("mymsg")
    public void listener(String msg) {
        final String line = String.format("消费者获取生产消息-----1111:%s", msg);
        System.out.println(line);
    }
}

5、在application.yml文件添加配置

server:
  port: 7018
user:
  httpAddress: http://eureka-provider/getUser/  #使用虚拟主机名

spring:
  application:
    name: eureka-client-stream-consumer
  thymeleaf:
    suffix:
  rabbitmq:
    publisher-returns: true
    host: 132.232.44.82
    port: 5672
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 1
        max-concurrency: 1 #最大并发消费者(线程)数量;防止多实例重复消费依靠的是下面 bindings 中相同的 group:同一 group 内每条消息只会被其中一个实例消费
        retry:
          enabled: true
  cloud:
    stream:
      bindings:
        mymsg:
          destination: test #目的地
          group: stream #群组

eureka:
  instance:
    hostname: localhost
    prefer-ip-address: true
    instance-id: ${spring.application.name}:${spring.application.instance_id:${server.port}}
  client:
    serviceUrl:
      defaultZone: http://${eureka.instance.hostname}:8761/eureka

#  cloud:
#    stream:
#      bindings:
#        mymsg: ###指定 管道名称
#          #指定该应用实例属于 stream 消费组
#          group: stream
#  spring.rabbitmq.publisher-returns=true
#  #采用手动应答
#  #spring.rabbitmq.listener.simple.acknowledge-mode=manual
#  #指定最小的消费者数量
#  spring.rabbitmq.listener.simple.concurrency=1
#  #指定最大的消费者数量
#  spring.rabbitmq.listener.simple.max-concurrency=1
#  #是否支持重试
#  spring.rabbitmq.listener.simple.retry.enabled=true 

详细代码github地址:https://github.com/812406210/springCloud.git

参考博客:https://www.jianshu.com/p/404fc32122d1

          https://www.cnblogs.com/a1304908180/p/10684818.html

原文地址:https://www.cnblogs.com/ywjfx/p/12100307.html