SpringBoot整合RabbitMQ

在linux安装rabbitmq

 

依赖

依赖

    <dependency> 

       <groupId>org.springframework.boot</groupId> 

       <artifactId>spring-boot-starter-amqp</artifactId> 

    </dependency>

 

 

配置

配置

#rabbitmq

spring.rabbitmq.host=10.110.3.62

spring.rabbitmq.port=5672

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

spring.rabbitmq.virtual-host=/

#消费者

spring.rabbitmq.listener.simple.concurrency= 10

spring.rabbitmq.listener.simple.max-concurrency= 10

#一次取的消费者数量

spring.rabbitmq.listener.simple.prefetch= 1

#消费者自动启动

spring.rabbitmq.listener.simple.auto-startup=true

#消费失败后重新消费

spring.rabbitmq.listener.simple.default-requeue-rejected= true

#发布后重试

spring.rabbitmq.template.retry.enabled=true

spring.rabbitmq.template.retry.initial-interval=1000

spring.rabbitmq.template.retry.max-attempts=3

spring.rabbitmq.template.retry.max-interval=10000

#每隔多久进行重试

spring.rabbitmq.template.retry.multiplier=1.0

 

 

 

提供者

提供者-MQSender

package com.cxl.shop.rabbitmq;

 

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import com.cxl.shop.redis.RedisService;

 

@Service

public class MQSender {

 

    private static Logger log = LoggerFactory.getLogger(MQSender.class);

   

    @Autowired

    AmqpTemplate amqpTemplate ;

   

    public void sendMiaoshaMessage(MiaoshaMessage mm) {

       String msg = RedisService.beanToString(mm);

       log.info("send message:"+msg);

       amqpTemplate.convertAndSend(MQConfig.MIAOSHA_QUEUE, msg);

    }

   

//  public void send(Object message) {

//     String msg = RedisService.beanToString(message);

//     log.info("send message:"+msg);

//     amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);

//  }

// 

//  public void sendTopic(Object message) {

//     String msg = RedisService.beanToString(message);

//     log.info("send topic message:"+msg);

//     amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key1", msg+"1");

//     amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key2", msg+"2");

//  }

// 

//  public void sendFanout(Object message) {

//     String msg = RedisService.beanToString(message);

//     log.info("send fanout message:"+msg);

//     amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", msg);

//  }

// 

//  public void sendHeader(Object message) {

//     String msg = RedisService.beanToString(message);

//     log.info("send fanout message:"+msg);

//     MessageProperties properties = new MessageProperties();

//     properties.setHeader("header1", "value1");

//     properties.setHeader("header2", "value2");

//     Message obj = new Message(msg.getBytes(), properties);

//     amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE, "", obj);

//  }

 

   

   

}

 

 

 

消费者

消息消费者

package com.cxl.shop.rabbitmq;

 

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

 

import com.cxl.shop.domain.MiaoshaOrder;

import com.cxl.shop.domain.MiaoshaUser;

import com.cxl.shop.redis.RedisService;

import com.cxl.shop.service.GoodsService;

import com.cxl.shop.service.MiaoshaService;

import com.cxl.shop.service.OrderService;

import com.cxl.shop.vo.GoodsVo;

 

@Service

public class MQReceiver {

 

        private static Logger log = LoggerFactory.getLogger(MQReceiver.class);

       

        @Autowired

        RedisService redisService;

       

        @Autowired

        GoodsService goodsService;

       

        @Autowired

        OrderService orderService;

       

        @Autowired

        MiaoshaService miaoshaService;

       

        @RabbitListener(queues=MQConfig.MIAOSHA_QUEUE)

        public void receive(String message) {

            log.info("receive message:"+message);

            MiaoshaMessage mm  = RedisService.stringToBean(message, MiaoshaMessage.class);

            MiaoshaUser user = mm.getUser();

            long goodsId = mm.getGoodsId();

           

            GoodsVo goods = goodsService.getGoodsVoByGoodsId(goodsId);

        int stock = goods.getStockCount();

        if(stock <= 0) {

             return;

        }

        //判断是否已经秒杀到了

        MiaoshaOrder order = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(), goodsId);

        if(order != null) {

             return;

        }

        //减库存 下订单 写入秒杀订单

        miaoshaService.miaosha(user, goods);

        }

   

//      @RabbitListener(queues=MQConfig.QUEUE)

//      public void receive(String message) {

//          log.info("receive message:"+message);

//      }

//     

//      @RabbitListener(queues=MQConfig.TOPIC_QUEUE1)

//      public void receiveTopic1(String message) {

//          log.info(" topic  queue1 message:"+message);

//      }

//     

//      @RabbitListener(queues=MQConfig.TOPIC_QUEUE2)

//      public void receiveTopic2(String message) {

//          log.info(" topic  queue2 message:"+message);

//      }

//     

//      @RabbitListener(queues=MQConfig.HEADER_QUEUE)

//      public void receiveHeaderQueue(byte[] message) {

//          log.info(" header  queue message:"+new String(message));

//      }

