RabbitMQ发布/订阅模式

1、生产者

package com.ys.ps;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.ys.utils.ConnectionUtil;

/**
* Create by YSOcean
*/
public class Producer {
private final static String EXCHANGE_NAME = "fanout_exchange";

public static void main(String[] args) throws Exception {
//1、获取连接
Connection connection = ConnectionUtil.getConnection("192.168.146.251", 5672, "/", "guest", "guest");
//2、声明信道
Channel channel = connection.createChannel();
//3、声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//4、创建消息
String message = "hello rabbitmq";
//5、发布消息
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("[x] Sent'" + message + "'");
//6、关闭通道
channel.close();
//7、关闭连接
connection.close();
}
}

2、消费者

  消费者1:

package com.ys.ps;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.ys.utils.ConnectionUtil;


/**
* Create by YSOcean
*/
public class Consumer1 {

private final static String QUEUE_NAME = "fanout_queue_1";

private final static String EXCHANGE_NAME = "fanout_exchange";

public static void main(String[] args) throws Exception{
//1、获取连接
Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
//2、声明通道
Channel channel = connection.createChannel();
//3、声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4、绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
//同一时刻服务器只会发送一条消息给消费者
channel.basicQos(1);
//5、定义队列的消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6、监听队列,手动返回完成状态
channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
//6、获取消息
while (true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 消费者1:" + message + "'");
//消费者1接收一条消息后休眠10毫秒
Thread.sleep(10);
//返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}

}

消费者:2

package com.ys.ps;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.ys.utils.ConnectionUtil;


/**
* Create by YSOcean
*/
public class Consumer2 {

private final static String QUEUE_NAME = "fanout_queue_2";

private final static String EXCHANGE_NAME = "fanout_exchange";

public static void main(String[] args) throws Exception{
//1、获取连接
Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
//2、声明通道
Channel channel = connection.createChannel();
//3、声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4、绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
//同一时刻服务器只会发送一条消息给消费者
channel.basicQos(1);
//5、定义队列的消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6、监听队列,手动返回完成状态
channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
//6、获取消息
while (true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 消费者2:" + message + "'");
//消费者2接收一条消息后休眠10毫秒
Thread.sleep(1000);
//返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}

消费者1和消费者2都监听了被同一个交换器绑定的队列。如果消息发送到没有队列绑定的交换器时,消息将丢失,因为交换器没有存储消息的能力,消息只能存储在队列中。

原文地址:https://www.cnblogs.com/yinzhou/p/11213639.html