RabbitMQ消息中间件(第二章)第三部分-笔记-Exchange 交换机

Exchange 交换机

  • Exchange: 接收消息,并根据路由键转发消息到所绑定的队列

  交换机属性

  • Name: 交换机名称
  • Type: 交换机类型 direct、topic、fanout、headers
  • Durability: 是否需要持久化,true为持久化
  • Auto Delete: 当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
  • Internal: 当前Exchange是否用于RabbitMQ内部使用,默认为false --一般设置为false,除非你需要使用erlang语言扩展插件就可能需要将值设置为true来使用
  • Arguments: 扩展参数,用于扩展AMQP协议自制定化使用

  Durability Exchange (直连交换机)

  • 所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue

   注:Direct模式可以使用RabbitMQ自带的Exchange: defalut Exchange,所有不需要将Exchange进行任何绑定(binging)操作,消息传递时,RoueKey必须

     完全匹配才会被队列接收,否则该消息会被抛弃。

  这里说明routingkey和queues里的队列名要相一致

  以下为直连交换机的代码

package com.cx.temp.common.rabbitmq.direct;

import com.cx.temp.common.rabbitmq.quickstart.QueueingConsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 消费端(直连)
 */
public class Consumer4DirectExchange {

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test001");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("123456");

        connectionFactory.setAutomaticRecoveryEnabled(true); //是否支持自动重连
        connectionFactory.setNetworkRecoveryInterval(3000); //每3秒自动重连一次
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();
        //声明
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test.direct";

        //表示声明了一个交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //表示声明了一个队列
        channel.queueDeclare(queueName, false, false, false, null);
        //建立一个绑定关系
        channel.queueBind(queueName, exchangeName, routingKey);

        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称,是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循环获取消息
        while (true) {
            //获取消息,如果没有消息,这一步将会一直阻塞
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }


    }

}
package com.cx.temp.common.rabbitmq.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 生产端(直连)
 */
public class Producter4DirectExchange {

    public static void main(String[] args) throws Exception {

        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test001");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("123456");

        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_direct_exchange";
        String routingKey = "test.direct";
        //5 发送
        String msg = "Hello World RabbitMQ 4  Direct Exchange Message ...";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
    }

}

 Topic Exchange (主题交换机)

  • 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上
  • Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic

  注:可以使用通配符进行模糊匹配

符号 “#” 匹配一个或者多个词

符号 “*” 匹配不多不少一个词

例如:“log.#” 能匹配到 “log.info.oa”

   “log.*” 只会匹配到 “log.erro”

 

以下为主题交换机的代码

package com.cx.temp.common.rabbitmq.topic;

import com.cx.temp.common.rabbitmq.quickstart.QueueingConsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 主题交换机-消费端
 */
public class Consumer4TopicExchange {

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test001");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("123456");

        connectionFactory.setAutomaticRecoveryEnabled(true); //是否支持自动重连
        connectionFactory.setNetworkRecoveryInterval(3000); //每3秒自动重连一次
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();
        //声明
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
//        String routingKey = "user.#"; //user.save  user.update   user.delete.abc 可以接收到三个routingKey
        String routingKey = "user.*"; // user.save  user.update 可以接收到二个routingKey

        //表示声明了一个交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //表示声明了一个队列
        channel.queueDeclare(queueName, false, false, false, null);
        //建立一个绑定关系
        channel.queueBind(queueName, exchangeName, routingKey);

        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称,是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循环获取消息
        while (true) {
            //获取消息,如果没有消息,这一步将会一直阻塞
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }

    }

}
package com.cx.temp.common.rabbitmq.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 主题交换机-生产端
 */
public class Procuder4TopicExchange {

    public static void main(String[] args) throws Exception {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test001");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("123456");

        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";
        //5 发送
        String msg = "Hello World RabbitMQ 4  Topic Exchange Message ...";
        channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());

        channel.close();
        connection.close();
    }


}
String routingKey = "user.#"; //user.save  user.update   user.delete.abc 可以接收到三个routingKey,执行结果

String routingKey = "user.*"; // user.save  user.update 可以接收到二个routingKey

 

注:这时候还收到三条消息,原因是之前使用了user.#,还在应用中,需要将其解绑

 解绑后在重新执行生产者

 Fanout Exchange (广播交换机)

  • 不处理路由键,只需要简单的将队列绑定到交换机上
  • 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上、
  • Fanout交换机转发消息是最快的

 

以下为广播交换机的代码

package com.cx.temp.common.rabbitmq.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 广播交换机-生产端
 */
public class Procuder4FanoutExchange {

    public static void main(String[] args) throws Exception {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test001");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("123456");

        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_fanout_exchange";
        //5 发送
        for (int i = 0; i < 10; i++) {
            String msg = "Hello World RabbitMQ 4  Fanout Exchange Message ...";
            channel.basicPublish(exchangeName, "", null, msg.getBytes()); //这里配置成"",也可以配置成其他的,Fanout不会走路由键,所以配置了也无效
        }

        channel.close();
        connection.close();
    }


}
package com.cx.temp.common.rabbitmq.fanout;

import com.cx.temp.common.rabbitmq.quickstart.QueueingConsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 广播交换机-消费端
 */
public class Consumer4FanoutExchange {

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test001");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("123456");

        connectionFactory.setAutomaticRecoveryEnabled(true); //是否支持自动重连
        connectionFactory.setNetworkRecoveryInterval(3000); //每3秒自动重连一次
        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();
        //声明
        String exchangeName = "test_fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "test_fanout_queue";
        String routingKey = ""; // 不设置路由键

        //表示声明了一个交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //表示声明了一个队列
        channel.queueDeclare(queueName, false, false, false, null);
        //建立一个绑定关系
        channel.queueBind(queueName, exchangeName, routingKey);

        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称,是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循环获取消息
        while (true) {
            //获取消息,如果没有消息,这一步将会一直阻塞
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }

    }

}

执行结果

此外还有以headers为路由的交换器,用途较少,这里就不介绍了



原文地址:https://www.cnblogs.com/huihui-hui/p/14261607.html