SpringBoot与RabbitMQ进行整合

准备:

1.RabbitMQ安装(我是在window环境下安装的)。

安装完成之后进入登录页面配置,默认地址:http://localhost:15672

2.创建一个SpringBoot项目。

配置文件:

  #rabbitmq
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: root
    password: 123456
    virtual-host: /
    publisher-confirms: true
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: none
        concurrency: 1
        max-concurrency: 1
        retry:
          enabled: false

java代码

RabbitMQ配置:

/**
 * 消息列队配置
 * 
 * @author My
 */
@Component
public class RabbitmqConfig {
    private Logger logger = LoggerFactory.getLogger(RabbitmqConfig.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 消息交换机
    public final static String DIRECT_EXCHANGE = "directExchange";
    // 日志收集
    public final static String ROUTING_KEY_LOGGER = "directLoggerQueue";
}

配置交换机

@Bean
public DirectExchange directExchange() {
    // return new DirectExchange(DIRECT_EXCHANGE, true, false);
    logger.debug("RabbitMQ交换机初始化");
    return (DirectExchange) ExchangeBuilder.directExchange(DIRECT_EXCHANGE).durable(true).build();
}

配置logger消息列队

@Bean
public Queue directLoggerQueue() {
   return QueueBuilder.durable(ROUTING_KEY_LOGGER).build();
}

将消息列队和交换机绑定

@Bean
public Binding bindingDirectLoggerQueue(Queue directLoggerQueue, DirectExchange directExchange) {
    return BindingBuilder.bind(directLoggerQueue).to(directExchange).with(ROUTING_KEY_LOGGER);
}

生成者:

/**
 * 日志收集
 * 生产者
 * @author My
 */
@Component
public class LoggerProducer {

    private Logger logger = LoggerFactory.getLogger(LoggerProducer.class);
    
    @Autowired private AmqpTemplate amqpTemplate;
    
    /**
     * 添加消息
     * @param object
     */
    public void send(Object object) {
        String jsonStr = JSONObject.toJSONString(object);
        amqpTemplate.convertAndSend(RabbitmqConfig.DIRECT_EXCHANGE, RabbitmqConfig.ROUTING_KEY_LOGGER, jsonStr);
        logger.info("日志消息已经发送++++++++++++++++++");
    }
}

我这里传输的时候是对数据进行了json格式化,也可以根据自己的业务进行修改。

RabbitmqConfig.DIRECT_EXCHANGE 交换机的名字
RabbitmqConfig.ROUTING_KEY_LOGGER Queue列队的名字
 

消费者:

/**
 * 日志收集处理
 * 消费者
 * @author My
 */
@Component
public class LoggerConsumer {
    private Logger logger = LoggerFactory.getLogger(LoggerConsumer.class);

    @Autowired private ILogService iLogService;
    
    @RabbitHandler
    @RabbitListener(queues = RabbitmqConfig.ROUTING_KEY_LOGGER)
    public void process(String json, Message amqpMessage, Channel channel) throws Exception {
        logger.debug("logger接收到消息:{}", json);
        LogModel log = JSONObject.parseObject(json, LogModel.class);
        iLogService.insert(log);

    }
}

这里@RabbitListener注解中的queues需要和生产者中的名字一致。

调用的地方注入之后使用就可以了。

/** 日志收集通道 */
@Autowired
private LoggerProducer loggerProducer;

整合完成。

原文地址:https://www.cnblogs.com/se7end/p/9522555.html