MQ的使用

笔记来自
基础使用
延迟队列
确认机制

全过程

  • 连接mq服务器
  • 生成template对象
  • 我们的角色是admin
  • 设置好几个队列queue
  • 交换机exchange,有4种交换机
  • 设置处理队列的方法,也叫做消费者,这是自动处理的
  • 通过接口往队列添加内容

pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.properties

#rabbitmq
spring.rabbitmq.host=192.168.89.168
spring.rabbitmq.port=5672
spring.rabbitmq.username=fzb
spring.rabbitmq.password=fzb2019
spring.rabbitmq.virtual-host=/
#消费者数量
spring.rabbitmq.listener.simple.concurrency=10
#最大消费者数量
spring.rabbitmq.listener.simple.max-concurrency=10
#消费者每次从队列获取的消息数量。写多了,如果长时间得不到消费,数据就一直得不到处理
spring.rabbitmq.listener.simple.prefetch=1
#消费者自动启动
spring.rabbitmq.listener.simple.auto-startup=true
#消费者消费失败,自动重新入队
spring.rabbitmq.listener.simple.default-requeue-rejected=true
#启用发送重试 队列满了发不进去时启动重试
spring.rabbitmq.template.retry.enabled=true 
#1秒钟后重试一次
spring.rabbitmq.template.retry.initial-interval=1000 
#最大重试次数 3次
spring.rabbitmq.template.retry.max-attempts=3
#最大间隔 10秒钟
spring.rabbitmq.template.retry.max-interval=10000
#等待间隔 的倍数。如果为2  第一次 乘以2 等1秒, 第二次 乘以2 等2秒 ,第三次 乘以2 等4秒
spring.rabbitmq.template.retry.multiplier=1.0

交换机模式

  • direct,全匹配的点对点
@Configuration
public class SenderConf {
    @Bean
    public Queue queue() {
        return new Queue("queue");
    }
}


@Service
public class HelloSender {
    @Autowired
    private AmqpTemplate template;
 
    public void send() {
        template.convertAndSend("queue", "hello,rabbit666~");
    }
}

@Component
public class MyListner 
    @RabbitListener(queues = "queue")
    public void msg(String msg){
        System.out.println("消费者消费消息了:"+msg);
    }
}
  • topic,模糊匹配,需要判断
@Configuration
public class SenderConf1 {
 
    @Bean(name="message")
    public Queue queueMessage() {
        return new Queue("topic.message");
    }
 
    @Bean(name="messages")
    public Queue queueMessages() {
        return new Queue("topic.messages");
    }
 
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("exchange");
    }
    @Bean
    Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }
 
    @Bean
    Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一个词,#表示零个或多个词
    }
}


@Service
public class HelloSender {
    @Autowired
    private AmqpTemplate template;
 
    public void send() {
        template.convertAndSend("exchange","topic.message","hello,rabbit~~~11");
        template.convertAndSend("exchange","topic.messages","hello,rabbit~~~22");
    }
}

@Component
public class MyListner{
    @RabbitListener(queues="topic.message")    //监听器监听指定的Queue
    public void process1(String str) {
        System.out.println("message:"+str);
    }
    @RabbitListener(queues="topic.messages")    //监听器监听指定的Queue
    public void process2(String str) {
        System.out.println("messages:"+str);
    }
}
  • faout,全广播
@Configuration
public class SenderConf2 {
 
    @Bean(name="Amessage")
    public Queue AMessage() {
        return new Queue("fanout.A");
    }
 
    @Bean(name="Bmessage")
    public Queue BMessage() {
        return new Queue("fanout.B");
    }
 
    @Bean(name="Cmessage")
    public Queue CMessage() {
        return new Queue("fanout.C");
    }
 
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");//配置广播路由器
    }
 
    @Bean
    Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }
 
    @Bean
    Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }
 
    @Bean
    Binding bindingExchangeC(@Qualifier("Cmessage") Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }
}

@Service
public class HelloSender {
    @Autowired
    private AmqpTemplate template;
 
    public void send() {
        template.convertAndSend("fanoutExchange","","xixi,haha");//参数2忽略
    }
}

