【RabbitMQ】03 订阅模式

Pub / Sub 订阅模式

特点是:一条消息可以被多个消费者同时接收

 

首先创建订阅模式的生产者,代码相比之前有一些变动:

package cn.dzz.pubSub;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

/**
 * Declares the broker objects for the publish/subscribe (fanout) example:
 * one durable fanout exchange and two durable queues bound to it.
 *
 * <p>This version publishes nothing; it only sets up the exchange, the queues
 * and their bindings, then releases the channel and connection.
 */
public class PubSubInProducer {

    /**
     * Connects to the broker, declares the exchange and queues, and binds them.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or closing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(ConnectionFactory.DEFAULT_AMQP_PORT); // 5672
        connectionFactory.setVirtualHost("/dzz"); // virtual host; the default would be "/"
        connectionFactory.setUsername("test"); // default: guest
        connectionFactory.setPassword("123456"); // default: guest

        // try-with-resources closes the channel first, then the connection,
        // even when one of the declarations below throws.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {

            /*
             * New step compared with the earlier modes: declare an exchange.
             *
             * exchangeDeclare(exchange, type, durable, autoDelete, internal, arguments)
             *   exchange   - exchange name
             *   type       - exchange type, given as a BuiltinExchangeType constant:
             *                  DIRECT  ("direct")  - direct routing (simple and work-queue modes)
             *                  FANOUT  ("fanout")  - broadcast to every queue bound to the exchange
             *                  TOPIC   ("topic")   - wildcard matching on the routing key
             *                  HEADERS ("headers") - matching on header arguments (not covered here)
             *   durable    - whether the exchange is persistent
             *   autoDelete - whether the exchange is deleted automatically
             *   internal   - for internal use; normally false
             *   arguments  - extra arguments, none here
             */
            String exchangeName = "test_fanout";
            channel.exchangeDeclare(
                    exchangeName,
                    BuiltinExchangeType.FANOUT,
                    true,
                    false,
                    false,
                    null
            );

            // The publish/subscribe example needs two queues.
            String queueName1 = "pub&sub - 1";
            String queueName2 = "pub&sub - 2";
            channel.queueDeclare(queueName1, true, false, false, null);
            channel.queueDeclare(queueName2, true, false, false, null);

            /*
             * Bind the queues to the exchange.
             *
             * queueBind(queue, exchange, routingKey)
             *   queue      - queue name
             *   exchange   - exchange name
             *   routingKey - binding rule; for a fanout exchange "" is enough,
             *                because every bound queue receives the message
             */
            channel.queueBind(queueName1, exchangeName, "");
            channel.queueBind(queueName2, exchangeName, "");
        }
    }

}

启动之后,可以发现在面板Exchange上面就能找到我们创建的交换机了

 Queue面板上面有我们的两个订阅队列:

 

之后开始发送消息,注意下面发送消息的参数改动:

再次启动,将消息发送到RabbitMQ中

package cn.dzz.pubSub;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

/**
 * Producer for the publish/subscribe (fanout) example.
 *
 * <p>Declares a durable fanout exchange and two durable queues bound to it,
 * then publishes ten messages to the exchange.
 */
public class PubSubInProducer {

    /**
     * Connects to the broker, sets up the exchange and queues, and publishes the messages.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding, publishing or closing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(ConnectionFactory.DEFAULT_AMQP_PORT); // 5672
        connectionFactory.setVirtualHost("/dzz"); // virtual host; the default would be "/"
        connectionFactory.setUsername("test"); // default: guest
        connectionFactory.setPassword("123456"); // default: guest

        // try-with-resources closes the channel first, then the connection,
        // also when declaring, binding or publishing throws.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {

            /*
             * New step compared with the earlier modes: declare an exchange.
             *
             * exchangeDeclare(exchange, type, durable, autoDelete, internal, arguments)
             *   exchange   - exchange name
             *   type       - exchange type, given as a BuiltinExchangeType constant:
             *                  DIRECT  ("direct")  - direct routing (simple and work-queue modes)
             *                  FANOUT  ("fanout")  - broadcast to every queue bound to the exchange
             *                  TOPIC   ("topic")   - wildcard matching on the routing key
             *                  HEADERS ("headers") - matching on header arguments (not covered here)
             *   durable    - whether the exchange is persistent
             *   autoDelete - whether the exchange is deleted automatically
             *   internal   - for internal use; normally false
             *   arguments  - extra arguments, none here
             */
            String exchangeName = "test_fanout";
            channel.exchangeDeclare(
                    exchangeName,
                    BuiltinExchangeType.FANOUT,
                    true,
                    false,
                    false,
                    null
            );

