6、RabbitMQ-路由模式

Exchange(交换机 转换器)

Exchange分发消息时根据类型的不同分发策略有区别,
目前共四种类型:direct、fanout、topic、headers 。
 
一方面是接受生产者的消息,一方面是向队列推送消息
 
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException
queueBind(String queue, String exchange, String routingKey) throws IOException
“”匿名转发
 
fanout:不处理路由键
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,
只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有
队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消
息是最快的。

direct
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致,
 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,
如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key
 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。
它是完全匹配、单播的模式

 

topic
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑
定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识
别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*匹配一个单词

 

 路由模式

 http://www.rabbitmq.com/tutorials/tutorial-four-java.html

 1、模型

 

2、代码实践

 生产者

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.util.ConnectionUtils;
public class Send {
     
     private static final String EXCHANGE_NAME =  "exchange_routing_direct";
     
     public static void main(String[] args) throws IOException,  TimeoutException {
           
           Connection conn = ConnectionUtils.getConnection();
           Channel channel = conn.createChannel();
           
           //exchange
           channel.exchangeDeclare(EXCHANGE_NAME, "direct");
           
           String msg = "hello direct";
           
           //绑定路由
           String routingKey = "error";
           channel.basicPublish(EXCHANGE_NAME, routingKey, null,  msg.getBytes());
           
           channel.close();
           conn.close();
     }
}

消费者1:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.util.ConnectionUtils;

public class Receive {
    
    private static final String QUEUE_NAME="test_route";
    private static final String EXCHANGE_NAME = "exchange_routing_direct";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        
        Connection conn = ConnectionUtils.getConnection();
        
        Channel channel = conn.createChannel();
        
        //队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        channel.basicQos(1);
        
        //绑定队列到交换机转发器
        String routingKey = "error";
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);

                //定义一个消费者
                Consumer consumer = new DefaultConsumer(channel){
                    //收到消息就会触发这个方法
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                            throws IOException {
                        String msg = new String(body,"utf-8");
                        System.out.println("消费者1接收到的消息" + msg);
                        
                        try {
                            Thread.sleep(1500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }finally{
                            System.out.println("消费者1处理完成!");
                            //手动回执
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        }
                    }
                };
                //监听队列
                //自动应答false
                boolean autoAck = false;
                channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

消费者2:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.util.ConnectionUtils;

public class Receive2 {
    
    private static final String QUEUE_NAME="test_queue";
    private static final String EXCHANGE_NAME = "exchange_routing_direct";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        
        Connection conn = ConnectionUtils.getConnection();
        
        Channel channel = conn.createChannel();
        
        //队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        channel.basicQos(1);
        
        //绑定队列到交换机转发器
        //可以同时绑定多个
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
        
                //定义一个消费者
                Consumer consumer = new DefaultConsumer(channel){
                    //收到消息就会触发这个方法
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                            throws IOException {
                        String msg = new String(body,"utf-8");
                        System.out.println("消费者2接收到的消息" + msg);
                        
                        try {
                            Thread.sleep(1500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }finally{
                            System.out.println("消费者2处理完成!");
                            //手动回执
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        }
                    }
                };
                //监听队列
                //自动应答false
                boolean autoAck = false;
                channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}
此时的测试只有在error的情况下两者均可收到信息
在其他的模式下只有消费者2可以获取消息

 

原文地址:https://www.cnblogs.com/Mrchengs/p/10531318.html