10.RabbitMQ Fanout类型交换机

Fanout类型交换机忽略Routing Key,它将消息传递到所有与它绑定的队列上。

 
10.RabbitMQ <wbr>Fanout类型交换机
10.RabbitMQ <wbr>Fanout类型交换机
 
10.RabbitMQ <wbr>Fanout类型交换机
10.RabbitMQ <wbr>Fanout类型交换机
10.RabbitMQ <wbr>Fanout类型交换机
 
Producer.java
package com.test.fanout;
 
import com.rabbitmq.client.*;
 
import java.io.IOException;
import java.lang.String;
import java.lang.System;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
 
public class Producer {
 
    public static void main(String[] args) throws Exception {
      //使用默认端口连接MQ
        ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("admin");
    factory.setPassword("admin");
        factory.setHost("192.168.169.142"); //使用默认端口5672
        Connection conn = factory.newConnection(); //声明一个连接
        Channel channel = conn.createChannel(); //声明消息通道
   
String message = "hello world!";
String queueName1 = "queue_fanout1";
String queueName2 = "queue_fanout2";
String queueName3 = "queue_fanout3";
String exchangeName = "test.fanout";
//Routing Key
channel.queueDeclare(queueName1, false, false, false, null);
channel.queueDeclare(queueName2, false, false, false, null);
channel.queueDeclare(queueName3, false, false, false, null);
channel.exchangeDeclare(exchangeName, "fanout", false, false, null);
 
channel.queueBind(queueName1, exchangeName, "");
channel.queueBind(queueName2, exchangeName, "");
channel.queueBind(queueName3, exchangeName, "");
 
channel.basicPublish(exchangeName, "",
MessageProperties.TEXT_PLAIN, message.getBytes());
 
System.out.println("Message "" + message + "" sent successfully.");
 
channel.close();
conn.close();
    }
 
}
 
Customer.java
package com.test.fanout;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
 
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
 
//通过channel.basicAck向服务器发送回执,删除服务上的消息
public class Consumer implements com.rabbitmq.client.Consumer{
private Channel channel;
 
    public static void main(String[] args) throws Exception {
      //使用默认端口连接MQ
        ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("admin");
    factory.setPassword("admin");
        factory.setHost("192.168.169.142"); //使用默认端口5672
        Connection conn = factory.newConnection(); //声明一个连接
        Channel channel = conn.createChannel(); //声明消息通道
 
String queueName = args[0];//"queue_fanout1";
 
channel.queueDeclare(queueName, false, false, false, null);
 
Consumer consumer = new Consumer();
consumer.channel = channel;
 
channel.basicConsume(queueName, false, consumer);
    }
 
@Override
public void handleConsumeOk(String consumerTag) {
// TODO Auto-generated method stub
System.out.println("Consumer "" + consumerTag + "" has subscribed.");
}
 
@Override
public void handleCancelOk(String consumerTag) {
// TODO Auto-generated method stub
}
 
@Override
public void handleCancel(String consumerTag) throws IOException {
// TODO Auto-generated method stub
}
 
@Override
public void handleDelivery(String consumerTag, Envelope env,
BasicProperties props, byte[] body) throws IOException {
// TODO Auto-generated method stub
System.out.println("Message "" + new String(body) + "" received.");
channel.basicAck(env.getDeliveryTag(), false);
}
 
@Override
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
// TODO Auto-generated method stub
}
 
@Override
public void handleRecoverOk(String consumerTag) {
// TODO Auto-generated method stub
}
}
 
 
原文地址:https://www.cnblogs.com/zzpblogs/p/8168817.html