RabbitMQ入门-Topic模式

发送到topic的消息不能有任意的绑定键,绑定键的规则:必须由(.)分割的单词列表。比如apple.banana.orange

绑定键也有两个特殊字符:

 * 可以代替一个单词。
# 可以替代零个或多个单词。

比如:apple.#  *.banana.*

生产者:

package com.example.demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 有选择的接受消息
 */
public class TopicSend {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();    // 连接工厂
        factory.setHost("localhost");
        Connection connection = factory.newConnection();        // 获取连接
        Channel channel = connection.createChannel();

        // topic类型
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String[] msg = {"666的兔子","懒惰的大象"};
        // 第二个参数为绑定键
        channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, msg[0].getBytes());
        channel.basicPublish(EXCHANGE_NAME, "lazy.write.elephant", null, msg[1].getBytes());
        System.out.println("PS-Send:" + msg.toString());

        channel.close();
        connection.close();

    }
}

消费者:

package com.example.demo;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TopicReceive {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();    // 连接工厂
        factory.setHost("localhost");
        Connection connection = factory.newConnection();        // 获取连接
        Channel channel = connection.createChannel();

        // 声明一个topic交换类型
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // 当声明队列,不加任何参数,产生的将是一个临时队列,getQueue返回的是队列名称
        String queueA = channel.queueDeclare().getQueue();
        String queueB = channel.queueDeclare().getQueue();
        System.out.println("临时队列:" + queueA);
        System.out.println("临时队列:" + queueB);

        // 第三个参数为“绑定建”
        // * 可以代替一个单词。
        // # 可以替代零个或多个单词。
        channel.queueBind(queueA, EXCHANGE_NAME, "*.orange.*");
        channel.queueBind(queueB, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(queueB, EXCHANGE_NAME, "lazy.#");

        Consumer consumerA = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String recv = new String(body, "UTF-8");
                System.out.println("Direct-Receive-A:" + recv);
            }
        };
        Consumer consumerB = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String recv = new String(body, "UTF-8");
                System.out.println("Direct-Receive-B:" + recv);
            }
        };
        channel.basicConsume(queueA, true, consumerA);
        channel.basicConsume(queueB, true, consumerB);
    }
}

先启动消费者:再启动生产者:控制台

..

 第一条消息,匹配A和B

第二条消息,只匹配B

原文地址:https://www.cnblogs.com/LUA123/p/8477387.html