MQ的demo

public class WorkTest {
 @Test
 public void send() throws Exception{
   //获取连接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //声明队列
   chan.queueDeclare("work", false, false, false, null);
   for(int i=0;i<100;i++){
     String msg="1712,hello:"+i+"message";
     chan.basicPublish("", "work", null, msg.getBytes());
     System.out.println("第"+i+"条信息已经发送");
   }
   chan.close();
   conn.close();
 }
 @Test
 public void receive1() throws Exception{
   //获取连接,获取信道
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   chan.queueDeclare("work", false, false, false, null);
   //同一时刻服务器只发送一条消息给同一消费者,消费者空闲,才发送一条
   chan.basicQos(1);
   //定义消费者
   QueueingConsumer consumer=new QueueingConsumer(chan);
   //绑定队列和消费者的关系
   //queue
   //autoAck:消息被消费后,是否自动确认回执,如果false,不自动需要手动在
   //完成消息消费后进行回执确认,channel.ack,channel.nack
   //callback
   //chan.basicConsume(queue, autoAck, callback)
   chan.basicConsume("work", false, consumer);
   //监听
   while(true){
     Delivery delivery=consumer.nextDelivery();
     byte[] result = delivery.getBody();
     String msg=new String(result);
     System.out.println("接受到:"+msg);
     Thread.sleep(50);
     //返回服务器,回执
     chan.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
   }  
 }
 @Test
 public void receive2() throws Exception{
   //获取连接,获取信道
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   chan.queueDeclare("work", false, false, false, null);
   //同一时刻服务器只发送一条消息给同一消费者,消费者空闲,才发送一条
   chan.basicQos(1);
   //定义消费者
   QueueingConsumer consumer=new QueueingConsumer(chan);
   //绑定队列和消费者的关系
   //queue
   //autoAck:消息被消费后,是否自动确认回执,如果false,不自动需要手动在
   //完成消息消费后进行回执确认,channel.ack,channel.nack
   //callback
   //chan.basicConsume(queue, autoAck, callback)
   chan.basicConsume("work", false, consumer);
   //监听
   while(true){
     Delivery delivery=consumer.nextDelivery();
     byte[] result = delivery.getBody();
     String msg=new String(result);
     System.out.println("接受到:"+msg);
     Thread.sleep(150);
     //返回服务器,回执
     chan.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
   }
 }
 
}

  

 publish/fanout发布订阅

public class FanoutTest {
 //交换机,有类型,发布订阅:fanout
 //路由模式:direct
 //主题模式:topic
 @Test
 public void send() throws Exception {
   //获取连接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //声明交换机
   //参数意义,1 交换机名称,2 类型:fanout,direct,topic
   chan.exchangeDeclare("fanoutEx", "fanout");
   //发送消息
   for(int i=0;i<100;i++){
     String msg="1712 hello:"+i+"msg";
     chan.basicPublish("fanoutEx", "", null, msg.getBytes());
     System.out.println("第"+i+"条信息已经发送");
   }
 }
 
 @Test
 public void receiv01() throws Exception{
   //获取连接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //生命队列
   chan.queueDeclare("fanout01", false, false, false, null);
   //声明交换机
   chan.exchangeDeclare("fanoutEx", "fanout");
   //绑定队列到交换机
   //参数 1 队列名称,2 交换机名称 3 路由key
   chan.queueBind("fanout01", "fanoutEx", "");
   chan.basicQos(1);
   //定义消费者
   QueueingConsumer consumer=new QueueingConsumer(chan);
   //消费者与队列绑定
   chan.basicConsume("fanout01",false, consumer);
   while(true){
     Delivery delivery= consumer.nextDelivery();
     System.out.println("一号消费者接收到"+
     new String(delivery.getBody()));
     chan.basicAck(delivery.getEnvelope().
         getDeliveryTag(), false);
   }
 }
 @Test
 public void receiv02() throws Exception{
   //获取连接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //生命队列
   chan.queueDeclare("fanout02", false, false, false, null);
   //声明交换机
   chan.exchangeDeclare("fanoutEx", "fanout");
   //绑定队列到交换机
   //参数 1 队列名称,2 交换机名称 3 路由key
   chan.queueBind("fanout02", "fanoutEx", "");
   chan.basicQos(1);
   //定义消费者
   QueueingConsumer consumer=new QueueingConsumer(chan);
   //消费者与队列绑定
   chan.basicConsume("fanout02",false, consumer);
   while(true){
     Delivery delivery= consumer.nextDelivery();
     System.out.println("二号消费者接收到"+new String(delivery.getBody()));
     chan.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
   }
 }
}

  

routing路由模式

public class RoutingTopicTest {
 
