spring cloud stream集成rabbitmq

pom添加依赖

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

application.yml配置

# Spring 相关
spring:
# rabbitmq
  rabbitmq:
    host: 192.168.3.107
    port: 5672
    username: admin
    password: 123456

定义输入通道

package com.zh.common.api.rabbitmq;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * @Auther: suruozhong
 * @Date: 2019/9/17 15:45
 * @Description:
 */
public interface OrderChannel {
    //定义通道的名称
    String saveOrder = "saveOrder";
    //定义为输入通道
    @Input(saveOrder)
    SubscribableChannel saveOrder();

}

定义输出通道

package com.zh.common.api.rabbitmq;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * @Auther: suruozhong
 * @Date: 2019/9/17 15:51
 * @Description:
 */
public interface OrderOutputChannel {

    //定义通道的名称
    String saveOrder = "saveOrder";
    //定义为输入通道
    @Output(saveOrder)
    MessageChannel saveOrder();

}

生产端

在对应的模块绑定输入通道

/**
 * @Auther: suruozhong
 * @Date: 2019/7/24 15:51
 * @Description:
 */
@SpringCloudApplication
@EnableAutoConfiguration(exclude = {
        SecurityAutoConfiguration.class
})
//通过绑定器对OderChannel通道进行绑定
@EnableBinding(OrderOutputChannel.class)
public class HousekeeperFrontApplication {

    public static void main(String[] args) {
        SpringApplication.run(HousekeeperFrontApplication.class,args);
    }
}

发送消息

@RestController
@RequestMapping("/sysBanner")
public class SysBannerController {

  @Resource
  private OrderOutputChannel orderOutputChannel;


  @RequestMapping(value = "/list")
  public void listData(String position) {
    orderOutputChannel.saveOrder().send(MessageBuilder.withPayload("fff").build());
  }

}

消费端

在对应的模块绑定输出通道

/**
 * @author 
 * @date 2018年06月21日
 * 用户统一管理系统
 */
@SpringCloudApplication
@EnableScheduling //开启定时任务
//通过绑定器对OderChannel通道进行绑定
@EnableBinding(OrderChannel.class)
public class HousekeeperAdminApplication {
    public static void main(String[] args) {
        SpringApplication.run(HousekeeperAdminApplication.class, args);
    }

}

绑定监听消息

@Service("sysBannerService")
@Transactional
public class SysBannerServiceImpl extends ServiceImpl<SysBannerMapper, SysBanner> implements SysBannerService {

    //对saveOrder的消息监听处理
    @StreamListener(OrderChannel.saveOrder)
    private void receiver(Object message){
        System.out.println(message.toString());
    }
}

分组加持久化配置

在生产端的application.yml

spring:
  cloud:   
#    spring cloud strem
    stream:
      bindings:
        saveOrder: 输出通道名称
          group: saveOrder  分组名称
原文地址:https://www.cnblogs.com/suruozhong/p/11539992.html