rabbitmq 对多服务器p2p模式配置的一个测试

一直对rabbitmq p2p 模式的多服务器下做相同配置的 各个服务器数据接受情况比较好奇

今天有空测试了下 

xml 文件

[html] view plain copy
 
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"  
  4.     xsi:schemaLocation="http://www.springframework.org/schema/beans  
  5.      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
  6.      http://www.springframework.org/schema/beans  
  7.      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
  8.      http://www.springframework.org/schema/rabbit  
  9.      http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">  
  10.        
  11.     <!--配置connection-factory,指定连接rabbit server参数 -->  
  12.     <rabbit:connection-factory id="connectionFactory"  
  13.          port="5672"  username="guest" password="guest" host="127.0.0.1"  
  14.         />  
  15.       
  16.       
  17.         <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->  
  18.     <rabbit:admin connection-factory="connectionFactory" />  
  19.       
  20.     <!--定义queue -->  
  21.     <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" />  
  22.       
  23.         <!-- 定义direct exchange,绑定queueTest -->  
  24.     <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false">  
  25.         <rabbit:bindings>  
  26.             <rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding>  
  27.         </rabbit:bindings>  
  28.     </rabbit:direct-exchange>  
  29.       
  30.     <bean id="jsonMessageConverter"    
  31.         class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"></bean>  
  32.       
  33.     <!--定义rabbit template用于数据的接收和发送 -->  
  34.     <rabbit:template id="amqpTemplate"  connection-factory="connectionFactory"   
  35.         exchange="exchangeTest"   message-converter="jsonMessageConverter" />  
  36.           
  37.         
  38.     <!-- 消息接收者 -->  
  39.     <bean id="messageReceiver" class="com.bimatrix.revit.mq.MessageConsumer"></bean>  
  40.       
  41.     <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->  
  42.     <rabbit:listener-container connection-factory="connectionFactory">  
  43.              <rabbit:listener queues="queueTest" ref="messageReceiver"/>  
  44.     </rabbit:listener-container>  
  45.       
  46. </beans>  
[html] view plain copy
 
  1. </pre><pre code_snippet_id="1702272" snippet_file_name="blog_20160531_4_8975174" name="code" class="html">  
[html] view plain copy
 
  1. </pre><p></p><p>消费者:</p><p></p><pre code_snippet_id="1702272" snippet_file_name="blog_20160531_5_4133388" name="code" class="java">@Component  
  2. public class MessageConsumer implements MessageListener {  
  3.       
  4.     private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);  
  5.     int receiveNum =0 ;  
  6.   
  7.   
  8.     @Override  
  9.     public void onMessage(Message message) {  
  10.         receiveNum++ ;  
  11.         //logger.info("receive message:{}",message);  
  12. //      System.out.println("receive message:{}"+message);  
  13.         System.out.println("receive: "+receiveNum);  
  14.     }  
  15.   
  16.   
  17. }  


测试代码:

[java] view plain copy
 
  1. @RunWith(value = SpringJUnit4ClassRunner.class)  
  2. @ContextConfiguration(locations = { "classpath*:config/applicationContext.xml" })  
  3. public class TestQueue {  
  4.     @Autowired  
  5.     AmqpTemplate amqpTemplate;  
  6.   
  7.   
  8.     final String queue_key = "queueTestKey";  
  9.   
  10.   
  11.     @Test  
  12.     public void send() {  
  13.         try {  
  14.             //System.out.println("-----------------------------------------");  
  15.             for(int i=1;i<=1000;i++){  
  16.                 amqpTemplate.convertAndSend(queue_key, i);  
  17.                 //System.out.println(i);  
  18.             }  
  19.             //System.out.println("-----------------------------------------");  
  20.         } catch (Exception e) {  
  21. //          LOGGER.error(e);  
  22.         }  
  23.     }  



连续发送1000条数据给消费者,同一台机器8080 9080端口各启动一个tomcat 代码完全一样,

接受情况

A 机器 receive: 481

receive: 57  不知道为什么分两次打出来 ,在两个输出界面一个tomcat console 一个是junit的测试输出console 

B 机器 receive: 462 

三者合计就是1000  再次测试总和也是对的

说明两台服务器在p2p的配置下 各接受了一部分生产者发送来的数据

再多服务器下配置p2p 相当于阻塞队列 多个线程同时处理,各吃掉一部分队列的数据,起到了分流的效果

当然每个消费者那里还可以在起动线程池来处理各自接收的数据。 

高并发情况下 这种配置也是可以减轻各个服务器压力

原文地址:https://www.cnblogs.com/zxtceq/p/8559028.html