03.Spring Boot 整合 RabbitMQ

1.引入依赖

 <!-- Spring Boot AMQP starter used for the RabbitMQ integration described in this note -->
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

2.配置连接信息及需要自动创建的消息队列

spring:
  rabbitmq:
    host: 47.113.120.XX
    port: 5672
    username: XXXX
    password: XXXX
    virtual-host: XXX

# RabbitMQ bootstrap topology.
# Each list entry names an exchange, its type, the queues to declare,
# and the binding key used to bind every listed queue to that exchange.
rabbit-init:
  list:
    - exchange: "cs.user.topic"
      queues:
        - user.permission
      bindingKey: '#.permission'
      type: topic

3.配置类

/**
 * Holder for the {@code rabbit-init} configuration section.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}.
 */
@Data
@ConfigurationProperties(prefix = "rabbit-init")
public class RabbitMQInitProperty {

    /** Entries bound from {@code rabbit-init.list}; starts out as an empty list. */
    private List<RabbitEntity> list = new ArrayList<>();
}

4.RabbitMQConfig类

@Configuration
@Component
@Slf4j
public class RabbitMQConfig implements RabbitListenerConfigurer {

    /**
     * 回调函数: confirm确认
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
            if(!ack){
                //可以进行日志记录、异常处理、补偿处理等
                System.err.println("异常处理...."+cause);
            }else {
                //更新数据库,可靠性投递机制
            }
        }
    };
    /**
     * 回调函数: return返回
     */
    public final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText,
                                    String exchange, String routingKey) {
            System.err.println("return exchange: " + exchange + ", routingKey: "
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };
    /**
     * rabbitmq 初始配置
     */
    @Autowired
    private RabbitMQInitProperty property ;
    /**
     *
     */
    @Autowired
    private ConnectionFactory connectionFactory;
    /**
     * 增加rabbitTemplate回调函数
     */
    @Bean
    public RabbitTemplate rabbitTemplate(){

        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        return rabbitTemplate;
    }

    /**
     *
     * @return
     */
    @Bean
    public RabbitAdmin rabbitAdmin(){
        return new RabbitAdmin(rabbitTemplate());
    }

    /**
     * 初始化消息队列
     * @param rabbitAdmin
     * @return
     */
    @Bean
    public RabbitMQInitProperty getRabbitMQProperty(RabbitAdmin rabbitAdmin){
        List<RabbitEntity> list = property.getList();
        if(StringUtils.isEmpty(list)) {
            return null ;
        }
        list.stream().forEach(entity -> {
            List<String> queues = entity.getQueues();
            String binding = entity.getBindingKey();
            String exchange = entity.getExchange();
            String type = !StringUtils.isEmpty(entity.getType())? entity.getType() : ExchangeTypes.DIRECT;
            if(StringUtils.isEmpty(queues) || StringUtils.isEmpty(binding)
                        || StringUtils.isEmpty(exchange)
                        || StringUtils.isEmpty(type)){
               return;
            }
            Exchange exchangeTempt= new ExchangeBuilder(exchange, type).durable(true).build();
            rabbitAdmin.declareExchange(exchangeTempt);
            for(String str : queues){
                Queue queue = QueueBuilder.durable(str).build();
                rabbitAdmin.declareQueue(queue);
                Binding bind = BindingBuilder.bind(queue).to(exchangeTempt).with(binding).noargs();
                rabbitAdmin.declareBinding(bind);
            }
        });
        return this.property ;
    }

    /**
     *  对象数据格式化
     * @return
     */
    @Bean
    public MessageConverter messagetConverter() {
        MessageConverter converter = new Jackson2JsonMessageConverter();
        return converter;
    }


    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }

}

5.封装 RabbitMQ 消息发送

/**
 * Wrapper around {@link RabbitTemplate} that attaches a freshly generated
 * {@link CorrelationData} to every publish and logs the send parameters.
 *
 * <p>NOTE(review): {@code @Component} assumes this class is not already registered
 * through a {@code @Bean} method elsewhere - confirm before merging.
 */
@Component
@Slf4j
public class RabbitSender {

    /** Template used for publishing; injected by Spring. */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Builds correlation data whose id is a random UUID with the dashes removed.
     *
     * @return new correlation data for a single publish
     */
    private CorrelationData getCorrelation() {
        return new CorrelationData(UUID.randomUUID().toString().replace("-", ""));
    }

    /**
     * Logs and publishes a message with new correlation data.
     *
     * @param exchange   target exchange name
     * @param routingKey routing key for the publish
     * @param message    payload handed to the template's converter
     */
    public void convertAndSend(String exchange, String routingKey, Object message) {
        CorrelationData correlation = getCorrelation();
        // Pass the objects themselves to the logger: calling toString() on the payload
        // here would throw a NullPointerException for a null message.
        log.info("correlation:{},exchange:{},routekey:{},params:{}", correlation, exchange,
                routingKey, message);
        rabbitTemplate.convertAndSend(exchange, routingKey, message, correlation);
    }

    /**
     * Publishes the exchange, route key and params carried by {@code entity}.
     *
     * @param entity send request; must not be {@code null}
     */
    public void convertAndSend(RabbitSenderEntity entity) {
        convertAndSend(entity.getExchange(), entity.getRouteKey(), entity.getParams());
    }
}



6.测试使用

 /**
  * Publishes the given user as JSON to the {@code cs.user.topic} exchange with
  * routing key {@code user.permission}.
  *
  * @param user request body to publish; must not be {@code null}
  * @return a success result, or a failure result carrying the exception message
  */
 @RequestMapping("/setUserPermission")
    public ResultObj setUserPermission(@RequestBody UserInfo user){
        try {
            // Two-argument form: the single-argument overload is deprecated in Spring's Assert.
            // NOTE(review): assumes Assert is org.springframework.util.Assert - confirm.
            Assert.notNull(user, "user must not be null");
            RabbitSenderEntity entity = RabbitSenderEntity.builder()
                                              .exchange("cs.user.topic")
                                              .routeKey("user.permission")
                                              .params(JsonMapperUtil.toString(user)).build();
            sender.convertAndSend(entity);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is kept in the log.
            log.error("setUserPermission failed: {}", e.getMessage(), e);
            return  ResultObj.failObj(e.getMessage());
        }
        return ResultObj.successObj("权限设置成功");
    }
 /**
  * Consumes messages from the {@code user.permission} queue: reads the body as a
  * {@code UserInfo} and updates it by id.
  *
  * <p>The message is acknowledged in {@code finally}, so it is acked whether or not
  * processing succeeded; a failed message is therefore not redelivered.
  *
  * <p>NOTE(review): the explicit {@code basicAck} presumes the listener container runs
  * in manual acknowledge mode; that setting is not visible here - confirm.
  *
  * @param message the raw AMQP message
  * @param channel the channel the message arrived on, used for the acknowledgement
  * @throws IOException if the acknowledgement itself fails
  */
 @RabbitListener(queues="user.permission")
    public void setUserPermission(Message message, Channel channel) throws IOException {
        try {
            UserInfo user = RabbitUtil.getMessageBody(message, UserInfo.class);
            userInfoService.updateById(user);
        }  catch (IOException e) {
            // Trailing throwable argument keeps the stack trace in the log.
            log.error("消费方法{},爆出错误信息:{}","setUserPermission",e.getMessage(), e);
        } finally {
            // Ack this delivery only; multiple=true would also ack every earlier
            // unacknowledged delivery on the channel.
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }


原文地址:https://www.cnblogs.com/perferect/p/13129804.html