Spring Cloud Stream

Spring Cloud Stream is one of the Spring Cloud components. It is a framework for building message-driven capabilities into microservice applications.

1. Add the dependency

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

  

2. Define the interface

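import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface StreamClient {

    String INPUT = "input";
    String OUTPUT = "output";

    // Channel that messages are consumed from
    @Input(INPUT)
    SubscribableChannel input();

    // Channel that messages are published to
    @Output(OUTPUT)
    MessageChannel output();
}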

  

  

3. Define the message receiver

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(StreamClient.class) // bind the channels declared in StreamClient
@Slf4j
public class StreamReceiver {

    // Handles messages from the input channel and forwards the return value to the output channel
    @StreamListener(StreamClient.INPUT)
    @SendTo(StreamClient.OUTPUT)
    public Object processInput(String message) {
        log.info("Input StreamReceiver:{}", message);
        return message;
    }

    // Handles messages from the output channel
    @StreamListener(StreamClient.OUTPUT)
    public void processOutPut(String message) {
        log.info("Output StreamReceiver:{}", message);
    }
}

  

4. Define the message sender

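import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SendMessageController {

    @Autowired
    private StreamClient streamClient;

    // Publishes a plain string to the output channel
    @GetMapping("/sendMessage")
    public void process() {
        String msg = "hello world";
        streamClient.output().send(MessageBuilder.withPayload(msg).build());
    }
}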

  

5. Result

6. Send an object

1) Sender

    /**
     * Send an object
     */
    @GetMapping("/sendMessage2")
    public void process2() {
        OrderDTO orderDTO = new OrderDTO();
        orderDTO.setOrderId("123");
        streamClient.output().send(MessageBuilder.withPayload(orderDTO).build());
    }

  

2) Receiver

    @StreamListener(StreamClient.OUTPUT)
    public void processOutPut(OrderDTO message) {
        log.info("Output StreamReceiver:{}", message.toString());
    }
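
The OrderDTO class used above is not shown in this post. A minimal sketch follows, assuming it carries nothing but the orderId field that the sender sets. The field list, the Serializable marker and the toString() format are all assumptions for illustration. A no-argument constructor with a getter and setter is included because the binder usually serializes object payloads as JSON, and JSON mappers generally need them to rebuild the object on the receiving side.

```java
import java.io.Serializable;

// Hypothetical DTO: only orderId is known from the sender code above,
// everything else here is an assumption made for illustration.
// Declare it public in its own file when other packages need it.
class OrderDTO implements Serializable {

    private static final long serialVersionUID = 1L;

    private String orderId;

    // No-argument constructor so a JSON mapper can instantiate the class
    public OrderDTO() {
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    // Readable form for the receiver's log statement
    @Override
    public String toString() {
        return "OrderDTO{orderId='" + orderId + "'}";
    }
}
```

With this toString(), the receiver's log line for the message sent by /sendMessage2 would read Output StreamReceiver:OrderDTO{orderId='123'}.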

  

7. Reply after a message is received, using @SendTo

    @StreamListener(StreamClient.INPUT)
    @SendTo(StreamClient.OUTPUT) // after INPUT receives a message, send the return value on to OUTPUT
    public Object processInput(String message) {
        log.info("Input StreamReceiver:{}", message);
        return message;
    }
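
To make the flow easier to picture, here is a plain-Java model of what the @StreamListener plus @SendTo pair does: the listener's return value becomes a new message on the output channel. This is only a conceptual sketch. Channel, listenAndSendTo and SendToSketch are hypothetical names, no broker is involved, and it is not how Spring Cloud Stream is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Conceptual model only: these classes are hypothetical, not Spring Cloud Stream APIs.
class SendToSketch {

    // A tiny in-memory channel: every subscriber sees every message
    static class Channel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void send(String message) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }
    }

    // Mimics @StreamListener(INPUT) + @SendTo(OUTPUT):
    // the handler's return value is sent to the output channel
    static void listenAndSendTo(Channel input, Channel output, Function<String, String> handler) {
        input.subscribe(message -> output.send(handler.apply(message)));
    }

    // Wires both listeners, sends one message to input, and returns the log lines in order
    static List<String> run(String payload) {
        List<String> log = new ArrayList<>();
        Channel input = new Channel();
        Channel output = new Channel();

        listenAndSendTo(input, output, message -> {
            log.add("Input StreamReceiver:" + message);
            return message;
        });
        output.subscribe(message -> log.add("Output StreamReceiver:" + message));

        input.send(payload);
        return log;
    }

    public static void main(String[] args) {
        run("hello world").forEach(System.out::println);
    }
}
```

Running main prints the input line first and the output line second, which is the order you would expect from the annotated listeners when a message arrives on the input channel.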

  

Original article: https://www.cnblogs.com/linlf03/p/10374915.html