//     

       

}

 

 

 

配置类

MQ的配置类 MQconfig

package com.cxl.shop.rabbitmq;

 

import java.util.HashMap;

import java.util.Map;

 

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.HeadersExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.core.TopicExchange;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

 

@Configuration

public class MQConfig {

   

    public static final String MIAOSHA_QUEUE = "miaosha.queue";

    public static final String QUEUE = "queue";

    public static final String TOPIC_QUEUE1 = "topic.queue1";

    public static final String TOPIC_QUEUE2 = "topic.queue2";

    public static final String HEADER_QUEUE = "header.queue";

    public static final String TOPIC_EXCHANGE = "topicExchage";

    public static final String FANOUT_EXCHANGE = "fanoutxchage";

    public static final String HEADERS_EXCHANGE = "headersExchage";

   

    /**

     * Direct模式 交换机Exchange

     * */

    @Bean

    public Queue queue() {

       return new Queue(QUEUE, true);

    }

   

    /**

     * Topic模式 交换机Exchange

     * */

    @Bean

    public Queue topicQueue1() {

       return new Queue(TOPIC_QUEUE1, true);

    }

    @Bean

    public Queue topicQueue2() {

       return new Queue(TOPIC_QUEUE2, true);

    }

    @Bean

    public TopicExchange topicExchage(){

       return new TopicExchange(TOPIC_EXCHANGE);

    }

    @Bean

    public Binding topicBinding1() {

       return BindingBuilder.bind(topicQueue1()).to(topicExchage()).with("topic.key1");

    }

    @Bean

    public Binding topicBinding2() {

       return BindingBuilder.bind(topicQueue2()).to(topicExchage()).with("topic.#");

    }

    /**

     * Fanout模式 交换机Exchange

     * */

    @Bean

    public FanoutExchange fanoutExchage(){

       return new FanoutExchange(FANOUT_EXCHANGE);

    }

    @Bean

    public Binding FanoutBinding1() {

       return BindingBuilder.bind(topicQueue1()).to(fanoutExchage());

    }

    @Bean

    public Binding FanoutBinding2() {

       return BindingBuilder.bind(topicQueue2()).to(fanoutExchage());

    }

    /**

     * Header模式 交换机Exchange

     * */

    @Bean

    public HeadersExchange headersExchage(){

       return new HeadersExchange(HEADERS_EXCHANGE);

    }

    @Bean

    public Queue headerQueue1() {

       return new Queue(HEADER_QUEUE, true);

    }

    @Bean

    public Binding headerBinding() {

       Map<String, Object> map = new HashMap<String, Object>();

       map.put("header1", "value1");

       map.put("header2", "value2");

       return BindingBuilder.bind(headerQueue1()).to(headersExchage()).whereAll(map).match();

    }

   

   

}

 

 

 

辅助类

工具类

public static <T> String beanToString(T value) {

       if(value == null) {

           return null;

       }

       Class<?> clazz = value.getClass();

       if(clazz == int.class || clazz == Integer.class) {

            return ""+value;

       }else if(clazz == String.class) {

            return (String)value;

       }else if(clazz == long.class || clazz == Long.class) {

           return ""+value;

       }else {

           return JSON.toJSONString(value);

       }

    }

 

    @SuppressWarnings("unchecked")

    public static <T> T stringToBean(String str, Class<T> clazz) {

       if(str == null || str.length() <= 0 || clazz == null) {

            return null;

       }

       if(clazz == int.class || clazz == Integer.class) {

            return (T)Integer.valueOf(str);

       }else if(clazz == String.class) {

            return (T)str;

       }else if(clazz == long.class || clazz == Long.class) {

           return  (T)Long.valueOf(str);

       }else {

           return JSON.toJavaObject(JSON.parseObject(str), clazz);

       }

    }

 

 

Contoller

@Autowired

    MQSender sender;

//  @RequestMapping("/mq/header")

//    @ResponseBody

//    public Result<String> header() {

//     sender.sendHeader("hello,imooc");

//        return Result.success("Helloworld");

//    }

// 

//  @RequestMapping("/mq/fanout")

//    @ResponseBody

//    public Result<String> fanout() {

//     sender.sendFanout("hello,imooc");

//        return Result.success("Helloworld");

//    }

// 

//  @RequestMapping("/mq/topic")

//    @ResponseBody

//    public Result<String> topic() {

//     sender.sendTopic("hello,imooc");

//        return Result.success("Helloworld");

//    }

// 

//  @RequestMapping("/mq")

//    @ResponseBody

//    public Result<String> mq() {

//     sender.send("hello,imooc");

//        return Result.success("Helloworld");

//    }

 

 

问题

启动时候报错没有权限,这个时候不能用guest用户来进行远程登录

 

找到rabbitMQ的 目录,创建配置文件

rabbitmq.config

里面配置上

[{rabbit, [{loopback_users, []}]}].

 

官方文档:http://www.rabbitmq.com/access-control.html

 

 

原文地址:https://www.cnblogs.com/chengxiaolong/p/10206331.html