RabbitMQ的使用三_Java Client方式使用路由模式

RabbitMQ的使用三_Java Client方式使用路由模式

官方文档地址:https://www.rabbitmq.com/tutorials/tutorial-four-java.html

路由模式:生产者将消息发送到了 type 为 direct 模式的交换机,消费者的队列在将自己绑定到路由的时候会给自己设置一个绑定key,只有生产者发送对应 key 格式的消息, 队列才会收到消息。

前面的文章中,队列和交换机的之间的绑定关系,我们使用如下代码:

channel.queueBind(queueName, EXCHANGE_NAME, "");

这里的第三个参数,称之为routing_key.也就是绑定键

绑定键的含义取决于交换机的类型。我们以前使用的Fanout交换机时,忽略了它的价值。

Direct exchange交换机

官网提供的配图

解释:

在上图中,我们可以看到direct交换机X与两个队列绑定在一起。第一个队列绑定键为error,第二个队列有三个绑定,一个绑定键为info,第二个绑定键为error,第三个绑定键为warning。

绑定键为error的消息会发布到队列一和队列二。绑定键为info、warning的消息将会被路由到队列二。其他没有匹配到的消息会被丢弃。队列一和队列二有相同的绑定键error,在这种情况下,direct exchange的行为类似于fanout交换机

direct交换机的声明

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

代码案例

创建一个生产者:生产者发送三条消息,一条绑定key为error的消息,一条绑定key为info的数据,一条绑定key为trace的数据

创建2个消费者:消费者1把direct交换机和队列1绑定。绑定key为error

        消费者2把direct交换机和队列2绑定,绑定key为info,error,warning

预期结果:生产者发送绑定键error的数据,会被队列一和队列二同时接收

       生产者发送绑定键为info的数据,只会被队列二接收

     生产者发送绑定键为trace的数据,交换机找不到匹配队列,就会丢弃。队列一和队列二都接收不到消息

生产者

public class DirectExchangeSender {

    // 1.声明一个交换机
    private static final String DIRECT_EXCHANGE_NAME = "direct_exchange_name";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
        Channel channel = rabbitMQConnections.createChannel();
        channel.exchangeDeclare(DIRECT_EXCHANGE_NAME, "direct");
        StringBuilder errorMsg = new StringBuilder("小河流水哗啦啦");
        StringBuilder infoMsg = new StringBuilder("天街小雨润如酥");
        StringBuilder traceMsg = new StringBuilder("草色要看近却无");
        channel.basicPublish(DIRECT_EXCHANGE_NAME, "error", null, errorMsg.append("_error").toString().getBytes());
        channel.basicPublish(DIRECT_EXCHANGE_NAME, "info", null, infoMsg.append("_info").toString().getBytes());
        channel.basicPublish(DIRECT_EXCHANGE_NAME, "trace", null, traceMsg.append("_trace").toString().getBytes());
        channel.close();
        rabbitMQConnections.close();
        System.out.println("消息发送完成");
    }
}
View Code

消费者1

public class DirectExchangeReceive {
    private static final String DIRECT_EXCHANGE_NAME = "direct_exchange_name";
    private static final String QUEUE_NAME_NUM_ONE = "queue_name_num_one";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
        Channel channel = rabbitMQConnections.createChannel();
        // 1.声明一个direct交换机
        channel.exchangeDeclare(DIRECT_EXCHANGE_NAME, "direct");
        // 2.声明队列
        channel.queueDeclare(QUEUE_NAME_NUM_ONE, false, false, false, null);
        // 3绑定队列和交换机的名字
        channel.queueBind(QUEUE_NAME_NUM_ONE, DIRECT_EXCHANGE_NAME, "error");

        // 消费回调
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery delivery) throws IOException {
                String receiverMessage = new String(delivery.getBody(), "UTF-8");
                System.out.println(new Date() + "一号队列接收到的消息=======>" + receiverMessage);
                // 手动提供一个应答
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME_NUM_ONE, false, deliverCallback, consumerTag -> {
        });
    }
}
View Code

消费者2

public class DirectExchangeReceive2 {
    private static final String DIRECT_EXCHANGE_NAME = "direct_exchange_name";
    private static final String QUEUE_NAME_NUM_TWO = "queue_name_num_two";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection rabbitMQConnections = RabbitMQConnectionFactory.getRabbitMQConnections();
        Channel channel = rabbitMQConnections.createChannel();
        // 1.声明一个direct交换机
        channel.exchangeDeclare(DIRECT_EXCHANGE_NAME,"direct");
        // 2.声明队列
        channel.queueDeclare(QUEUE_NAME_NUM_TWO,false,false,false,null);
        // 3绑定队列和交换机的名字
        channel.queueBind(QUEUE_NAME_NUM_TWO,DIRECT_EXCHANGE_NAME,"error");
        channel.queueBind(QUEUE_NAME_NUM_TWO,DIRECT_EXCHANGE_NAME,"info");
        channel.queueBind(QUEUE_NAME_NUM_TWO,DIRECT_EXCHANGE_NAME,"warn");

        // 消费回调
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery delivery) throws IOException {
                String receiverMessage = new String(delivery.getBody(),"UTF-8");
                System.out.println(new Date()+"二号队列接收到的消息=======>"+receiverMessage);
                // 手动提供一个应答
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            }
        };
        channel.basicConsume(QUEUE_NAME_NUM_TWO,false,deliverCallback,consumerTag -> {});
    }
}
View Code

执行结果截图

                        图一:消费者1号                                          图二:消费者2号

根据结果可知,和估计的结果一样。

原文地址:https://www.cnblogs.com/yingxiaocao/p/13303504.html