Using RabbitMQ

I. Installing with Docker

1. Pull the image

docker pull rabbitmq:management

2. Run the container

docker run -di --name=rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 rabbitmq:management

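3. Open the management UI in a browser

http://192.168.25.129:15672/#/

The default username and password are both guest.

4. Change the password (run inside the container, for example after docker exec -it rabbitmq bash)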

rabbitmqctl  change_password  guest  '123456'

II. Working with RabbitMQ from Java

1. Direct mode

1) Inherit from a Spring Boot parent POM and add the required dependencies

    <parent>
        <artifactId>tenpower-parent</artifactId>
        <groupId>com.tenpower</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

2) Configure RabbitMQ in application.yml

spring:
  rabbitmq:
    host: 192.168.25.129

3) Write the Spring Boot application class under the main directory

4) Write a test class under the test directory that sends a message

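import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)    // replaces JUnit's default runner
@SpringBootTest(classes = RabbitmqApplication.class)
public class ProducerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendMsgDirectly() {
        // No exchange is given, so the default exchange routes by queue name ("red")
        rabbitTemplate.convertAndSend("red", "direct mode test");
    }
}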

5) Write the message consumer class under the main directory

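import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component  // register as a Spring bean
@RabbitListener(queues = "red")
public class Customer1 {
    @RabbitHandler
    public void showMsg(String msg) {   // the method name is arbitrary
        System.out.println("red received: " + msg);
    }
}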

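6) In the management UI at http://192.168.25.129:15672/#/, create a queue named red

7) Run the test method: the message shows up in the red queue on the RabbitMQ management page. Then run the application class: the message is consumed, disappears from the queue, and is printed to the console.

2. Fanout mode (rarely used, omitted)

3. Topic mode

1) Create an exchange named color with type topic

2) Create three queues named red, blue, and green

3) Click the new color exchange, bind the three new queues to it, and enter a routing key for each binding. The keys used are not listed in this post; patterns such as goods.# for red, #.log for blue, and goods.log for green would be consistent with the results in step 5.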

4) Write three consumer classes

@Component  // register as a Spring bean
@RabbitListener(queues = "red")
public class Customer1 {
    @RabbitHandler
    public void showMsg(String msg) {   // the method name is arbitrary
        System.out.println("red received: " + msg);
    }
}

@Component
@RabbitListener(queues = "blue")
public class Customer2 {
    @RabbitHandler
    public void showMsg(String msg) {
        System.out.println("blue received: " + msg);
    }
}

@Component
@RabbitListener(queues = "green")
public class Customer3 {
    @RabbitHandler
    public void showMsg(String msg) {
        System.out.println("green received: " + msg);
    }
}

5) Produce messages in topic mode

    @Test
    public void sendMsgTopic1() {
        rabbitTemplate.convertAndSend("color", "goods.aaa", "topic mode test 1");
    }

    @Test
    public void sendMsgTopic2() {
        rabbitTemplate.convertAndSend("color", "goods.bbb.log", "topic mode test 2");
    }

    @Test
    public void sendMsgTopic3() {
        rabbitTemplate.convertAndSend("color", "goods.log", "topic mode test 3");
    }

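Run the test methods and the application class: the message from sendMsgTopic1 is received by red; the one from sendMsgTopic2 by red and blue; the one from sendMsgTopic3 by red, blue, and green.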
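
A topic exchange compares each message's routing key with the binding patterns word by word, where * stands for exactly one word and # for zero or more words. The plain-Java sketch below illustrates that rule only; it is not RabbitMQ's actual implementation. The class TopicMatcher is hypothetical, and the three binding patterns are assumptions chosen because they reproduce the results above (the post does not list the real ones).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of RabbitMQ or Spring AMQP.
final class TopicMatcher {

    // Assumed bindings (queue name -> binding pattern); not taken from the post.
    static final Map<String, String> BINDINGS = new LinkedHashMap<>();
    static {
        BINDINGS.put("red", "goods.#");
        BINDINGS.put("blue", "#.log");
        BINDINGS.put("green", "goods.log");
    }

    /** True if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;           // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words of the routing key
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;                   // pattern expects a word, key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);   // '*' or a literal word consumes one word
        }
        return false;
    }

    /** Names of the queues whose binding pattern matches the routing key. */
    static List<String> route(String routingKey) {
        List<String> queues = new ArrayList<>();
        for (Map.Entry<String, String> binding : BINDINGS.entrySet()) {
            if (matches(binding.getValue(), routingKey)) {
                queues.add(binding.getKey());
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"goods.aaa", "goods.bbb.log", "goods.log"}) {
            System.out.println(key + " -> " + route(key));
        }
    }
}
```

With these assumed patterns, goods.aaa goes to red only, goods.bbb.log to red and blue, and goods.log to all three queues, which matches the behaviour described above.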

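III. Running delayed tasks with RabbitMQ

1. Install the plugin

1) Download the plugin (choose the release that matches your RabbitMQ version; 3.8.0 is used here)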

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez

2) Copy it into the plugins directory of the rabbitmq container

docker cp rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq:/plugins

3) Enable the plugin (run inside the container)

cd /sbin
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

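4) Restart RabbitMQ

docker restart rabbitmq

2. Using it in code

1) Add a configuration class that binds the delayed queue to the delayed exchange (several queues can be bound)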

package com.lbh360.order.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {
    public static final String EXCHANGE_DELAY = "delay.exchange";
    public static final String FINISH_COMPLETE = "finish.complete";

    // Exchange of the plugin's "x-delayed-message" type; once the delay
    // has expired it routes like a direct exchange (x-delayed-type).
    @Bean
    public CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        // durable = true, autoDelete = false
        return new CustomExchange(EXCHANGE_DELAY, "x-delayed-message", true, false, args);
    }

    // Queue that receives the messages once their delay has expired
    @Bean
    public Queue finishCompleteQueue() {
        return new Queue(FINISH_COMPLETE);
    }

    // Bind the queue to the delayed exchange, using the queue name as routing key
    @Bean
    public Binding finishCompleteBinding(@Qualifier("finishCompleteQueue") Queue queue,
                                          @Qualifier("customExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(FINISH_COMPLETE).noargs();
    }
}

2) Producer

package com.lbh360.order;

import com.lbh360.OrderApp;
import com.lbh360.order.config.RabbitConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = OrderApp.class)
public class OrderTest {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_DELAY, RabbitConfig.FINISH_COMPLETE, "1270618586310995968", message -> {
            // Delay in milliseconds: the message reaches the queue after 3 seconds
            message.getMessageProperties().setDelay(3 * 1000);
            return message;
        });
    }
}
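
The delay passed to setDelay above is an int number of milliseconds, so longer delays are easy to get wrong when written as raw arithmetic. One possible way to keep this readable is to start from a java.time.Duration and convert it with a range check. The sketch below is only an illustration: the class DelayMillis and the method toDelayMillis are hypothetical helpers, not part of Spring AMQP.

```java
import java.time.Duration;

// Hypothetical helper for illustration; not part of Spring AMQP.
final class DelayMillis {

    /**
     * Converts a Duration to an int number of milliseconds.
     * Rejects negative durations and values that do not fit into an int.
     */
    static int toDelayMillis(Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative: " + delay);
        }
        long millis = delay.toMillis();
        if (millis > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("delay does not fit into an int: " + delay);
        }
        return (int) millis;
    }

    public static void main(String[] args) {
        System.out.println(toDelayMillis(Duration.ofSeconds(3)));    // 3000
        System.out.println(toDelayMillis(Duration.ofMinutes(30)));   // 1800000
    }
}
```

In the producer above, the call would then read message.getMessageProperties().setDelay(DelayMillis.toDelayMillis(Duration.ofSeconds(3))).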

3) Consumer

package com.lbh360.order.listener;

import com.lbh360.order.config.RabbitConfig;
import com.lbh360.order.service.OrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

/**
 * @Description Listener for RabbitMQ delayed tasks
 * @Author bofeng
 * @Date 2019/4/8 21:50
 * @Version 1.0
 */
@Component
public class AmqDelayListener {
    private static final Logger logger = LoggerFactory.getLogger(AmqDelayListener.class);

    @Resource
    private OrderService orderService;

    /**
     * Closes out an order once the delay after its completion has expired.
     *
     * @param message delayed message whose body is the order number
     */
    @RabbitListener(queues = RabbitConfig.FINISH_COMPLETE)
    public void completeOrder(Message message) {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset
        String orderNo = new String(message.getBody(), StandardCharsets.UTF_8);
        logger.info("completeOrder, orderNo:{}", orderNo);

        orderService.completeOrder(orderNo);
    }
}

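4) If the consumer throws an exception and the message cannot be consumed, RabbitMQ keeps redelivering it indefinitely. The maximum number of attempts and the interval between retries can be configured in application.yml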

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 1000ms
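
To make the effect of these settings more concrete, the sketch below computes the waits between delivery attempts. It assumes that max-attempts counts the first delivery as well, and that the two properties not set above keep their Spring Boot defaults (multiplier 1.0, max-interval 10000ms). The class RetrySchedule is a hypothetical illustration in plain Java, not Spring's own retry code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the retry timing; not Spring's implementation.
final class RetrySchedule {

    /**
     * Waits (in ms) between consecutive attempts. With maxAttempts attempts
     * in total there are maxAttempts - 1 waits. Each wait is the previous one
     * times the multiplier, capped at maxIntervalMs.
     */
    static List<Long> waitsBetweenAttempts(int maxAttempts, long initialIntervalMs,
                                           double multiplier, long maxIntervalMs) {
        List<Long> waits = new ArrayList<>();
        double interval = initialIntervalMs;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add(Math.min((long) interval, maxIntervalMs));
            interval *= multiplier;
        }
        return waits;
    }

    public static void main(String[] args) {
        // Values from the configuration above, plus the assumed defaults
        System.out.println(waitsBetweenAttempts(3, 1000, 1.0, 10000));   // [1000, 1000]
        // Example with exponential back-off and a cap
        System.out.println(waitsBetweenAttempts(4, 1000, 2.0, 3000));    // [1000, 2000, 3000]
    }
}
```

Under these assumptions the listener is invoked at most three times, about one second apart, before the retries are exhausted and the message is no longer redelivered to this listener.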

Original article: https://www.cnblogs.com/naixin007/p/10654536.html