Rabbit 基于cloud 的配置使用结构流程

RabbitMq 基于 cloud 配置使用结构图

RabbitMq 基于 cloud 配置使用流程

1.在application.yml中配置RabbitMq

输出/输入>>>连接配置

spring:
  rabbitmq:
    addresses: 127.0.0.1
    username: rabbitMq
    password: admin

输出>推送使用配置

cloud:
  stream:
    bindings:
      userOutput:
        destination: UserSubscription 
        group: user-service
    binders:  #配置绑定器
      defaultRabbit:
        type: rabbit

输入>监听使用配置

cloud:
  stream:
    bindings:
      input: #内置的获取消息的通道 , 从topcheer-default中获取消息
        destination: topcheer-default
      output: #指定消息发送的目的地,在rabbitMq中,发送到一个topcheer-default的exchange中
        destination: topcheer-default
      
    userInput:
        destination: UserSubscription
        group: user-service
    #          producer:
    #            partition-key-expression: payload  #分区关键字   对象中的id,对象
    #            partition-count: 2  #分区大小
    binders:  #配置绑定器
      defaultRabbit:
        type: rabbit

2.Mq推送、监听自定义通道

输出>推送通道

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * @Author: 马家立
 * @Date: 2020/10/13 11:56
 * @Description: TODO Mq 输出>推送自定义消息通道
 */
public interface MyProcessor {

    // 用户推送消息通道
    String USEROUTPUT="userOutput";
    @Output(USEROUTPUT)
    MessageChannel userOutput();

}
View Code

输入>监听通道

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.MessageChannel;

/**
 * @Author: 马家立
 * @Date: 2020/10/13 11:56
 * @Description: TODO Mq 输入>监听自定义消息通道
 */
public interface MyProcessor {

    // 用户监听消息通道
    String USERINPUT="userInput";
    @Input(USERINPUT)
    MessageChannel userInput();

}
View Code 

3.Mq推送、监听使用

输出>推送使用

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Map;

/**
 * @Author: 马家立
 * @Date: 2020/10/16 11:38
 * @Description: TODO MQ推送
 */
@Component
@EnableBinding(MyProcessor.class)
public class MessageSender {
    /**
    * 用户推送消息通道
    */
    @Resource
    @Qualifier(value=MyProcessor.USEROUTPUT)
    private MessageChannel userOutput;

    /**
    * @Author: 马家立
    * @Date: 2020/10/16 11:38
    * @Description: TODO MQ推送
    */
    public void send(Map<String,Object> map) {
        map.put("id","123");
        map.put("param","张三");
        userOutput.send(MessageBuilder.withPayload(map).build());
    }

}
View Code

输入>监听使用

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

import java.util.Map;
/**
* @Author: 马家立
* @Date: 2020/10/16 11:43
* @Description: TODO 消息监听器
*/
@Component
@EnableBinding(MyProcessor.class)
public class MessageListener {

    /**
    * @Author: 马家立
    * @Date: 2020/10/16 11:43
    * @Description: TODO MQ监听消费
    */
    @StreamListener(MyProcessor.USERINPUT)
    public void userInput(Map<String,Object> map){
        String id = (String) map.get("id");
        String param = (String) map.get("param");
        /**
        * @Description: TODO 以下做消费处理
         * .............................
        */
    }

}
View Code
原文地址:https://www.cnblogs.com/mjtabu/p/13825419.html