第一步 引入依赖:
<!-- Spring Cloud Stream starter for RabbitMQ. No <version> is declared here;
     presumably it is managed by the project's parent/BOM (confirm). -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
第二步 定义接口:
package com.example.order.message;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * Declares the Spring Cloud Stream channels of the order project: two inbound
 * ({@code @Input}) and two outbound ({@code @Output}) channels, each identified
 * by the channel-name constant defined here.
 *
 * @Title: StreamClient
 * @ProjectName order
 * @date 2019/11/20 15:06
 */
public interface StreamClient {

    // ---- channel names -------------------------------------------------

    /** Name of the first inbound channel. */
    String INPUT = "input";

    /** Name of the second inbound channel. */
    String INPUT2 = "input2";

    /** Name of the first outbound channel. */
    String OUTPUT = "output";

    /** Name of the second outbound channel. */
    String OUTPUT2 = "output2";

    // ---- inbound channels ----------------------------------------------

    /**
     * @return the subscribable channel registered under {@link #INPUT}
     */
    @Input(INPUT)
    SubscribableChannel input();

    /**
     * @return the subscribable channel registered under {@link #INPUT2}
     */
    @Input(INPUT2)
    SubscribableChannel input2();

    // ---- outbound channels ---------------------------------------------

    /**
     * @return the message channel registered under {@link #OUTPUT}
     */
    @Output(OUTPUT)
    MessageChannel output();

    /**
     * @return the message channel registered under {@link #OUTPUT2}
     */
    @Output(OUTPUT2)
    MessageChannel output2();
}
第三步 定义接收方:
package com.example.order.message; import com.example.order.dto.OrderDTO; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Component; /** * 消息接收类 * @Title: StreamReceiver * @ProjectName order * @date 2019/11/2015:09 */ @Component @EnableBinding(StreamClient.class) // 定义的接口类 @Slf4j public class StreamReceiver { // 接收字符串 /*@StreamListener(value = StreamClient.INPUT) public void process(Object message) { log.info("StreamReceiver: {}", message); }*/ /** * 接收对象, 接收成功后在回应一下 * @param message */ @StreamListener(value = StreamClient.INPUT) @SendTo(value = StreamClient.INPUT2) public String process(OrderDTO message) { log.info("StreamReceiver: {}", message); return "ok"; } // 回应接收 @StreamListener(value = StreamClient.INPUT2) public void process2(String message) { log.info("INPUT2, {}", message); } }
第四步 定义发送方:
package com.example.order.controller; import com.example.order.dto.OrderDTO; import com.example.order.message.StreamClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; /** * @Title: SendMessageController * @ProjectName order * @date 2019/11/2015:11 */ @RestController public class SendMessageController { @Autowired private StreamClient streamClient; // 发送字符串 /*@GetMapping("/sendMessage") public void sendMessage() { streamClient.output().send(MessageBuilder.withPayload("now " + new Date()).build()); }*/ /** * 发送对象 */ @GetMapping("/sendMessage") public void sendMessage() { OrderDTO orderDTO = new OrderDTO(); orderDTO.setOrderId("123123123"); streamClient.output().send(MessageBuilder.withPayload(orderDTO).build()); } }
第五步 配置yml文件:
spring:
  application:
    name: ORDER
  cloud:
    config:
      discovery:
        enabled: true
        service-id: CONFIG
      profile: dev
    stream:
      bindings:
        input:
          destination: mymessage           # destination (mq) to consume from
          group: order                     # consumer group
        output:
          destination: mymessage
          # NOTE(review): `group` is documented as a consumer-side property;
          # confirm whether it is needed on output bindings.
          group: order                     # group
          content-type: application/json   # payload content type is JSON
        input2:
          destination: mymessage2          # destination (mq) to consume from
          group: order                     # consumer group
        output2:
          destination: mymessage2
          group: order                     # group
          content-type: application/json   # payload content type is JSON
eureka:
  client:
    service-url:
      defaultZone: http://10.10.10.103:8761/eureka/