ActiveMQ学习笔记(18)----Message高级特性(二)

1. Blob Message

  有些时候,我们需要传递Blob(Binary Large Objects)消息,在5.14之前,(5.12和5.13需要在jetty.xml中手动开启)可以按照如下的方式配置使用fileserver:

  配置BLOB Tansfer Policy,可以在发送方的连接URI上设置,如:

tcp://localhost:61616?jms.blobTransferPolicy.uploadUrl=http://localhost:8161/fileserver

  在5.14之后,就只能通过使用ftp协议来发送blobmessage,或自己将文件传到某个服务器上(通过FTP或其他方式),而后将该文件的url放在BlobMessage中再发送这条BlobMessage。不过,5.15好像又提供了http方式,不过需要自己实现文件上传服务器,web服务器上传代码可以参考

  http://svn.apache.org/repos/asf/activemq/trunk/activemq-fileserver/中三个类的的实现方式。 

  消费者的消费不受影响。

  代码实现方式如下:

 
package com.wangx.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;

import javax.jms.*;
import java.io.File;

public class MessageSender {

    public static void main(String[] args) throws JMSException {
        //创建链接工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = null;
        ActiveMQSession session = null;
        try{
            //创建链接
            connection =  factory.createConnection();
            //启动链接
            connection.start();
            //获取会话
            session = (ActiveMQSession) connection.createSession(Boolean.TRUE, session.AUTO_ACKNOWLEDGE);
            //创建队列
            Destination queue = session.createQueue("myQueue");
            //创建生产者对象
            MessageProducer messageProducer = session.createProducer(queue);
            //创建blob消息
            BlobMessage blobMessage = session.createBlobMessage(new File("pom.xml"));
            messageProducer.send(blobMessage);
            session.commit();
            session.close();
            connection.close();
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
        }

    }
}
 

  此时使用的是默认的文件上传服务器地址,地址为:

  http://localhost:8080/uploads/

  如果需要自定义地址,可以在uri上添加

tcp://localhost:61616?jms.blobTransferPolicy.uploadUrl=http://foo.com/

  或使用:

  BlobTransferPolicy构建策略,并通过factory.setBlobTransferPolicy();设置策略。

  接下来消费者使用方式:

 
package com.wangx.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.BlobMessage;

import javax.jms.*;
import java.io.IOException;
import java.io.InputStream;

public class MessageReceive {

    public static void main(String[] args) {
        //创建链接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = null;
        Session session = null;
        try{
            //创建链接
            connection =  factory.createConnection();
            //启动链接
            connection.start();
            //获取会话
            session = connection.createSession(Boolean.TRUE, session.AUTO_ACKNOWLEDGE);
            //创建队列
            Destination queue = session.createQueue("myQueue");
            //创建消费者
            MessageConsumer messageConsumer = session.createConsumer(queue);
            //监听消息
           messageConsumer.setMessageListener(new MessageListener() {
               @Override
               public void onMessage(Message message) {
                   if (message instanceof BlobMessage) {
                       //监听BlobMessage
                       BlobMessage blobMessage = (BlobMessage) message;
                       try {
                           InputStream in = blobMessage.getInputStream();
                           byte[] bytes = new byte[in.available()];
                           System.out.println(new String(bytes));
                       } catch (Exception e) {
                           e.printStackTrace();
                       }

                       // process the stream...
                   }
               }
           });
            session.commit();
            session.close();
            connection.close();
        }catch (Exception e) {
            e.printStackTrace();
        }
    }
}
 

  如果你发送到的文件或者URL存在,比如发给共享文件系统或者是WebServer上的web应用,那么你可以使用如下方式:

BlobMessage message = session.createBlobMessage(new URL("http://some.shared.site.com");
producer.send(message);

2. Message Transformation

  有时候需要JMS Producer内部进行message转换,从4.2版本起,ActiveMQ提供了一个Message Transform接口用于进行消息转换,可以在如下对象上调用:

  ActiveMQConnectionFactory,ActiveMQConnection,ActiveMQSession,ActiveMQMessageConsumer,ActiveMQMessageProducer.

  在消息被发送之前发送到JMS producer的消息总线前进行转换,通过producerTransform方法,在消息到达总线后,但是在consumer接收消息之前进行转换,通过consumerTransform方法,当然MessageTransfoemer接口的实现需要你自己来提供。

  官方文档解释如下:

  

原文 ActiveMQ学习笔记(18)----Message高级特性(二)

原文地址:https://www.cnblogs.com/xiaoshen666/p/10854715.html