@Component
public class MyListner{
    @RabbitListener(queues="fanout.A")
    public void processA(String str1) {
        System.out.println("ReceiveA:"+str1);
    }
    @RabbitListener(queues="fanout.B")
    public void processB(String str) {
        System.out.println("ReceiveB:"+str);
    }
    @RabbitListener(queues="fanout.C")
    public void processC(String str) {
        System.out.println("ReceiveC:"+str);
    }
}

过期时间

  • 设置了过期时间,过期了就没了,有两种方式
  • 如果两个同时设置已最早过期时间为准
// 在发送消息时设置过期时间
@Test
public void ttlMessageTest(){
   MessageProperties messageProperties = new MessageProperties();
   //设置消息的过期时间,3秒
   messageProperties.setExpiration("3000");
   Message message = new Message("测试过期消息,3秒钟过期".getBytes(), messageProperties);
   //路由键与队列同名
   rabbitTemplate.convertAndSend("my_ttl_queue", message);
}

// 设置整个队列的过期时间
@Configuration
public class SenderConf3 {
    // 新建业务队列,添加死信配置,
    @Bean("redirectQueue")
    public Queue redirectQueue() {
        Map<String, Object> args = new HashMap<>(1);
        // 过期时间
        args.put("x-message-ttl", 10*1000);
        return QueueBuilder.durable("REDIRECT_QUEUE").withArguments(args).build();
    }
    // 需要普通业务交换机和绑定,这里省略
}

死信队列

  • 上面过期了没了,可以让他们去到死信的队列
@Configuration
public class SenderConf3 {
    @Bean("deadLetterExchange")
    public Exchange deadLetterExchange() {
        return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
    }

    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        return QueueBuilder.durable("DL_QUEUE").build();
    }

    @Bean
    public Binding deadLetterBinding() {
        return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);
    }

    // 新建业务队列,添加死信配置,
    @Bean("redirectQueue")
    public Queue redirectQueue() {
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    声明  死信交换机
        args.put("x-dead-letter-exchange", "DL_EXCHANGE");
//       x-dead-letter-routing-key    声明 死信路由键
        args.put("x-dead-letter-routing-key", "DL_KEY");
        return QueueBuilder.durable("REDIRECT_QUEUE").withArguments(args).build();
    }

    // 需要普通业务交换机和绑定,这里省略
}

延迟队列

  • 需要给rabbitmq安装插件,放在pugins文件夹下重启服务
1. 查看yum 安装的软件路径
   查找安装包:rpm -qa|grep rabbitmq
   查找位置: rpm -ql rabbitmq-server-3.6.15-1.el6.noarch
   卸载yum安装:yum remove rabbitmq-server-3.6.15-1.el6.noarch
2. 上传到plugins文件夹
3. 停止服务器
   service rabbitmq-server stop
4. 开启插件
   rabbitmq-plugins enable rabbitmq_delayed_message_exchange
   (关闭插件)
   rabbitmq-plugins disable rabbitmq_delayed_message_exchange
5. 启动服务器
   service rabbitmq-server start
6. 查看插件
   rabbitmq-plugins list
@Configuration
public class DelayQueueConfig {

    public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";
    public static final String DELAY_QUEUE = "DELAY_QUEUE";
    public static final String DELAY_ROUTING_KEY = "DELAY_ROUTING_KEY";

    @Bean("delayExchange")
    public Exchange delayExchange() {
        Map<String, Object> args = new HashMap<>(1);
//       x-delayed-type    声明 延迟队列Exchange的类型
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message",true, false,args);
    }

    @Bean("delayQueue")
    public Queue delayQueue() {
        return QueueBuilder.durable(DELAY_QUEUE).build();
    }
    
    @Bean
    public Binding delayQueueBindExchange() {
        return new Binding(DELAY_QUEUE, Binding.DestinationType.QUEUE, DELAY_EXCHANGE, DELAY_ROUTING_KEY, null);
    }

}


@Component
public class DelayQueueSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendDelayQueue(int number) {
        rabbitTemplate.convertAndSend(
                "textExchange",
                "textKey",
                number, (message) -> {
                    // 设置延迟的毫秒数
                    message.getMessageProperties().setDelay(number);
                    log.info("Now : {}", ZonedDateTime.now());
                    return message;
                });
    }
}

// 监听textKey对应的队列等消息就行

确认机制

  • 配置
