【RabbitMQ】02 工作队列模式

首先编写一个工作队列的生产者:

发送10条消息然后就关闭,10条消息让RabbitMQ先存着

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class WorkQueueInProducer {

    /**
     * 工作队列
     * @param args
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(ConnectionFactory.DEFAULT_AMQP_PORT); // 5672
        connectionFactory.setVirtualHost("/dzz"); // 虚拟主机? 默认值 /
        connectionFactory.setUsername("test"); // guest
        connectionFactory.setPassword("123456"); // guest

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work_queue", true, false, false, null);

        for (int i = 0; i < 10; i++) {
            String body = "send workQueue msg" + i;
            channel.basicPublish("", "work_queue", null, body.getBytes(StandardCharsets.UTF_8));
        }

        channel.close();
        connection.close();
    }
}

 然后创建两个消费者:

两个消费者的代码是一样的,就是接收消息打印即可

但是不要关闭

package cn.dzz.workQueue;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class WorkQueueInConsumer2 {

    /**
     * 工作队列 消费者
     * @param args
     */
    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(ConnectionFactory.DEFAULT_AMQP_PORT); // 5672
        connectionFactory.setVirtualHost("/dzz"); // 虚拟主机? 默认值 /
        connectionFactory.setUsername("test"); // guest
        connectionFactory.setPassword("123456"); // guest

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work_queue", true, false, false, null);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body(message) " + new String(body, StandardCharsets.UTF_8));
                System.out.println("- - - - - over - - - - -");
            }
        };

        channel.basicConsume("work_queue", true, consumer);
    }
}

先打开两个消费者:

 

然后再执行生产者发送消息:

 

RabbitMQ立即把生产者的消息分配过来给两个消费者:

消费者1输出

"C:Program Files (x86)Javajdk1.8.0_291injava.exe" "-javaagent:C:Program FilesJetBrainsIntelliJ IDEA 2021.2.1libidea_rt.jar=50684:C:Program FilesJetBrainsIntelliJ IDEA 2021.2.1in" -Dfile.encoding=UTF-8 -classpath "C:Program Files (x86)Javajdk1.8.0_291jrelibcharsets.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibdeploy.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextaccess-bridge-32.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextcldrdata.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextdnsns.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextjaccess.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextjfxrt.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextlocaledata.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibext
ashorn.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextsunec.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextsunjce_provider.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextsunmscapi.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextsunpkcs11.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextzipfs.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibjavaws.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibjce.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibjfr.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibjfxswt.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibjsse.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibmanagement-agent.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibplugin.jar;C:Program Files (x86)Javajdk1.8.0_291jrelib
esources.jar;C:Program Files (x86)Javajdk1.8.0_291jrelib
t.jar;C:UsersAdministratorIdeaProjectsRabbitMQConsumerService	argetclasses;C:UsersAdministrator.m2
epositorycom
abbitmqamqp-client5.6.0amqp-client-5.6.0.jar;C:UsersAdministrator.m2
epositoryorgslf4jslf4j-api1.7.25slf4j-api-1.7.25.jar" cn.dzz.workQueue.WorkQueueInConsumer1
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
body(message) send workQueue msg0
- - - - - over - - - - -
body(message) send workQueue msg2
- - - - - over - - - - -
body(message) send workQueue msg4
- - - - - over - - - - -
body(message) send workQueue msg6
- - - - - over - - - - -
body(message) send workQueue msg8
- - - - - over - - - - -

消费者2输出:

"C:Program Files (x86)Javajdk1.8.0_291injava.exe" "-javaagent:C:Program FilesJetBrainsIntelliJ IDEA 2021.2.1libidea_rt.jar=50693:C:Program FilesJetBrainsIntelliJ IDEA 2021.2.1in" -Dfile.encoding=UTF-8 -classpath "C:Program Files (x86)Javajdk1.8.0_291jrelibcharsets.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibdeploy.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextaccess-bridge-32.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextcldrdata.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextdnsns.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextjaccess.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextjfxrt.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextlocaledata.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibext
ashorn.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextsunec.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextsunjce_provider.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextsunmscapi.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextsunpkcs11.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextzipfs.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibjavaws.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibjce.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibjfr.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibjfxswt.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibjsse.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibmanagement-agent.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibplugin.jar;C:Program Files (x86)Javajdk1.8.0_291jrelib
esources.jar;C:Program Files (x86)Javajdk1.8.0_291jrelib
t.jar;C:UsersAdministratorIdeaProjectsRabbitMQConsumerService	argetclasses;C:UsersAdministrator.m2
epositorycom
abbitmqamqp-client5.6.0amqp-client-5.6.0.jar;C:UsersAdministrator.m2
epositoryorgslf4jslf4j-api1.7.25slf4j-api-1.7.25.jar" cn.dzz.workQueue.WorkQueueInConsumer2
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
body(message) send workQueue msg1
- - - - - over - - - - -
body(message) send workQueue msg3
- - - - - over - - - - -
body(message) send workQueue msg5
- - - - - over - - - - -
body(message) send workQueue msg7
- - - - - over - - - - -
body(message) send workQueue msg9
- - - - - over - - - - -

主要的作用是为了分摊这个10个消息

消费者之间是处于竞争关系,争夺消息的接收

工作队列用于任务过重的场景,用来提高任务处理速度

原文地址:https://www.cnblogs.com/mindzone/p/15371951.html