RabbitMQ_4、路由模式

路由模式

官方模式

有选择地接收消息

路由去绑定队列

路由队列 生产者

/**
 * @PackageName : com.rzk
 * @FileName : Send
 * @Description : 路由队列-消息生产者
 * @Author : rzk
 * @CreateTime : 23/6/2021 上午12:21
 * @Version : 1.0.0
 */
public class Send {

    //定义交换机名称
    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] argv) throws Exception {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("");
        factory.setUsername("yeb");
        factory.setVirtualHost("/yeb");
        factory.setPassword("yeb");
        factory.setPort(5672);

        try (
                //连接工厂创建连接
                Connection connection = factory.newConnection();
                //创建信道
                Channel channel = connection.createChannel()) {
                //绑定交换机
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
                //
                String infoMessage = " 普通消息  ";
                String errorMessage = " 错误消息  ";
                String warningMessage = " 警告消息  ";
                //需要准备对应的路由
                String infoRoutingKey = "info";
                String errorRoutingKey = "error";
                String warningRoutingKey = "warning";
                //队列消息的生产者:发送消息
                channel.basicPublish(EXCHANGE_NAME, infoRoutingKey, null, infoMessage.getBytes(StandardCharsets.UTF_8));
                channel.basicPublish(EXCHANGE_NAME, errorRoutingKey, null, errorMessage.getBytes(StandardCharsets.UTF_8));
                channel.basicPublish(EXCHANGE_NAME, warningRoutingKey, null, warningMessage.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + infoMessage + "'");
                System.out.println(" [x] Sent '" + errorMessage + "'");
                System.out.println(" [x] Sent '" + warningMessage + "'");
            }
    }
}

路由队列 接收者

direct1

/**
 * @PackageName : com.rzk.simple.recv
 * @FileName : Recv
 * @Description : 路由队列-消息接收
 * @Author : rzk
 * @CreateTime : 23/6/2021 上午12:22
 * @Version : 1.0.0
 */
public class Recv01 {
    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] argv) throws Exception {
        //创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("");
        factory.setUsername("yeb");
        factory.setVirtualHost("/yeb");
        factory.setPassword("yeb");
        factory.setPort(5672);
        //连接工厂创建连接
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //获取队列(排他队列
        String queueName = channel.queueDeclare().getQueue();
        //队列绑定交换机
        String errorRoutingKey = "error";
        channel.queueBind(queueName,EXCHANGE_NAME,errorRoutingKey);

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        /**
         * 监听队列消费消息
         * autoAck:自动应答
         * 当消费者收到该消息,会返回通知消息队列 我消费者已经收到消息了
         */
        channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
    }
}

direct2

/**
 * @PackageName : com.rzk.simple.recv
 * @FileName : Recv
 * @Description : 路由队列-消息接收
 * @Author : rzk
 * @CreateTime : 23/6/2021 上午12:22
 * @Version : 1.0.0
 */
public class Recv02 {
    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] argv) throws Exception {
        //创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("");
        factory.setUsername("yeb");
        factory.setVirtualHost("/yeb");
        factory.setPassword("yeb");
        factory.setPort(5672);
        //连接工厂创建连接
        Connection connection = factory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //获取队列(排他队列)
        String queueName = channel.queueDeclare().getQueue();
        //队列绑定交换机
        String infoRoutingKey = "info";
        String errorRoutingKey = "error";
        String warningRoutingKey = "warning";
        channel.queueBind(queueName,EXCHANGE_NAME,infoRoutingKey);
        channel.queueBind(queueName,EXCHANGE_NAME,errorRoutingKey);
        channel.queueBind(queueName,EXCHANGE_NAME,warningRoutingKey);

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        //监听队列消费消息
        channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
    }
}

路由缺点和优点

优点: 他根据路由key去绑定对应交换机绑定队列,能够更加明确去指定要把哪些消息发送给不同种类的人(定制性更高,指定谁就给谁发)
缺点: 当项目逐日壮大的时候,路由key会很多(成千上万个),那么需要去一个一个指定吗,工作量会很大(需要用到主题模式去优化)

原文地址:https://www.cnblogs.com/rzkwz/p/14928677.html