Work Queues(工作队列)

1.模型

2.创建生产者

package com.dwz.rabbitmq.work;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.dwz.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**           |--c1
 * p---Queue--|
 *            |--c2
 */
public class Send {
    private static final String QUEUE_NAME = "test_work_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        for(int i = 0; i < 50; i++) {
            String msg = "send:--" + i;
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        }
        
        channel.close();
        connection.close();
    }
}

3.创建消费者1

package com.dwz.rabbitmq.work;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.dwz.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class rev01 {
    private static final String QUEUE_NAME = "test_work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取一个连接
        Connection connection = ConnectionUtils.getConnection();
        //从连接中获取一个通道
        Channel channel = connection.createChannel();
        //队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //定义消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            //自动接收消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("rev01:" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //监听队列
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

4.创建消费者2

package com.dwz.rabbitmq.work;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.dwz.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class rev02 {
    private static final String QUEUE_NAME = "test_work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取一个连接
        Connection connection = ConnectionUtils.getConnection();
        //从连接中获取一个通道
        Channel channel = connection.createChannel();
        //队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //定义消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            //自动接收消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("rev02:" + msg);
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //监听队列
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

5.运行代码

预期结果:

按照延迟加载时间获取消息数量不同,数量比例为 延时1:延时2

测试结果如下:

1.两个消费者先启动完成,再启动生产者,这时会采用轮询分发的方式,消费者1和消费者2各拿到一半的消息

2.生产者先启动完成,消费者按照先后顺序启动,会发现所有消息都被先启动的那个消费者接收

达到预期结果的解决方案:

消费者限流+手动签收确认

消费者限流:channel.basicQos(1);

手动签收确认:channel.basicAck(envelope.getDeliveryTag(), false);

原文地址:https://www.cnblogs.com/zheaven/p/11799675.html