Topics(主题模式)

引言

topic exchange和direct exchange类似,都是通过routing key和binding key进行匹配,不同的是topic exchange可以为routing key设置多重标准。

direct路由器类似于sql语句中的精确查询;topic 路由器有点类似于sql语句中的模糊查询。

topic 路由器在 binding key 中使用通配符“*”和“#”,对消息的 routing key 进行模糊匹配(routing key 由若干个以“.”分隔的单词组成):

*:匹配恰好一个单词;#:匹配零个或多个单词。例如 user.* 能匹配 user.save,但不能匹配 user.delete.abc;user.# 则两者都能匹配。

1.模型

2.创建生产者

package com.dwz.rabbitmq.exchange.topic;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.dwz.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Demo producer for the topic exchange example.
 *
 * <p>Publishes one message per routing key ({@code user.save}, {@code user.update},
 * {@code user.delete.abc}) to the {@code test_topic_exchange} exchange. Each message body is a
 * fixed prefix followed by the routing key it was published with.
 *
 * <p>This class does not declare the exchange; in this example the consumers declare it, so
 * they should be started first.
 */
public class Producer {
    /**
     * Publishes the demo messages and releases the broker connection.
     *
     * @param args unused
     * @throws IOException if creating the channel, publishing, or closing fails
     * @throws TimeoutException if obtaining the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        try {
            Channel channel = connection.createChannel();

            String exchangeName = "test_topic_exchange";
            String[] routingKeys = {"user.save", "user.update", "user.delete.abc"};
            String msg = "hello rabbitmq topic message successs!--";

            for (String routingKey : routingKeys) {
                // Encode explicitly as UTF-8: the consumers decode the body with "utf-8",
                // whereas a bare getBytes() would use the platform default charset.
                channel.basicPublish(exchangeName, routingKey, null, (msg + routingKey).getBytes("utf-8"));
            }

            channel.close();
        } finally {
            // Always release the connection, even when channel creation or a publish failed.
            // Skip the call if the connection is already closed, so that a secondary
            // "already closed" error does not mask the original exception.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}

3.创建消费者1

package com.dwz.rabbitmq.exchange.topic;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.dwz.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
 * Topic exchange demo, consumer 1 (wildcard matching).
 *
 * <p>Declares the {@code test_topic_exchange} topic exchange and the
 * {@code test_topic_queue_1} queue, binds them with the binding key {@code user.#}, and prints
 * every message delivered to the queue.
 *
 * @author dangwangzhen
 */
public class Consumer {
    private static final String EXCHANGE_NAME = "test_topic_exchange";
    private static final String EXCHANGE_TYPE = "topic";
    private static final String QUEUE_NAME = "test_topic_queue_1";
    /** Binding key; {@code #} stands for zero or more dot-separated words. */
    private static final String BINDING_KEY = "user.#";

    /**
     * Sets up the exchange, queue and binding, then starts consuming.
     *
     * @param args unused
     * @throws IOException if any broker operation fails
     * @throws TimeoutException if obtaining the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = ConnectionUtils.getConnection();
        Channel ch = conn.createChannel();

        // Topology: exchange -> binding -> queue.
        ch.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, null);
        ch.queueDeclare(QUEUE_NAME, false, false, false, null);
        ch.queueBind(QUEUE_NAME, EXCHANGE_NAME, BINDING_KEY);

        // Second argument 'true' is passed as the auto-ack flag.
        ch.basicConsume(QUEUE_NAME, true, new PrintingConsumer(ch));
    }

    /** Consumer callback that decodes each delivery as UTF-8 text and prints it to stdout. */
    private static final class PrintingConsumer extends DefaultConsumer {
        PrintingConsumer(Channel channel) {
            super(channel);
        }

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                throws IOException {
            System.out.println("rec topic 1--message:" + new String(body, "utf-8"));
        }
    }
}

4.创建消费者2

package com.dwz.rabbitmq.exchange.topic;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.dwz.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
 * Topic exchange demo, consumer 2 (wildcard matching).
 *
 * <p>Binds the {@code test_topic_queue_2} queue to the {@code test_topic_exchange} topic
 * exchange with the binding key {@code user.*.*} and prints every message delivered to the
 * queue.
 *
 * @author dangwangzhen
 */
public class Consumer2 {
    /**
     * Declares the exchange and queue, binds them, and registers the printing callback.
     *
     * @param args unused
     * @throws IOException if any broker operation fails
     * @throws TimeoutException if obtaining the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        final String exchange = "test_topic_exchange";
        final String queue = "test_topic_queue_2";
        // Each '*' stands for exactly one dot-separated word.
        final String bindingKey = "user.*.*";

        Connection conn = ConnectionUtils.getConnection();
        Channel ch = conn.createChannel();

        ch.exchangeDeclare(exchange, "topic", true, false, null);
        ch.queueDeclare(queue, false, false, false, null);
        ch.queueBind(queue, exchange, bindingKey);

        DefaultConsumer printer = new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                // Payload is decoded as UTF-8 text before printing.
                String text = new String(body, "utf-8");
                System.out.println("rec topic2--message:" + text);
            }
        };

        // Second argument 'true' is passed as the auto-ack flag.
        ch.basicConsume(queue, true, printer);
    }
}

5.运行代码

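先启动两个消费者(由它们声明交换机、队列并完成绑定),再运行生产者。预期结果:消费者1(user.#)收到 user.save、user.update、user.delete.abc 三条消息;消费者2(user.*.*)只收到 user.delete.abc 一条消息。

success!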

原文地址:https://www.cnblogs.com/zheaven/p/11801811.html