SpringCloud Stream 使用

第一步 添加依赖:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

第二步 定义接口:

package com.example.order.message;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * Declares the message channels used by the order service.
 *
 * <p>Two input/output pairs are declared. Each constant is the channel name
 * that the corresponding {@code @Input}/{@code @Output} method is bound under.
 *
 * @Title: StreamClient
 * @ProjectName order
 * @date 2019/11/20 15:06
 */
public interface StreamClient {

    // ---- first channel pair ----

    /** Name of the first inbound channel. */
    String INPUT = "input";

    /** Name of the first outbound channel. */
    String OUTPUT = "output";

    /** @return the subscribable channel registered under {@link #INPUT} */
    @Input(INPUT)
    SubscribableChannel input();

    /** @return the message channel registered under {@link #OUTPUT} */
    @Output(OUTPUT)
    MessageChannel output();

    // ---- second channel pair ----

    /** Name of the second inbound channel. */
    String INPUT2 = "input2";

    /** Name of the second outbound channel. */
    String OUTPUT2 = "output2";

    /** @return the subscribable channel registered under {@link #INPUT2} */
    @Input(INPUT2)
    SubscribableChannel input2();

    /** @return the message channel registered under {@link #OUTPUT2} */
    @Output(OUTPUT2)
    MessageChannel output2();
}

  

第三步 定义接收方:

package com.example.order.message;

import com.example.order.dto.OrderDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

/**
 * Message receiver: listens on the channels declared in {@link StreamClient}.
 *
 * @Title: StreamReceiver
 * @ProjectName order
 * @date 2019/11/20 15:09
 */
@Component
@EnableBinding(StreamClient.class)  // bind the channels declared on the StreamClient interface
@Slf4j
public class StreamReceiver {
    // Alternative listener for plain string payloads (kept for reference):
    /*@StreamListener(value = StreamClient.INPUT)
    public void process(Object message) {
        log.info("StreamReceiver: {}", message);
    }*/

    /**
     * Receives an order object and sends back an acknowledgement.
     *
     * <p>The return value is published through the {@code output2} binding rather
     * than written directly to the {@code input2} channel. Both bindings point at
     * the same destination in the configuration, so the reply goes through the
     * message broker and is then delivered to {@link #process2(String)}.
     *
     * @param message the order payload received on {@link StreamClient#INPUT}
     * @return the acknowledgement text {@code "ok"}
     */
    @StreamListener(value = StreamClient.INPUT)
    @SendTo(value = StreamClient.OUTPUT2)
    public String process(OrderDTO message) {
        log.info("StreamReceiver: {}", message);
        return "ok";
    }

    /**
     * Receives the acknowledgement produced by {@link #process(OrderDTO)}.
     *
     * @param message the acknowledgement text received on {@link StreamClient#INPUT2}
     */
    @StreamListener(value = StreamClient.INPUT2)
    public void process2(String message) {
        log.info("INPUT2, {}", message);
    }

}

第四步 定义发送方:

package com.example.order.controller;

import com.example.order.dto.OrderDTO;
import com.example.order.message.StreamClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * REST controller that publishes a sample message through {@link StreamClient}.
 *
 * @Title: SendMessageController
 * @ProjectName order
 * @date 2019/11/20 15:11
 */
@RestController
public class SendMessageController {

    /** Channel facade injected by Spring. */
    @Autowired
    private StreamClient streamClient;

    // Alternative endpoint that sends a plain string (kept for reference):
    /*@GetMapping("/sendMessage")
    public void sendMessage() {
        streamClient.output().send(MessageBuilder.withPayload("now " + new Date()).build());
    }*/

    /**
     * Builds an {@link OrderDTO} with a fixed order id and sends it on the
     * {@code output} channel.
     */
    @GetMapping("/sendMessage")
    public void sendMessage() {
        OrderDTO payload = new OrderDTO();
        payload.setOrderId("123123123");

        streamClient
                .output()
                .send(MessageBuilder.withPayload(payload).build());
    }

}

第五步 配置yml文件

spring:
  application:
    name: ORDER
  cloud:
    config:
      discovery:
        enabled: true
        service-id: CONFIG
      profile: dev
    stream:
      bindings:
        # First channel pair: both bindings share the destination "mymessage".
        input:
          destination: mymessage # destination (exchange) on the broker
          group: order  # consumer group
        output:
          destination: mymessage
          # "group" is a consumer-only property, so it is not set on output bindings.
          content-type: application/json  # serialize payloads as JSON
        # Second channel pair: both bindings share the destination "mymessage2".
        input2:
          destination: mymessage2 # destination (exchange) on the broker
          group: order  # consumer group
        output2:
          destination: mymessage2
          content-type: application/json  # serialize payloads as JSON
eureka:
  client:
    service-url:
      defaultZone: http://10.10.10.103:8761/eureka/

 

原文地址:https://www.cnblogs.com/412013cl/p/11898513.html