ActiveMQ

参考文章:http://blog.csdn.net/tommy_lgj/archive/2008/11/21/3348137.aspx

ActiveMQ的安装和使用

  在官网http://activemq.apache.org/ 下载压缩包,解压后双击activemq.bat。启动ActiveMQ以后,登陆http://localhost:8161/admin/,即可创建消息队列和收发消息。在编写客户端代码时需导入activemq-all-5.11.1.jar。

消息模式

  消息列队有两种消息模式:

  点对点的模式:主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向ActiveMQ发送消息,发送的消息将会先进入队列中。如果有接收端在监听,则会发向接收端,否则会保存在activemq服务器,直到接收端接收消息。点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上ActiveMQ,则会先接收到。

  订阅/发布模式:同样可以有着多个发送端与多个接收端,但是接收端与发送端存在时间上的依赖。就是如果发送端发送消息的时候,接收端并没有监听消息,那么ActiveMQ将不会保存消息,将会认为消息已经发送。也就是发送端发送消息的时候,接收端必须在线才能收到消息。这个模式还有一个特点,就是发送端发送的消息,将会被所有的接收端给接收到,不像点对点一条消息只会被一个接收端给接收到。

点对点发送端:

public class Sender {

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory;
        Connection connection = null;
        Session session = null;
        Destination destination;
        MessageProducer producer;
        
        try{
            connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,
                    "tcp://localhost:61616");
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("TestQueue");
            producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            TextMessage message = session.createTextMessage("Message from Producer "+new Date());
            producer.send(message);
            session.commit();
            System.out.println("Send Message: " + message.getText());
        }catch(Exception e){
            session.rollback();
        }finally{
            if(connection!=null)
                connection.close();
        }
    }
}

点对点接收端:

public class Receiver {

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory;
        Connection connection = null;
        Session session;
        Destination destination;
        MessageConsumer consumer;
        
        try{
            connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("TestQueue");
            consumer = session.createConsumer(destination);
            // 消息的同步接收: 客户端主动去接收消息
            // System.out.println("Receive Message: " + ((TextMessage) consumer.receive()).getText());
            
            // 消息的异步接收: 当消息到达时,ActiveMQ调用MessageListener中的onMessage函数,主动通知客户端
            consumer.setMessageListener(new MessageListener(){
                @Override
                public void onMessage(Message message) {
                    try {
                        if (null != message && message instanceof TextMessage)
                            System.out.println("Receive Message: " + ((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
       Thread.sleep(1000 * 100);
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            if(connection!=null)
                connection.close();
        }
    }
}     

发布/订阅发送端: 

public class TopicSender {

    public static void main(String[] args) throws JMSException {
        TopicConnectionFactory connectionFactory;
        TopicConnection connection = null;
        TopicSession session = null;
        TopicPublisher publisher;
        
        try{
            connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,
                    "tcp://localhost:61616");
            connection = connectionFactory.createTopicConnection();
            connection.start();
            session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("mq.topic");
            publisher = session.createPublisher(topic);
            TextMessage message = session.createTextMessage("Message from Producer " + new Date());
            publisher.send(message);
            session.commit();
            System.out.println("Send Message: " + message.getText());
        }catch(Exception e){
            session.rollback();
        }finally{
            if(connection!=null)
                connection.close();
        }
    }
}

发布/订阅接收端:

public class TopicReceiver {

    public static void main(String[] args) throws JMSException {
        TopicConnectionFactory connectionFactory;
        TopicConnection connection = null;
        TopicSession session = null;
        TopicSubscriber subscriber;
        
        try{
            connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,
                    "tcp://localhost:61616");
            connection = connectionFactory.createTopicConnection();
            connection.start();
            session = connection.createTopicSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("mq.topic");
            subscriber = session.createSubscriber(topic);
            while(true){
                Message message = subscriber.receive();
                if(null != message && message instanceof TextMessage)
                    System.out.println("Receive Message: " + ((TextMessage)message ).getText());
                else 
                    break;
            }
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            if(connection!=null)
                connection.close();
        }
    }
}

消息类型:

//纯字符串的数据
session.createTextMessage();
//序列化的对象
session.createObjectMessage();
//流,可以用来传递文件等
session.createStreamMessage();
//用来传递字节
session.createBytesMessage();
//这个方法创建出来的就是一个map,可以把它当作map来用
session.createMapMessage();
//这个方法拿到的是javax.jms.Message,是所有message的接口
session.createMessage();

 spring配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.1.xsd">
    
    <!-- 连接池  -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">  
        <property name="connectionFactory">  
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">  
                <property name="brokerURL" value="tcp://localhost:61616" />  
            </bean>  
        </property>  
    </bean>  
      
    <!-- 连接工厂 -->
    <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
        <property name="brokerURL" value="tcp://localhost:61616" />  
    </bean>  
    
    <!-- 配置消息目标 -->
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">  
        <constructor-arg index="0" value="TestQueue" />  
    </bean>  
 
    <!-- 消息模板 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
        <property name="connectionFactory" ref="activeMQConnectionFactory" />  
        <property name="defaultDestination" ref="destination" />  
        <property name="messageConverter">  
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
        </property>  
    </bean>  
</beans>

可靠性机制

  发送消息最可靠的方法就是在事务中发送持久性的消息,ActiveMQ默认发送持久性消息。结束事务有两种方法:提交或者回滚。当一个事务提交,消息被处理。如果事务中有一个步骤失败,事务就回滚,这个事务中的已经执行的动作将被撤销。

  低可靠性可以降低开销和提高性能,例如发送消息时可以更改消息的优先级或者指定消息的过期时间。性能和可靠性之间的折衷是设计时要重点考虑的一个方面。可以选择生成和使用非持久性消息来获得最佳性能。另一方面,也可以通过生成和使用持久性消息并使用事务会话来获得最佳可靠性。

1. 消息签收模式

  Session是一个发送或接收消息的线程。Session可以被事务化,也可以不被事务化,可以通过向Connection的创建方法传递一个布尔参数对此进行设置。其中transacted为使用事务标识,acknowledgeMode为签收模式。

  Session createSession(boolean transacted, int acknowledgeMode);

当transacted为true时, 表示session是带事务的,确认消息就通过确认和校正来自动地处理,需手动调用session.commit() 或 session.rollback();。第二个参数是无效的。
当transacted为false时,表示session是不带事务的,此时有三种用于消息确认的选项:
  1)AUTO_ACKNOWLEDGE 自动确认。客户端接收到消息(receive或onMessage成功返回时),即为消费成功,不需要做额外的工作。
  2)CLIENT_ACKNOWLEDGE 客户端确认。客户端接收到消息后,必须调用message.acknowledge()方法。jms服务器才会删除消息。
  3)DUPS_OK_ACKNOWLEDGE 允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。Session不必确保对传送消息的签收。它可能引起消息的重复,但是降低了Session的开销。在需要考虑资源使用时,这种模式非常有效。

2. 消息传递方式

