分布式-信息方式-ActiveMQ的集群

                                     ActiveMQ的集群
Queue consumer clusters
              ActiveMQ支持 Consumer对消息高可靠性的负载平衡消费,如果一个 Consumer死掉,
该消息会转发到其它的 Consumer消费的 Queue上。如果一个 Consumer获得消息比其它
Consumer快,那么他将获得更多的消息。

因此推荐 ActiveMQ的 Broker和Client使用failover://transport的方式来配置链接。


Broker clusters
            大部情况下是使用一系列的 Broker和 Client链接到一起。如果一个 Broker死掉了
Client可以自动链接到其它 Broke上。实现以上行为需要用 failover协议作为 Client。
           如果启动了多个 Broker, Client可以使用 static discover或者 Dynamic discovery
容易的从一个 broker到另一个 broker直接链接。
           这样当一个 broker上没有 Consumer的话,那么它的消息不会被消费的,然而该
broker会通过存储和转发的策略来把该消息发到其它 broker上。
           特别注意: ActiveMQ默认的两个 broker, static链接后是单方向的, broker-A可以
访问消费 broker-B的消息,如果要支持双向通信,需要在 netWorkConnector配置的时候,
设置duplex=true就可以了。

如图所示:

把b1和b2当成一个集群,c1和c2为两个客服端。

配置如下:

  <networkConnectors>
                 <networkConnector name="local network"  duplex="true"   conduitSubscriptions="false"
                  uri="static://(tcp://192.168.145.100:61616,tcp://192.168.145.100:61676)"/>
      </networkConnectors>
conduitSubscriptions="false" 默认为true. 

代码如下:

package test.mq.staitsnetwork;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {
       public static void main(String[] args) throws JMSException, InterruptedException {
        ConnectionFactory   ConnectionFactory=new ActiveMQConnectionFactory(
                //"tcp://192.168.145.100:61616");
        "failover:(tcp://192.168.145.100:61616,tcp://192.168.145.100:61676)?randomize=true");
        Connection connection=ConnectionFactory.createConnection();
        connection.start();
         
        Session session=connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
        Destination destination=session.createQueue("my_queue");
        MessageProducer Producer=session.createProducer(destination);
     
        for(int i=0;i<30;i++){
             TextMessage message=session.createTextMessage("message----"+i);
                //Thread.sleep(1000);  
                Producer.send(message);
        }
         session.commit();
         session.close();
         connection.close();    
    }
}
package test.mq.staitsnetwork;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class QReceiver1{
    
    public static void main(String[] args) throws JMSException {
        ConnectionFactory   connectionFactory=new ActiveMQConnectionFactory(
                "tcp://192.168.145.100:61616"
                );
        Connection  connection = connectionFactory.createConnection();
        connection.start();
        final Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 
        Destination destination=session.createQueue("my_queue");
        for(int i=0;i<1;i++){
            MessageConsumer Consumer=session.createConsumer(destination);
            Consumer.setMessageListener(new MessageListener() {
                
                @Override
                public void onMessage(Message msg) {
                    TextMessage     txtmsg=(TextMessage) msg; 
                    try {
                        System.out.println("接收信息--->"+txtmsg.getText());
                        session.commit();
                    } catch (JMSException e1) {
                        e1.printStackTrace();
                    }    
                    
                }
            });
        }

         
         
        
        
     
    }
}
 
package test.mq.staitsnetwork;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class QReceiver2{
    
    public static void main(String[] args) throws JMSException {
        ConnectionFactory   connectionFactory=new ActiveMQConnectionFactory(
                "tcp://192.168.145.100:61676"
                );
        Connection  connection = connectionFactory.createConnection();
        connection.start();
        final Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 
        Destination destination=session.createQueue("my_queue");
        for(int i=0;i<2;i++){
            MessageConsumer Consumer=session.createConsumer(destination);
            Consumer.setMessageListener(new MessageListener() {
                
                @Override
                public void onMessage(Message msg) {
                    TextMessage     txtmsg=(TextMessage) msg; 
                    try {
                        System.out.println("接收信息--->"+txtmsg.getText());
                        session.commit();
                    } catch (JMSException e1) {
                        e1.printStackTrace();
                    }    
                    
                }
            });
        }

         
         
        
        
     
    }
}
 
原文地址:https://www.cnblogs.com/caoyingjielxq/p/9364103.html