 @Test
 public void routingSend() throws Exception{
   //获取连接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //声明交换机
   //参数意义,1 交换机名称,2 类型:fanout,direct,topic
   chan.exchangeDeclare("directEx", "direct");
   //发送消息
   String msg="路由模式的消息";
   chan.basicPublish("directEx", "jt1713", 
       null, msg.getBytes());
 }
 @Test
 public void routingRec01() throws Exception{
   System.out.println("一号消费者等待接收消息");
   //获取连接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //声明队列
   chan.queueDeclare("direct01", false, false, false, null);
   //声明交换机
   chan.exchangeDeclare("directEx", "direct");
   //绑定队列到交换机
   //参数 1 队列名称,2 交换机名称 3 路由key
   chan.queueBind("direct01", "directEx", "jt1712");
   chan.basicQos(1);
   //定义消费者
   QueueingConsumer consumer=new QueueingConsumer(chan);
   //消费者与队列绑定
   chan.basicConsume("direct01",false, consumer);
   while(true){
     Delivery delivery= consumer.nextDelivery();
     System.out.println("一号消费者接收到"+
     new String(delivery.getBody()));
     chan.basicAck(delivery.getEnvelope().
         getDeliveryTag(), false);
   }
 }
 @Test
 public void routingRec02() throws Exception{
   System.out.println("二号消费者等待接收消息");
   //获取连接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //声明队列
   chan.queueDeclare("direct02", false, false, false, null);
   //声明交换机
   chan.exchangeDeclare("directEx", "direct");
   //绑定队列到交换机
   //参数 1 队列名称,2 交换机名称 3 路由key
   chan.queueBind("direct02", "directEx", "jt1711");
   chan.basicQos(1);
   //定义消费者
   QueueingConsumer consumer=new QueueingConsumer(chan);
   //消费者与队列绑定
   chan.basicConsume("direct02",false, consumer);
   while(true){
     Delivery delivery= consumer.nextDelivery();
     System.out.println("二号消费者接收到"+
     new String(delivery.getBody()));
     chan.basicAck(delivery.getEnvelope().
         getDeliveryTag(), false);
   }
 }
}

  

topic主题模式

*号代表单个词语
#代表多个词语

其他的内容与routing路由模式一致

public class RoutingTopicTest {
 
 
 @Test
 public void routingRec02() throws Exception{
   System.out.println("二号消费者等待接收消息");
   //获取连接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //声明队列
   chan.queueDeclare("direct02", false, false, false, null);
   //声明交换机
   chan.exchangeDeclare("directEx", "direct");
   //绑定队列到交换机
   //参数 1 队列名称,2 交换机名称 3 路由key
   chan.queueBind("direct02", "directEx", "jt1711");
   chan.basicQos(1);
   //定义消费者
   QueueingConsumer consumer=new QueueingConsumer(chan);
   //消费者与队列绑定
   chan.basicConsume("direct02",false, consumer);
   while(true){
     Delivery delivery= consumer.nextDelivery();
     System.out.println("二号消费者接收到"+
     new String(delivery.getBody()));
     chan.basicAck(delivery.getEnvelope().
         getDeliveryTag(), false);
   }
 }
 
 @Test
 public void topicSend() throws Exception{
   //获取连接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //声明交换机
   //参数意义,1 交换机名称,2 类型:fanout,direct,topic
   chan.exchangeDeclare("topicEx", "topic");
   //发送消息
   String msg="主题模式的消息";
   chan.basicPublish("topicEx", "jt1712.add.update", 
       null, msg.getBytes());
 }
 @Test
 public void topicRec01() throws Exception{
   System.out.println("一号消费者等待接收消息");
   //获取连接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //声明队列
   chan.queueDeclare("topic01", false, false, false, null);
   //声明交换机
   chan.exchangeDeclare("topicEx", "topic");
   //绑定队列到交换机
   //参数 1 队列名称,2 交换机名称 3 路由key
   chan.queueBind("topic01", "topicEx", "jt1712");
   chan.basicQos(1);
   //定义消费者
   QueueingConsumer consumer=new QueueingConsumer(chan);
   //消费者与队列绑定
   chan.basicConsume("topic01",false, consumer);
   while(true){
     Delivery delivery= consumer.nextDelivery();
     System.out.println("一号消费者接收到"+
     new String(delivery.getBody()));
     chan.basicAck(delivery.getEnvelope().
         getDeliveryTag(), false);
   }
 }
 @Test
 public void topicRec02() throws Exception{
   System.out.println("二号消费者等待接收消息");
   //获取连接
   Connection conn = ConnectionUtil.getConn();
   Channel chan = conn.createChannel();
   //声明队列
   chan.queueDeclare("topic02", false, false, false, null);
   //声明交换机
   chan.exchangeDeclare("topicEx", "topic");
   //绑定队列到交换机
   //参数 1 队列名称,2 交换机名称 3 路由key
   chan.queueBind("topic02", "topicEx", "jt1712.#");
   chan.basicQos(1);
   //定义消费者
   QueueingConsumer consumer=new QueueingConsumer(chan);
   //消费者与队列绑定
   chan.basicConsume("topic02",false, consumer);
   while(true){
     Delivery delivery= consumer.nextDelivery();
     System.out.println("二号消费者接收到"+
     new String(delivery.getBody()));
     chan.basicAck(delivery.getEnvelope().
         getDeliveryTag(), false);
   }
 }
}

  

原文地址:https://www.cnblogs.com/zqyanywn/p/10150765.html