RabbitDemo —— Headers

Producer:

/**
 * Demo producer for a RabbitMQ headers exchange.
 *
 * <p>Declares the non-durable, auto-delete headers exchange {@code header-exchange},
 * publishes one timestamped text message carrying the header {@code aaa=value},
 * and then closes the channel and connection.
 */
public class Producer {
    private static final String EXCHANGE_NAME = "header-exchange";

    /**
     * Publishes a single message and exits.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the exchange, or publishing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = Common.getFactory();
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            try {
                // durable=false, autoDelete=true, no extra arguments.
                channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.HEADERS, false, true, null);
                String str = new Date().toString() + ": log something";

                // Routing is decided by these headers; the routing key is left empty.
                Map<String, Object> headers = new Hashtable<String, Object>();
                headers.put("aaa", "value");
                Builder properties = new BasicProperties().builder();
                properties.headers(headers);

                // Encode explicitly as UTF-8 instead of relying on the platform default charset.
                channel.basicPublish(EXCHANGE_NAME, "", properties.build(),
                        str.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                System.out.println("send msg:" + str);
            } finally {
                // A channel-level error may already have closed the channel; closing it
                // again would throw and hide the original exception.
                if (channel.isOpen()) {
                    channel.close();
                }
            }
        } finally {
            // Always release the connection, even when declaring or publishing failed.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
View Code

Consumers:

public class Consumers {
    private final static String EXCHANGE_NAME = "header-exchange";  
    private final static String QUEUE_NAME = "header-queue";  
    
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = Common.getFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.HEADERS,false,true,null);  
        channel.queueDeclare(QUEUE_NAME, false, false, true, null);
        Map<String,Object> headers = new Hashtable<String,Object>();
        headers.put("x-match", "any");
        headers.put("aaa", "value");
        headers.put("bbb", "01234");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", headers);
        
        Consumer consumer = new DefaultConsumer(channel) {
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                System.out.println("receive msg:"+new String(body));
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}
View Code
原文地址:https://www.cnblogs.com/yifanSJ/p/9022363.html