  ActiveMQ支持两种消息传送模式:PERSISTENT和NON_PERSISTENT两种。

  a.PERSISTENT(持久性消息)
  这是ActiveMQ的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。这意味着在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,等到这个服务恢复联机的时候它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。

  b.NON_PERSISTENT(非持久性消息)
  保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。

有两种方法指定传送模式:
  1)使用setDeliveryMode方法,这样所有的消息都采用此传送模式;
  2)使用send方法为每一条消息设置传送模式;

3. 设置消息优先级
  通常可以确保将单个会话向目标发送的所有消息按其发送顺序传送至消费者。然而如果为这些消息分配了不同的优先级,消息传送系统将首先尝试传送优先级较高的消息。消息优先级从0-9十个级别,0-4是普通消息,5-9是加急消息。如果不指定优先级,则默认为4。JMS不要求严格按照这十个优先级发送消息,但必须保证加急消息要先于普通消息到达。

有两种方法设置消息的优先级:
  void setPriority(int newDefaultPriority);
  void send(Destination destination, Message message, int deliveryMode, int priority,long timeToLive);

4. 允许消息过期

  默认情况下,消息永不会过期。如果消息在特定周期内失去意义,那么可以设置过期时间。如果timeToLive值等于零,则JMSExpiration 被设为零,表示该消息永不过期。如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。过期的、处理失败的消息,将会被ActiveMQ置入“ActiveMQ.DLQ”这个队列中。

有两种方法设置消息的过期时间,时间单位为毫秒:
  void setTimeToLive(long timeToLive);
  void send(Destination destination, Message message, int deliveryMode, int priority,long timeToLive);

附加

  问题: 若在控制台查看message的内容是提示错误 An error occurred at line: 20 in the jsp file: /WEB-INF/tags/form/forEachMapEntry.tag 

  解决: 表示JDK1.8不兼容,在 activemq.bat 的 checkJava中设置JDK为1.7,set _JAVACMD=C:Program FilesJavajdk1.7.0_65injava.exe

原文地址:https://www.cnblogs.com/anxiao/p/7217491.html