            // The publish/subscribe example needs two queues.
            String queueName1 = "pub&sub - 1";
            String queueName2 = "pub&sub - 2";
            channel.queueDeclare(queueName1, true, false, false, null);
            channel.queueDeclare(queueName2, true, false, false, null);

            /*
             * Bind the queues to the exchange.
             *
             * queueBind(queue, exchange, routingKey)
             *   queue      - queue name
             *   exchange   - exchange name
             *   routingKey - binding rule; for a fanout exchange "" is enough,
             *                because every bound queue receives the message
             */
            channel.queueBind(queueName1, exchangeName, "");
            channel.queueBind(queueName2, exchangeName, "");

            // Publish the messages. Unlike the earlier modes, the exchange name is
            // now set and the routing key is left empty.
            for (int i = 0; i < 10; i++) {
                String body = "sending sub&pub msg " + i;
                channel.basicPublish(exchangeName, "", null, body.getBytes(StandardCharsets.UTF_8));
            }
        }
    }

}

这样每个队列都有10条待送出的消息

创建两个订阅的消费者:

两个消费者接收都是一样的,注意切换队列名字

package cn.dzz.pubSubQueue;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Publish/subscribe consumer that prints every message delivered from the
 * {@code "pub&sub - 1"} queue.
 */
public class PubSubQueueInConsumer1 {

    /**
     * Connects to the broker and subscribes to the first pub/sub queue with
     * automatic acknowledgement.
     *
     * @param args unused
     * @throws Exception if connecting or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("192.168.2.121");
        connectionFactory.setPort(ConnectionFactory.DEFAULT_AMQP_PORT); // 5672
        connectionFactory.setVirtualHost("/dzz"); // virtual host; the default would be "/"
        connectionFactory.setUsername("test"); // default: guest
        connectionFactory.setPassword("123456"); // default: guest

        // The channel and connection are not closed here; the consumer is meant
        // to keep receiving deliveries after main() returns.
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // The queue is not declared here; this consumer only reads from a queue
        // that has to exist already. The name must match the producer's queue name.
        String queueName = "pub&sub - 1";

        Consumer consumer = new DefaultConsumer(channel) {
            /** Prints the message body, decoded as UTF-8, followed by a separator line. */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body(message) " + new String(body, StandardCharsets.UTF_8));
                System.out.println("- - - - - over - - - - -");
            }
        };

        // Second argument true = automatic acknowledgement.
        channel.basicConsume(queueName, true, consumer);
    }
}

订阅的消费者1和2都能收到同样的消息:

"C:\Program Files (x86)\Java\jdk1.8.0_291\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2.1\lib\idea_rt.jar=58116:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2.1\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\charsets.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\deploy.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\access-bridge-32.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\cldrdata.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\dnsns.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\jaccess.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\jfxrt.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\localedata.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\nashorn.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunec.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunjce_provider.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunmscapi.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunpkcs11.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\zipfs.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\javaws.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jce.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jfr.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jfxswt.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jsse.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\management-agent.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\plugin.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\resources.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\rt.jar;C:\Users\Administrator\IdeaProjects\RabbitMQ\ConsumerService\target\classes;C:\Users\Administrator\.m2\repository\com\rabbitmq\amqp-client\5.6.0\amqp-client-5.6.0.jar;C:\Users\Administrator\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar" cn.dzz.pubSubQueue.PubSubQueueInConsumer2
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
body(message) sending sub&pub msg 0
- - - - - over - - - - -
body(message) sending sub&pub msg 1
- - - - - over - - - - -
body(message) sending sub&pub msg 2
- - - - - over - - - - -
body(message) sending sub&pub msg 3
- - - - - over - - - - -
body(message) sending sub&pub msg 4
- - - - - over - - - - -
body(message) sending sub&pub msg 5
- - - - - over - - - - -
body(message) sending sub&pub msg 6
- - - - - over - - - - -
body(message) sending sub&pub msg 7
- - - - - over - - - - -
body(message) sending sub&pub msg 8
- - - - - over - - - - -
body(message) sending sub&pub msg 9
- - - - - over - - - - -
原文地址:https://www.cnblogs.com/mindzone/p/15373986.html