HornetQ Cluster Configuration and Spring Integration Example

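HornetQ is an embeddable, high-performance asynchronous messaging system that supports clustering and multiple protocols. It fully supports JMS: besides the JMS 1.1 API, it defines its own messaging API, which helps get the most out of HornetQ's performance and flexibility. It also supports a RESTful API and STOMP (STOMP clients can be written in many programming languages); AMQP support is planned ("HornetQ will shortly be implementing AMQP").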

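  • HornetQ offers very high performance: its throughput for persistent messages can exceed the non-persistent throughput of other common messaging engines, and its own non-persistent performance is better still.
  • HornetQ is built entirely from POJOs, so it depends on as few third-party packages as possible, and the design is minimally invasive. HornetQ can run standalone or be integrated with other Java application servers.
  • HornetQ has solid failure handling: it provides server replication and automatic failover, which guard against lost or duplicated messages when a server fails.
  • HornetQ provides flexible clustering. A HornetQ cluster gives you the performance benefit of message load balancing; clusters can also be linked into a global messaging network, and message routing is flexibly configurable.
  • HornetQ has powerful management features, with an extensive management API and server monitoring. It integrates seamlessly with application servers and can work alongside them in an HA environment.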

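Purpose: loosely couple systems so that each one is not constrained by the other servers, which effectively reduces the time threads spend blocked. This differs from RPC, which works in a request/response fashion.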

HornetQ supports the following message body types:
Stream -- StreamMessage: a stream of values that are read sequentially
Text -- TextMessage
Map -- MapMessage (key/value pairs)
Object -- ObjectMessage: a Serializable object
Bytes -- BytesMessage: raw bytes (for example, image data)
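Download: wget http://downloads.jboss.org/hornetq/hornetq-2.2.14.Final.zip
Install libaio (used by HornetQ's asynchronous IO journal on Linux): yum install libaio
1. Standalone configuration
1.1 Write a startup script: start.sh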
IP=`/sbin/ip a |grep 'inet '|awk -F'/' '{print $1}'|awk '{print $2}'|grep -v 127.0.0.1|head -1`
export CLUSTER_PROPS="-Dhornetq.remoting.netty.host=$IP -Djnp.host=$IP"
echo $CLUSTER_PROPS
sh run.sh &
1.2 Or edit the configuration files
In the following two files, replace localhost with the machine's IP address:
config/stand-alone/non-clustered/hornetq-configuration.xml  
config/stand-alone/non-clustered/hornetq-beans.xml 
bindAddress">${jnp.host:192.168.100.241}
rmiBindAddress">${jnp.host:192.168.100.241}
${hornetq.remoting.netty.host:192.168.100.241}
....
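The `${name:default}` placeholders above take the value of the named system property (the ones passed with `-D` in the start script) and fall back to the text after the colon when the property is not set. A minimal Java sketch of that lookup rule follows; `PlaceholderDemo` and `resolve` are hypothetical names used only for illustration, not HornetQ's own parser.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PlaceholderDemo {
    // Matches ${name} or ${name:default}
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    // Replaces every placeholder with the property value, or with its default when unset.
    static String resolve(String text, Properties props) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String fallback = m.group(2) == null ? "" : m.group(2);
            String value = props.getProperty(m.group(1), fallback);
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Property unset: the default after the colon is used
        System.out.println(resolve("${jnp.host:192.168.100.241}", props));
        // Property set (as with -Djnp.host=10.0.0.5): the property wins
        props.setProperty("jnp.host", "10.0.0.5");
        System.out.println(resolve("${jnp.host:192.168.100.241}", props));
    }
}
```

This is why either approach works: the start script overrides the value at launch, while editing the file changes the fallback.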
1.3 JARs required by the client
hornetq-core-client.jar
netty.jar
hornetq-jms-client.jar
jboss-jms-api.jar
jnp-client.jar

1.4 Configure a queue: add the following to config/stand-alone/non-clustered/hornetq-jms.xml
<queue name="OrderQueue">
  <entry name="queues/OrderQueue"/>
</queue>
Configure a topic:
<topic name="topic1">
  <entry name="/my/Topic1"/>
</topic>
In hornetq-configuration.xml, add the following under the <configuration> node:
<security-enabled>false</security-enabled>
1.5 Demo: sending and receiving messages
// Sends a Serializable payload to the queue bound under destinationName in JNDI.
public void sendToQueue(String destinationName, Serializable payload) throws Exception {
    InitialContext ic = new InitialContext();
    Connection connection = null;
    try {
        ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ConnectionFactory");
        Queue queue = (Queue) ic.lookup(destinationName);
        connection = cf.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer publisher = session.createProducer(queue);
        connection.start();
        // createObjectMessage(payload) already sets the body, so no extra setObject call is needed
        ObjectMessage message = session.createObjectMessage(payload);
        publisher.send(message);
    } finally {
        // Close in finally so the connection is released even if a lookup or the send fails
        if (connection != null) {
            connection.close();
        }
    }
}

// Message listener: logs the received payload.
@TransactionAttribute(value = TransactionAttributeType.REQUIRED)
public void onMessage(Message message) {
    try {
        ObjectMessage obj = (ObjectMessage) message;
        Serializable ser = obj.getObject();
        log.info("[NotificationInbound] onMessage! payload=" + ser);
    } catch (Exception e) {
        log.error("[NotificationInbound] ERROR[" + e.getMessage() + "]!!!****", e);
        // Keep the cause so the failure can be diagnosed
        throw new IllegalStateException(e);
    }
}
2. Cluster configuration
2.1 Startup scripts for a cluster on a single machine
start-cluster0.bat
set CLUSTER_PROPS=-Ddata.dir=../data-server2 -Djnp.port=2099 -Djnp.rmiPort=2098 -Dhornetq.remoting.netty.port=6445
run   ../config/stand-alone/clustered 
start-cluster1.bat
set CLUSTER_PROPS=-Ddata.dir=../data-server3 -Djnp.port=3099 -Djnp.rmiPort=3098 -Dhornetq.remoting.netty.port=7445
run   ../config/stand-alone/clustered
2.2 Cluster node startup script
start-node.sh
IP=`/sbin/ip a |grep 'inet '|awk -F'/' '{print $1}'|awk '{print $2}'|grep -v 127.0.0.1|head -1`
export CLUSTER_PROPS="-Dhornetq.remoting.netty.host=$IP  -Djnp.host=$IP"
echo $CLUSTER_PROPS
sh run.sh   ../config/stand-alone/clustered
2.2.1 Cluster node stop script
stop-node.sh
sh stop.sh ../config/stand-alone/clustered
2.3 Cluster configuration notes
2.3.1 Cluster discovery uses UDP multicast
hornetq-configuration.xml
<discovery-groups>
<discovery-group name="my-discovery-group">
<local-bind-address>172.16.9.7</local-bind-address>
<group-address>231.7.7.7</group-address>
<group-port>9876</group-port>
<refresh-timeout>10000</refresh-timeout>
</discovery-group>
</discovery-groups>
hornetq-jms.xml
<connection-factory name="ConnectionFactory">
<discovery-group-ref discovery-group-name="my-discovery-group"/>
<entries>
<entry name="/ConnectionFactory"/>
</entries>
</connection-factory>
2.3.2 Client connection code:
final String groupAddress = "231.7.7.7";
final int groupPort = 9876;
ConnectionFactory jmsConnectionFactory =
HornetQJMSClient.createConnectionFactory(groupAddress, groupPort);
Connection jmsConnection1 = jmsConnectionFactory.createConnection();
Connection jmsConnection2 = jmsConnectionFactory.createConnection();
 
2.3.3 Server-side load balancing
hornetq-configuration.xml
<cluster-connections>
<cluster-connection name="my-cluster">
<address>jms</address>
<retry-interval>500</retry-interval>
<use-duplicate-detection>true</use-duplicate-detection>
<forward-when-no-consumers>false</forward-when-no-consumers>
<max-hops>1</max-hops>
<discovery-group-ref discovery-group-name="my-discovery-group"/>
</cluster-connection>
</cluster-connections>
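The cluster connection above applies to addresses that start with `jms` and, with `forward-when-no-consumers` set to `false`, only shares messages with nodes that have a matching consumer. The following is a simplified Java simulation of that behavior, assuming plain round-robin over eligible nodes; `ClusterRoutingSketch` and its methods are hypothetical names and this is not HornetQ's internal routing code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ClusterRoutingSketch {
    private final Map<String, Integer> consumersPerNode = new LinkedHashMap<>();
    private final String localNode;
    private final boolean forwardWhenNoConsumers;
    private int next = 0;

    ClusterRoutingSketch(String localNode, boolean forwardWhenNoConsumers) {
        this.localNode = localNode;
        this.forwardWhenNoConsumers = forwardWhenNoConsumers;
    }

    void setConsumers(String node, int count) {
        consumersPerNode.put(node, count);
    }

    // Picks the node that gets the next message. If no node is eligible,
    // the message stays on the node that received it.
    String route() {
        List<String> eligible = new ArrayList<>();
        for (Map.Entry<String, Integer> e : consumersPerNode.entrySet()) {
            if (forwardWhenNoConsumers || e.getValue() > 0) {
                eligible.add(e.getKey());
            }
        }
        if (eligible.isEmpty()) {
            return localNode;
        }
        String node = eligible.get(next % eligible.size());
        next++;
        return node;
    }

    public static void main(String[] args) {
        ClusterRoutingSketch cluster = new ClusterRoutingSketch("node0", false);
        cluster.setConsumers("node0", 1);
        cluster.setConsumers("node1", 0); // no consumers: skipped
        cluster.setConsumers("node2", 2);
        for (int i = 0; i < 4; i++) {
            System.out.println("message " + i + " -> " + cluster.route());
        }
    }
}
```

With the flag set to `true` in this sketch, `node1` would also receive a share even though nothing is consuming there, which is why `false` is the usual choice when consumers are unevenly spread.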
 
2.3.4 Client-side load balancing
hornetq-jms.xml
<connection-factory name="ConnectionFactory">
<discovery-group-ref discovery-group-name="my-discovery-group"/>
<entries>
<entry name="/ConnectionFactory"/>
</entries>
<ha>true</ha>
<connection-load-balancing-policy-class-name>
org.hornetq.api.core.client.loadbalance.RandomConnectionLoadBalancingPolicy
</connection-load-balancing-policy-class-name>
</connection-factory>
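A client-side policy decides which discovered server each new connection goes to. The sketch below contrasts random and round-robin selection; the `Policy` interface here is a hypothetical stand-in defined only for this example (a single method that picks an index below `max`), and the server names reuse the ports from the single-machine scripts in 2.1.

```java
import java.util.Random;

class LoadBalancingSketch {
    // Hypothetical stand-in for a load-balancing policy: pick an index in [0, max).
    interface Policy {
        int select(int max);
    }

    // Picks any server with equal probability.
    static class RandomPolicy implements Policy {
        private final Random random;

        RandomPolicy(long seed) {
            this.random = new Random(seed);
        }

        public int select(int max) {
            return random.nextInt(max);
        }
    }

    // Cycles through the servers in order (simplified: always starts at index 0).
    static class RoundRobinPolicy implements Policy {
        private int pos = -1;

        public int select(int max) {
            pos = (pos + 1) % max;
            return pos;
        }
    }

    public static void main(String[] args) {
        String[] servers = {"node0:6445", "node1:7445"};
        Policy policy = new RoundRobinPolicy();
        for (int i = 0; i < 4; i++) {
            System.out.println("connection " + i + " -> " + servers[policy.select(servers.length)]);
        }
    }
}
```

Random selection needs no shared state and evens out over many connections, whereas round-robin gives an even split even for a handful of connections.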
3. Spring integration example
3.1 Spring configuration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util"
        xmlns:jee="http://www.springframework.org/schema/jee" xmlns:lang="http://www.springframework.org/schema/lang"
        xmlns:jms="http://www.springframework.org/schema/jms" xmlns:aop="http://www.springframework.org/schema/aop"
        xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
        xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">


      <bean id="messageTopic" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createTopic">
              <constructor-arg value="topic1" />
      </bean>

    <bean id="searchAddMessageQueue" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createQueue">
        <constructor-arg value="ExpiryQueue"></constructor-arg>
    </bean>
    <!-- 
      <bean id="transportConfiguration" class="org.hornetq.api.core.TransportConfiguration">
              <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" />
              <constructor-arg>
                      <map key-type="java.lang.String" value-type="java.lang.Object">
                              <entry key="host" value="localhost"></entry>
                              <entry key="port" value="5445"></entry>
                      </map>
              </constructor-arg>
      </bean>
    
     -->
      <bean id="transportConfiguration" class="org.hornetq.api.core.DiscoveryGroupConfiguration">
              <constructor-arg name="groupAddress" value="231.7.7.7" />
              <constructor-arg name="groupPort" value="9876">
              </constructor-arg>
      </bean>

      <bean id="connectionFactory" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createConnectionFactoryWithHA" destroy-method="close">
              <constructor-arg type="org.hornetq.api.jms.JMSFactoryType" value="CF" />
              <constructor-arg ref="transportConfiguration" />
      </bean>

      <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
              <property name="connectionFactory" ref="connectionFactory" />
              <property name="pubSubDomain" value="true" />      
      </bean>

      <bean id="topicService" class="org.langke.hornetq.ClientServiceImpl">
              <property name="jmsTemplate" ref="jmsTemplate" />
              <property name="topic" ref="messageTopic" />
      </bean>
      

    <bean id="sendMessageService" class="org.langke.common.hornetq.SendMessageServiceImpl">
        <property name="jmsTemplate" ref="jmsTemplate"></property>
        <property name="searchAddMessageQueue" ref="searchAddMessageQueue"></property>
    </bean>
    
    <!-- this is the Message Driven POJO (MDP)
    <bean id="messageListener" class="org.langke.hornetq.MessageListenerImpl">
    </bean>
 -->
    <bean id="receiveMessageListener" class="org.langke.common.hornetq.ReceiveMessageListenerImpl"></bean>

    <!-- and this is the message listener container -->
    <bean id="jmsContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <!-- <property name="destination" ref="messageTopic" /> -->
        <property name="destination" ref="searchAddMessageQueue"></property>
        <property name="messageListener" ref="receiveMessageListener" />
    </bean>

</beans>
 
package org.langke.common.hornetq;


public interface MessageService {
    public boolean sendMessage(SerializableObject message) ;
}

3.2 Sending messages

package org.langke.common.hornetq;

import java.io.Serializable;

public class SerializableObject implements Serializable{
    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    private Object obj ;
    private Boolean isRetry = true;
    
    public Object getObj() {
        return obj;
    }

    public void setObj(Object obj) {
        this.obj = obj;
    }

    public Boolean getIsRetry() {
        return isRetry;
    }

    public void setIsRetry(Boolean isRetry) {
        this.isRetry = isRetry;
    }
    
    
}
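One thing to keep in mind with a wrapper like `SerializableObject`: the `obj` field is typed as `Object`, so the compiler does not check that the wrapped value is itself serializable, and a non-serializable value only fails when the message body is written. The sketch below shows this with plain Java serialization; `PayloadRoundTrip` and its nested `Wrapper` are hypothetical stand-ins with the same shape, not part of the original project.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;

class PayloadRoundTrip {
    // Hypothetical stand-in with the same shape as SerializableObject.
    static class Wrapper implements Serializable {
        private static final long serialVersionUID = 1L;
        final Object obj;

        Wrapper(Object obj) {
            this.obj = obj;
        }
    }

    // Serializes an object graph to bytes; fails if any part is not Serializable.
    static byte[] toBytes(Object o) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Rebuilds the object graph from bytes.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        HashMap<String, Integer> map = new HashMap<>();
        map.put("ooxx", 1);
        Wrapper copy = (Wrapper) fromBytes(toBytes(new Wrapper(map)));
        System.out.println("round trip ok: " + copy.obj);
        try {
            toBytes(new Wrapper(new Object())); // java.lang.Object is not Serializable
        } catch (UncheckedIOException e) {
            System.out.println("rejected: " + e.getCause());
        }
    }
}
```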
package org.langke.common.hornetq;


import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.log4j.Logger;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class SendMessageServiceImpl implements MessageService {
    private static final Logger logger = Logger.getLogger(SendMessageServiceImpl.class);
    private JmsTemplate jmsTemplate;
    private Queue searchAddMessageQueue;
    @Override
    public boolean sendMessage(SerializableObject message) {
        return sendQueue(message);
    }
    
    private boolean sendQueue(final SerializableObject so) {
        try {
            logger.info("start to send queue to " + searchAddMessageQueue.getQueueName() + ", message : " + so);
            jmsTemplate.send(searchAddMessageQueue, new MessageCreator() {
                @Override
                public Message createMessage(Session session)
                        throws JMSException {
                    ObjectMessage om = session.createObjectMessage(so);
                    return om;
                }
            }); 
            return true;
        } catch (Exception e) {
            logger.error("Error: send queue failure:" + e.getMessage(), e);
            return false;
        }
    }


    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public Queue getSearchAddMessageQueue() {
        return searchAddMessageQueue;
    }

    public void setSearchAddMessageQueue(Queue searchAddMessageQueue) {
        this.searchAddMessageQueue = searchAddMessageQueue;
    }

 

}

3.3 Receiving messages

package org.langke.common.hornetq;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import org.apache.log4j.Logger;

public class ReceiveMessageListenerImpl implements MessageListener {
    private AtomicInteger count = new AtomicInteger(0);
    private static Logger logger = Logger.getLogger(ReceiveMessageListenerImpl.class);
    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof ObjectMessage) {
                // Read the body once and reuse it instead of calling getObject() twice
                Object body = ((ObjectMessage) message).getObject();
                if (body instanceof SerializableObject) {
                    logger.info(((SerializableObject) body).getObj());
                } else {
                    logger.info(message);
                }
            } else {
                logger.info(message);
            }
        } catch (JMSException e) {
            logger.error("Error: failed to receive message from queue: " + e.getMessage(), e);
        } finally {
            // Running count of messages handled by this listener
            logger.info("messages received: " + count.incrementAndGet());
        }
    }

}

3.4 Usage example

package org.langke.common.hornetq;

import java.util.HashMap;
import java.util.Map;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

public class Test {

    private static ApplicationContext ctx;
    private static Test instance = new Test();

    public static Test getInstance() {
        return instance;
    }

    private Test() {
        if (ctx == null) {
            // The config path is relative to the working directory, which differs by platform here
            String location;
            if (System.getProperty("os.name").toLowerCase().contains("windows")) {
                location = "conf/applicationContext.xml";
            } else {
                location = "../conf/applicationContext.xml";
            }
            ctx = new FileSystemXmlApplicationContext(location);
        }
    }
     
    /**
     * @param args
     */
    public static void main(String[] args) {

        getInstance();
        MessageService service = ctx.getBean("sendMessageService", MessageService.class);

        for (int i = 0; i < 3000; i++) {
            Map<String, Integer> map = new HashMap<String, Integer>();
            map.put("ooxx", i);
            SerializableObject so = new SerializableObject();
            so.setObj(map);
            service.sendMessage(so);
        }
        
    }
}
4. Other features
4.1 Message expiry
HornetQ will not deliver a message to a consumer after its time to live has been exceeded.
If the message has not been delivered before the time to live is reached, the server can discard it.
// HornetQ core API: message will expire 5000 ms from now
// (with the JMS API, set the time to live on the producer instead: producer.setTimeToLive(5000))
message.setExpiration(System.currentTimeMillis() + 5000);
Expiry address
<!-- expired messages in exampleQueue will be sent to the expiry address expiryQueue -->
<address-setting match="jms.queue.exampleQueue">
<expiry-address>jms.queue.expiryQueue</expiry-address>
</address-setting>
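The expiry rules above can be illustrated with a small in-memory model: a message carries an absolute expiration time, and a message that is already past it when delivery is attempted goes to the expiry address instead of the consumer. This is only a sketch under those assumptions; `ExpirySketch` and its members are hypothetical names, and time is passed in explicitly so the behavior is easy to follow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class ExpirySketch {
    static class Msg {
        final String body;
        final long expiration; // absolute time in ms; 0 means "never expires"

        Msg(String body, long expiration) {
            this.body = body;
            this.expiration = expiration;
        }
    }

    final Deque<Msg> queue = new ArrayDeque<>();
    final List<String> expiryQueue = new ArrayList<>(); // stands in for the expiry address

    void send(String body, long expiration) {
        queue.add(new Msg(body, expiration));
    }

    // Returns the next live message body, or null if none is left.
    // Expired messages are moved to expiryQueue instead of being delivered.
    String receive(long now) {
        Msg m;
        while ((m = queue.poll()) != null) {
            if (m.expiration != 0 && now >= m.expiration) {
                expiryQueue.add(m.body);
                continue;
            }
            return m.body;
        }
        return null;
    }

    public static void main(String[] args) {
        ExpirySketch q = new ExpirySketch();
        long start = 0;
        q.send("short-lived", start + 5000); // expires 5000 ms after start
        q.send("no expiry", 0);
        System.out.println("delivered: " + q.receive(start + 6000));
        System.out.println("expired:   " + q.expiryQueue);
    }
}
```

Without an expiry address configured, the equivalent of `expiryQueue.add(...)` would simply be dropped, matching the "server can discard it" case.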
4.2 Scheduled messages
TextMessage message = session.createTextMessage("MSG");
message.setLongProperty("_HQ_SCHED_DELIVERY", System.currentTimeMillis() + 5000);
producer.send(message);
...
// message will not be received immediately but 5 seconds later
TextMessage messageReceived = (TextMessage) consumer.receive();
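Conceptually, a scheduled message sits on the server until its delivery time and only then becomes visible to consumers. A minimal Java model of that idea follows, assuming messages are held in delivery-time order; `ScheduledDeliverySketch` is a hypothetical name, and the current time is passed in explicitly rather than read from the clock.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

class ScheduledDeliverySketch {
    static class Msg {
        final String body;
        final long deliverAt; // earliest delivery time in ms

        Msg(String body, long deliverAt) {
            this.body = body;
            this.deliverAt = deliverAt;
        }
    }

    // Messages ordered by their scheduled delivery time, earliest first
    private final PriorityQueue<Msg> pending =
            new PriorityQueue<>(Comparator.comparingLong((Msg m) -> m.deliverAt));

    void send(String body, long deliverAt) {
        pending.add(new Msg(body, deliverAt));
    }

    // Returns the next message whose delivery time has arrived, or null if none is due yet.
    String receive(long now) {
        Msg head = pending.peek();
        if (head == null || head.deliverAt > now) {
            return null;
        }
        return pending.poll().body;
    }

    public static void main(String[] args) {
        ScheduledDeliverySketch queue = new ScheduledDeliverySketch();
        long sentAt = 0;
        queue.send("MSG", sentAt + 5000);
        System.out.println("at 1000 ms: " + queue.receive(sentAt + 1000));
        System.out.println("at 5000 ms: " + queue.receive(sentAt + 5000));
    }
}
```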
4.3 Message groups
Message groups are sets of messages that have the following characteristics:
• Messages in a message group share the same group id; that is, they have the same group identifier property (JMSXGroupID for JMS, _HQ_GROUP_ID for the HornetQ core API).
• Messages in a message group are always consumed by the same consumer, even if there are many consumers on a queue. All messages with the same group id are pinned to the same consumer. If that consumer closes, another consumer is chosen and will receive all messages with the same group id.
Based on the message:
Message message = ...
message.setStringProperty("JMSXGroupID", "Group-0");
producer.send(message);
message = ...
message.setStringProperty("JMSXGroupID", "Group-0");
producer.send(message);
Based on the connection factory:
<connection-factory name="ConnectionFactory">
<connectors>
<connector-ref connector-name="netty-connector"/>
</connectors>
<entries>
<entry name="ConnectionFactory"/>
</entries>
<group-id>Group-0</group-id>
</connection-factory>
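The pinning behavior described in this section can be sketched as a map from group id to consumer: the first message of a group picks a consumer, later messages of that group follow it, and the pin is released when the consumer closes. `MessageGroupSketch` and its methods are hypothetical names for illustration; ungrouped messages are assumed to be distributed round-robin.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MessageGroupSketch {
    private final List<String> consumers = new ArrayList<>();
    private final Map<String, String> pinned = new HashMap<>(); // group id -> consumer
    private int next = 0;

    void addConsumer(String consumer) {
        consumers.add(consumer);
    }

    // Closing a consumer releases its groups; each is re-pinned on its next message.
    void closeConsumer(String consumer) {
        consumers.remove(consumer);
        pinned.values().removeIf(consumer::equals);
    }

    // Returns the consumer that receives a message with the given group id (null = ungrouped).
    String dispatch(String groupId) {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("no consumers");
        }
        if (groupId != null) {
            String owner = pinned.get(groupId);
            if (owner != null) {
                return owner;
            }
        }
        String chosen = consumers.get(next++ % consumers.size());
        if (groupId != null) {
            pinned.put(groupId, chosen);
        }
        return chosen;
    }

    public static void main(String[] args) {
        MessageGroupSketch queue = new MessageGroupSketch();
        queue.addConsumer("consumer-1");
        queue.addConsumer("consumer-2");
        System.out.println(queue.dispatch("Group-0"));
        System.out.println(queue.dispatch("Group-0")); // same consumer as the line above
        queue.closeConsumer("consumer-1");
        System.out.println(queue.dispatch("Group-0")); // re-pinned to a remaining consumer
    }
}
```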

 

Original post: https://www.cnblogs.com/langke93/p/2631518.html