RabbitDemo —— Topic

SendLogTopic:

/**
 * 客户端代码关注交换器
 * 消费端代码关注交换器、队列及其绑定关系
 */
public class SendLogTopic {
    private static final String EXCHANGE_NAME="topic_logs";
    
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = Common.getFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, "topic", false, false, null);
        String[] routing_keys = new String[] {"kernal.info", "cron.warning", "auth.info", "kernal.critical"};
        for(String routing_key : routing_keys) {
            String msg = UUID.randomUUID().toString();
            channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg.getBytes());
            System.out.println("send:"+routing_key+"----"+msg);
        }
        channel.close();
        connection.close();
    }
}
View Code

ReceiveLogsTopicForKernel:

/**
 * 客户端代码关注交换器
 * 消费端代码关注交换器、队列及其绑定关系
 */
public class ReceiveLogsTopicForKernel {
    private static final String EXCHANGE_NAME="topic_logs";
    
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = Common.getFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, "topic",false,false,null);
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "kernal.*");
        
        System.out .println(" [*] Waiting for critical messages. To exit press CTRL+C");   
        
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                System.out.println("recv msg:"+envelope.getRoutingKey()+"------"+new String(body));
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        
        channel.basicConsume(queueName, false, consumer);
        
//        channel.close();
//        connection.close();
    }
}
View Code

ReceiveLogsTopicForCritical:

/**
 * 客户端代码关注交换器
 * 消费端代码关注交换器、队列及其绑定关系
 */
public class ReceiveLogsTopicForCritical {
    private static final String EXCHANGE_NAME="topic_logs";
    
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = Common.getFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, "topic", false, false, null);
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");
        
        System.out .println(" [*] Waiting for critical messages. To exit press CTRL+C");   
        
        Consumer consumer = new DefaultConsumer(channel) {
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                System.out.println("recv msg is:"+envelope.getRoutingKey()+"------"+new String(body));
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(queueName, false, consumer);
    }
}
View Code
原文地址:https://www.cnblogs.com/yifanSJ/p/9022350.html