RabbitMQ简单应用の轮训分发

MQ连接工厂还是之前的那个Connection

 1 package com.mmr.rabbitmq.util;
 2 
 3 import java.io.IOException;
 4 
 5 import com.rabbitmq.client.Connection;
 6 import com.rabbitmq.client.ConnectionFactory;
 7 
 8 public class ConnectionUtils {
 9     /**
10      * @desc 获取Mq 的链接
11      * @author zp
12      * @throws IOException 
13      * @date 2018-7-19
14      */
15     public static  Connection getConnection() throws IOException {
16         // 1.定义一个链接工厂
17         ConnectionFactory factroy = new ConnectionFactory();
18         
19         // 2.设置服务地址
20         factroy.setHost("127.0.0.1");
21         
22         // 3.设置端口号
23         factroy.setPort(5672);
24         
25         // 4.vhost  设置数据库
26         factroy.setVirtualHost("vhtest");
27         
28         // 5.设置用户名
29         factroy.setUsername("jerry");
30         
31         // 6. 设置密码
32         factroy.setPassword("123456");
33         
34         // 7.返回链接
35         return factroy.newConnection();
36     }
37 }
View Code

消息生产者类的定义Send

 1 package com.mmr.rabbitmq.work;
 2 
 3 import java.io.IOException;
 4 
 5 import com.mmr.rabbitmq.util.ConnectionUtils;
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 
 9 public class Send {
10     
11     /*
12      *         |--C1
13      * P-------|--C2
14      *         |--C3
15      * 
16      * */
17     private static final String QUEUE_NAME="test_work_queue";
18     public static void main(String[] args) throws IOException, InterruptedException{
19         // 获取链接
20         Connection connection = ConnectionUtils.getConnection();
21         
22         // 获取通道
23         Channel channel = connection.createChannel();
24         // 声明队列
25         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
26         
27         for (int i = 0; i < 50; i++) {
28             String msg = "hello "+i;
29             channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
30             System.out.println("send msg 的第"+i+"条");
31             Thread.sleep(i*20);
32         }
33         channel.close();
34         connection.close();
35     }
36 }
View Code

消息消费者累的定义 Recv1  Recv2

 1 package com.mmr.rabbitmq.work;
 2 
 3 import java.io.IOException;
 4 
 5 import com.mmr.rabbitmq.util.ConnectionUtils;
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.Consumer;
 9 import com.rabbitmq.client.DefaultConsumer;
10 import com.rabbitmq.client.Envelope;
11 import com.rabbitmq.client.AMQP.BasicProperties;
12 
13 public class Recv1 {
14     private static final String QUEUE_NAME="test_work_queue";
15     public static void main(String[] args) throws IOException{
16         // 获取链接
17         Connection connection = ConnectionUtils.getConnection();
18         
19         //获取频道
20         
21         Channel channel = connection.createChannel();
22         
23         // 声明队列
24         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
25         
26         // 定义一个消费者
27         Consumer consumer = new DefaultConsumer(channel){
28             // 一旦有消息 就会触发这个方法  消息到达 
29             @Override
30             public void handleDelivery(String consumerTag, Envelope envelope,
31                     BasicProperties properties, byte[] body) throws IOException {
32                 // TODO Auto-generated method stub
33                 // 拿消息
34                 String msg = new String(body,"utf-8");
35                 
36                 //搭出来
37                 System.out.println("[1]Recv msg:"+msg);
38                 try {
39                     Thread.sleep(2000);
40                 } catch (Exception e) {
41                     // TODO: handle exception
42                     e.printStackTrace();
43                 }finally{
44                     System.out.println("[1] done");
45                 }
46             }
47         };
48         boolean autoAck = true;
49         channel.basicConsume(QUEUE_NAME, autoAck,consumer);
50         
51     }
52 }
View Code
 1 package com.mmr.rabbitmq.work;
 2 
 3 import java.io.IOException;
 4 
 5 import com.mmr.rabbitmq.util.ConnectionUtils;
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.Consumer;
 9 import com.rabbitmq.client.DefaultConsumer;
10 import com.rabbitmq.client.Envelope;
11 import com.rabbitmq.client.AMQP.BasicProperties;
12 
13 public class Recv2 {
14     private static final String QUEUE_NAME="test_work_queue";
15     public static void main(String[] args) throws IOException{
16         // 获取链接
17         Connection connection = ConnectionUtils.getConnection();
18         
19         //获取频道
20         
21         Channel channel = connection.createChannel();
22         
23         // 声明队列
24         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
25         
26         // 定义一个消费者
27         Consumer consumer = new DefaultConsumer(channel){
28             // 一旦有消息 就会触发这个方法  消息到达 
29             @Override
30             public void handleDelivery(String consumerTag, Envelope envelope,
31                     BasicProperties properties, byte[] body) throws IOException {
32                 // TODO Auto-generated method stub
33                 // 拿消息
34                 String msg = new String(body,"utf-8");
35                 
36                 //搭出来
37                 System.out.println("[2]Recv msg:"+msg);
38                 try {
39                     Thread.sleep(1000);
40                 } catch (Exception e) {
41                     // TODO: handle exception
42                     e.printStackTrace();
43                 }finally{
44                     System.out.println("[2] done");
45                 }
46             }
47         };
48         boolean autoAck = true;
49         channel.basicConsume(QUEUE_NAME, autoAck,consumer);
50         
51     }
52 }
View Code

1.首先我们运行Recv1  Recv2 对消息进行监听

2.其次我们运行Send,开始生产消息。

3.最后得到的结果是:消费者1(都是偶数)和消费者2(都是奇数)处理消息是一样的

为什么会出现这种现象呢?

----这种方式叫做轮训分发(round-robin)结果就是不管谁忙谁闲,都不会多给一个消息,任务就是你一个我一个。

原文地址:https://www.cnblogs.com/pengpengzhang/p/9335613.html