# 发送确认
spring.rabbitmq.publisher-confirms=true
# 发送回调
spring.rabbitmq.publisher-returns=true
# 消费手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • 生产者发送消息确认机制
    • 其实这个也不能叫确认机制,只是起到一个监听的作用,监听生产者是否发送消息到exchange和queue。
    • 生产者和消费者代码不改变。
    • 新建配置类 MQProducerAckConfig.java 实现ConfirmCallback和ReturnCallback接口,@Component注册成组件。
    • ConfirmCallback只确认消息是否到达exchange,已实现方法confirm中ack属性为标准,true到达,反之进入黑洞。
    • ReturnCallback消息没有正确到达队列时触发回调,如果正确到达队列不执行
@Component
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);            //指定 ConfirmCallback
        rabbitTemplate.setReturnCallback(this);             //指定 ReturnCallback

    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息发送成功" + correlationData);
        } else {
            System.out.println("消息发送失败:" + cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // 反序列化对象输出
        System.out.println("消息主体: " + SerializationUtils.deserialize(message.getBody()));
        System.out.println("应答码: " + replyCode);
        System.out.println("描述:" + replyText);
        System.out.println("消息使用的交换器 exchange : " + exchange);
        System.out.println("消息使用的路由键 routing : " + routingKey);
    }
}
  • 如果消费者消息是默认auto
    • 如果消息成功被消费(成功的意思是在消费的过程中没有抛出异常),则自动确认
    • 当抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且 requeue = false(不重新入队列)
    • 当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会被确认
    • 其他的异常,则消息会被拒绝,且 requeue = true,此时会发生死循环,可以通过 setDefaultRequeueRejected(默认是true)去设置抛弃消息
  • 消费者消息手动确认manual,一定要对消息做出应答,否则rabbit认为当前队列没有消费完成,将不再继续向该队列发送消息
@Component
public class MyListner 
    @RabbitListener(queues = "queue")
    public void msg(Channel channel,String msg) throws IOException {
        System.out.println("消费者消费消息了:"+msg);
        // 多了个channel,还要监听错误
        // channel有三个方法,一个是成功,一个是拒绝,一个是重新入队
        try {
            // 模拟执行任务
            Thread.sleep(1000);
            // 模拟异常
            String is = null;
            is.toString();
            // 确认收到消息,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                System.out.println("消息已重复处理失败,拒绝再次接收" + user.getName());
                // 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                System.out.println("消息即将再次返回队列处理" + user.getName());
                // requeue为是否重新回到队列,true重新入队
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
            //e.printStackTrace();
        }
    }
}
  • 这个就是手动事务了

持久化

  • 交换机持久化
@Bean
public DirectExchange testDirectExchange(){
   //第二个参数就是是否持久化,第三个参数就是是否自动删除
   return new DirectExchange("direct.Exchange",true,false);
}
  • 队列持久化
@Bean
public Queue txQueue(){
   //第二个参数就是durable,是否持久化
   return new Queue("txQueue",true);
}

高级知识

  • 集群
  • HAProxy
  • KeepAlived

面试问题

  • 消息堆积
// 原因
太多入队,消费不及时,队列占满
// 解决方案
增加消费者
  • 消息丢失
// 原因一
消息在生产者丢失
// 解决方案一
信息被MQ接受后需要给生产者发送一个确认消息(确认机制)
在confirm方法里的信息发送失败后面添加重发机制

// 原因二
消息在MQ宕机丢失
// 解决方案二
启动持续化

// 原因三
消息在消费者丢失
// 解决方案二
消费者确认机制,事务机制
  • 有序消费
// 目的
有ABC三个消息,想要顺序执行ABC,但是有多个消费者,ABC会被瞬间平分
// 解决方案
改成多个队列,一个队列一个消费者,信息由hash值放到对应队列
  • 重复消费
// 原因
为了防止消息在消费者丢失开启了手动回复,但是如果在消费者执行成功了,但是回复的时候出了问题,mq就以为消息没成功又给下一个消费者发送一次,同个消息执行多次
// 解决
每个消息都添加id,redis也添加id,消费者接受信息后判断这个信息是不是用过了,用过了直接返回成功
原文地址:https://www.cnblogs.com/pengdt/p/13523324.html