Message高级特性 & 内嵌Jetty实现文件服务器

1. Messaage Properties  常见属性

  更多的属性以及介绍参考:http://activemq.apache.org/activemq-message-properties.html

  消息属性,这个在之前刚学习ActiveMQ的时候已经介绍过,常见的如下:

  1. queue消息默认是持久化
  2. 消息得优先级默认是4.
  3. 消息发送时设置了时间戳。
  4. 消息的过期时间默认是永不过期,过期的消息进入DLQ,可以配置DLQ及其处理策略。
  5. 如果消息是重发的,将会被标记出来。
  6. JMSReplyTo标识响应消息发送到哪个queue.
  7. JMSCorelationID标识此消息相关联的消息id,可以用这个标识把多个消息连接起来。
  8. JMS同时也记录了消息重发的次数。默认是6次
  9. 如果有一组相关联的消息需要处理,可以分组;只需要设置消息组的名字和这个消息的第几个消息。
  10. 如果消息中一个事务环境,则TXID将会被设置。
  11. 此外ActiveMQ在服务器端额外设置了消息入队和出队的时间戳。
  12. ActiveMQ里消息属性的值,不仅可以用基本类型,还可以用List或Map类型

2. Advisory Message  监听ActiveMQ自己的消息

  Advisory Message是ActiveMQ自身的系统消息地址,可以监听该地址来获取activemq的系统消息。目前支持获取如下信息:

consumers, producers和connections的启动和停止
创建和销毁temporary destinations
opics 和queues 的消息过期
brokers发送消息给destination,但是没有consumers
connections启动和停止

说明:

1. 所有advisory的topic,前缀是:ActiveMQ.Advisory
2. 所有Advisory的消息类型是:‘Advisory’,所有的Advisory都有的消息属性有:originBrokerId,originBrokerName,originBrokerURL
3. 具体支持的topic和queue,请参照:
   http://activemq.apache.org/advisory-message.html
Advisory功能默认是关闭的,打开Advisorie的具体实现如下:

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerName" dataDirectory="${activemq.data}" schedulePeriodForDestinationPurge="1000">
        <destinationPolicy>
              <policyMap>
                <policyEntries>
                     <policyEntry queue=">"  advisoryForConsumed="true" />
                </policyEntries>
              </policyMap>
       </destinationPolicy>
    ...
</broker>

配置启动之后我们向主题chatTopic发送一条消息可以查看到如下activemq增加的主题:

 我们订阅上面主题:

package cn.qlq.activemq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMessage;

/**
 * 主题模式的消费消息
 * 
 * @author QiaoLiQiang
 * @time 2018年9月18日下午11:26:41
 */
public class MsgConsumer {

    // 默认端口61616
    private static final String url = "tcp://localhost:61616/";
    private static final String topicName = "ActiveMQ.Advisory.Producer.Topic.chatTopic";

    public static void main(String[] args) throws JMSException {
        // 1创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // 2.由connectionFactory创建connection
        Connection connection = connectionFactory.createConnection();
        // 设置链接的ID
        // 3.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        // 4.创建Destination(Queue继承Queue,Topic也继承Destination==这三个都是接口)
        Topic destination = session.createTopic(topicName);
        // 创建TopicSubscriber来订阅;需要在连接上设置消费者id,用来识别消费者;设置好了过后再start 这个 connection
        // 5.启动connection
        connection.start();
        // 6.创建消费者consumer
        MessageConsumer consumer = session.createConsumer(destination);
        Message message = consumer.receive();
        while (message != null) {
            ActiveMQMessage txtMsg = (ActiveMQMessage) message;
            session.commit();
            System.out.println("收到消 息:" + txtMsg.getMessage());
            message = consumer.receive(1000L);
        }
        session.close();
        connection.close();
    }

}

第二种获取某一队列的ActiveMQ自身的生产者主题和消费者主题的方法如下:

     String topicName = "chatTopic";    
        Topic topic = session.createTopic(topicName);
        Destination destination = AdvisorySupport.getProducerAdvisoryTopic(topic);
        Destination destination2 = AdvisorySupport.getConsumerAdvisoryTopic(topic);

再次发送消息发现该消费者接收到的消息如下:

收到消 息:ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:MicroWin10-1535-57537-1554965648551-1:1:0:0:16, originalDestination = null, originalTransactionId = null, producerId = ID:MicroWin10-1535-57537-1554965648551-1:1:0:0, destination = topic://ActiveMQ.Advisory.Producer.Topic.chatTopic, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1554966377407, brokerOutTime = 1554966377410, correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@1324409e, dataStructure = ProducerInfo {commandId = 4, responseRequired = true, producerId = ID:MicroWin10-1535-59019-1554966377239-1:1:1:1, destination = topic://chatTopic, brokerPath = null, dispatchAsync = false, windowSize = 0, sentCount = 0}, redeliveryCounter = 0, size = 0, properties = {producerCount=1, originBrokerName=brokerName, originBrokerURL=tcp://MicroWin10-1535:61616, originBrokerId=ID:MicroWin10-1535-57537-1554965648551-0:1}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}
收到消 息:ActiveMQMessage {commandId = 0, responseRequired = false, messageId = ID:MicroWin10-1535-57537-1554965648551-1:1:0:0:17, originalDestination = null, originalTransactionId = null, producerId = ID:MicroWin10-1535-57537-1554965648551-1:1:0:0, destination = topic://ActiveMQ.Advisory.Producer.Topic.chatTopic, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 1554966377439, brokerOutTime = 1554966377439, correlationId = null, replyTo = null, persistent = false, type = Advisory, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = org.apache.activemq.util.ByteSequence@246ae04d, dataStructure = RemoveInfo {commandId = 0, responseRequired = true, objectId = ID:MicroWin10-1535-59019-1554966377239-1:1:1:1, lastDeliveredSequenceId = -2}, redeliveryCounter = 0, size = 0, properties = {producerCount=0, originBrokerName=brokerName, originBrokerURL=tcp://MicroWin10-1535:61616, originBrokerId=ID:MicroWin10-1535-57537-1554965648551-0:1}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}

总结Advisory的使用方式:
  1. 要在配置文件里面开启Advisories.
  2. 消息发送端没什么变化,不做多余改变或配置,
  3. 消息接收端:
    1)根据你要接收的消息类型,来设置不同的topic,可以直接从界面查询主题的name之后订阅,也可以借助AdvisorySupport类来获取
    2)由于这个topic默认不是持久化的,所有要先看起接收端,然后再发送消息。

    3) 接收消息的时候,接收到的消息类型是ActiveMQMessage,所以需要先转换成ActiveMQMessage再获取消息(也就是这是ActiveMQ特有的消息类型)

 3.延迟和定时消息传递(Delay and schedule Message Delivery)

   ActimeMQ也可以实现延时或者定时投递消息,类似于quartz定时任务等。

一共四个属性
AMQ_SCHEDULED_DELAY: 延迟投递的时间
AMQ_SCHEDULED_PERIOD: 重复投递的时间间隔
AMQ_SCHEDULED_REPEAT:重复投递次数
AMQ_SCHEDULED_CRON: Cron表达式

ActiveMQ也提供了一个封装的消息类型:org.apache.activemq.ScheduledMessage,可以使用这个类来辅助设置。

(1)首先在broker上设置schedulerSupport="true"

(2)程序上设置延迟以及定时效果,如下设置延迟30秒,重发3次,重发间隔是5秒的效果

            TextMessage tms = session.createTextMessage("textMessage:" + i);
            long time = 30 * 1000;
            tms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
            tms.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 3);
            tms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 5 * 1000);
            // 9.生产者发送消息
            producer.send(tms);

消费者结果:

  消费者不会马上接收到消息,而是在30秒后第一次接受消息,并且间隔5秒后再次接受消息,总共会接收4次。

(3)使用CRON表达式

            // 8.创建Message,有好多类型,这里用最简单的TextMessage
            TextMessage tms = session.createTextMessage("textMessage:" + i);
            tms.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON,"* * * * *");
            // 9.生产者发送消息
            producer.send(tms);

  CRON表达式的优先级高于另外三个参数,如果在设置了延时时间,也有repeat和period参数,则会在每次CRON执行的时候,重复投递repeat次,每次间隔period,就是说设置的是叠加效果,例如每小时都会发生消息被投递10次,延迟0秒开始,每次间隔1秒。

4.Blob Message(传输大文件,一般大于100m)

  也就是发送文件消息。ActiveMQ传输文件的方式有byteMessage、StreamMessage、BlobMessage。其中bytemessage和streammessage如果不加工处理的话,只能传输小文件,小于100M的文件应该都可以传,blobmessage可以传输较大的文件。对于比较小的文件,简单的处理方式是先读取所有的文件成byte[],然后使用ByteMessage,把文件数据发送到broker,像正常的message一样处理。对于大文件,例如1GB以上的文件,这么搞直接把client或是broker给oom掉了。有些时候,我们需要传递Blob(Binary Large Objects)消息,在5.14之前,(5.12和5.13需要在jetty.xml中手动开启)可以按照如下的方式配置使用fileserver:

  配置BLOB Tansfer Policy,可以在发送方的连接URI上设置,如:

tcp://localhost:61616?jms.blobTransferPolicy.uploadUrl=http://localhost:8161/fileserver

  在5.14之后,就只能通过使用ftp协议来发送blobmessage,或自己将文件传到某个服务器上(通过FTP或其他方式),而后将该文件的url放在BlobMessage中再发送这条BlobMessage。不过,5.15好像又提供了http方式,不过需要自己实现文件上传服务器。

  由于我使用的版本你是5.15的,所以我需要先搭建服务器。

1.使用内嵌netty搭建文件服务器 (重要)

  也就是对文件进行管理,下面我的例子是使用jetty对G:/files/文件夹进行管理,会在浏览器检测此文件夹下面的文件,浏览器能解析的可以通过浏览器访问,不能解析(例如doc文件)的查看会下载。

  文件处理的类参考的下面三个类:   http://svn.apache.org/repos/asf/activemq/trunk/activemq-fileserver/  中三个类的的实现方式。

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.qlq</groupId>
    <artifactId>FileServer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>war</packaging>

    <dependencies>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
            <version>2.5</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>jsp-api</artifactId>
            <version>2.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty.aggregate</groupId>
            <artifactId>jetty-all-server</artifactId>
            <version>7.6.4.v20120524</version>
        </dependency>
        <!-- slf4j 依赖包 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.0-rc1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.0-rc1</version>
        </dependency>
        <!-- 文件上传的测试包httpclient -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpmime</artifactId>
            <version>4.3.1</version>
        </dependency>
    </dependencies>

    <build>
        <!-- 配置了很多插件 -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

服务器启动类:

import org.eclipse.jetty.server.DispatcherType;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;

public class StartServer {
    public static void main(String[] args) throws Exception {
        Server server = new Server(8080);

        ServletContextHandler handler = new ServletContextHandler();
        // 相当于设置项目名称
        handler.setContextPath("/fileserver");
        // 设置资源文件所在目录,工具类中会以这个目录作为文件服务目录存储文件
        handler.setResourceBase("G:/files/");
        // handler.setResourceBase(".");
        System.out.println(handler.getServletContext().getRealPath("/"));

        handler.addFilter(FilenameGuardFilter.class, "/*", DispatcherType.FORWARD.ordinal());

        handler.addFilter(RestFilter.class, "/*", DispatcherType.FORWARD.ordinal());
        ServletHolder defaultServlet = new ServletHolder();
        defaultServlet.setName("DefaultServlet");
        defaultServlet.setClassName("org.eclipse.jetty.servlet.DefaultServlet");

        handler.addServlet(defaultServlet, "/*");

        server.setHandler(handler);
        server.start();
    }
}

重要的文件处理在下面的过滤器中:(PUT类型上传文件,请求method必须是PUT,GET请求资源,DELETE是删除文件,且请求类似于Restful风格,path最后一部分是文件的名称)

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Enumeration;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.UnavailableException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RestFilter implements Filter {
    private static final Logger LOG = LoggerFactory.getLogger(RestFilter.class);

    private static final String HTTP_HEADER_DESTINATION = "Destination";
    private static final String HTTP_METHOD_MOVE = "MOVE";
    private static final String HTTP_METHOD_PUT = "PUT";
    private static final String HTTP_METHOD_GET = "GET";
    private static final String HTTP_METHOD_DELETE = "DELETE";

    private String readPermissionRole;
    private String writePermissionRole;
    private FilterConfig filterConfig;

    public void init(FilterConfig filterConfig) throws UnavailableException {
        this.filterConfig = filterConfig;
        readPermissionRole = filterConfig.getInitParameter("read-permission-role");
        writePermissionRole = filterConfig.getInitParameter("write-permission-role");
    }

    private File locateFile(HttpServletRequest request) {
        return new File(filterConfig.getServletContext().getRealPath(request.getServletPath()), request.getPathInfo());
    }

    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        if (!(request instanceof HttpServletRequest && response instanceof HttpServletResponse)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("request not HTTP, can not understand: " + request.toString());
            }
            chain.doFilter(request, response);
            return;
        }

        HttpServletRequest httpRequest = (HttpServletRequest) request;
        HttpServletResponse httpResponse = (HttpServletResponse) response;

        System.out.println(httpRequest.getRequestURL());
        System.out.println(httpRequest.getMethod());
        
        if (httpRequest.getMethod().equals(HTTP_METHOD_MOVE)) {
            doMove(httpRequest, httpResponse);
        } else if (httpRequest.getMethod().equals(HTTP_METHOD_PUT)) {
            doPut(httpRequest, httpResponse);
        } else if (httpRequest.getMethod().equals(HTTP_METHOD_GET)) {
            if (checkGet(httpRequest, httpResponse)) {
                chain.doFilter(httpRequest, httpResponse); // actual processing
                                                            // done elsewhere
            }
        } else if (httpRequest.getMethod().equals(HTTP_METHOD_DELETE)) {
            doDelete(httpRequest, httpResponse);
        } else {
            chain.doFilter(httpRequest, httpResponse);
        }
    }

    protected void doMove(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("RESTful file access: MOVE request for " + request.getRequestURI());
        }

        if (writePermissionRole != null && !request.isUserInRole(writePermissionRole)) {
            response.sendError(HttpURLConnection.HTTP_FORBIDDEN);
            return;
        }

        File file = locateFile(request);
        String destination = request.getHeader(HTTP_HEADER_DESTINATION);

        if (destination == null) {
            response.sendError(HttpURLConnection.HTTP_BAD_REQUEST, "Destination header not found");
            return;
        }

        try {
            URL destinationUrl = new URL(destination);
            IOHelper.copyFile(file, new File(destinationUrl.getFile()));
            IOHelper.deleteFile(file);
        } catch (IOException e) {
            response.sendError(HttpURLConnection.HTTP_INTERNAL_ERROR); // file
                                                                        // could
                                                                        // not
                                                                        // be
                                                                        // moved
            return;
        }

        response.setStatus(HttpURLConnection.HTTP_NO_CONTENT); // we return no
                                                                // content
    }

    protected boolean checkGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("RESTful file access: GET request for " + request.getRequestURI());
        }

        if (readPermissionRole != null && !request.isUserInRole(readPermissionRole)) {
            response.sendError(HttpURLConnection.HTTP_FORBIDDEN);
            return false;
        } else {
            return true;
        }
    }

    protected void doPut(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("RESTful file access: PUT request for " + request.getRequestURI());
        }

        if (writePermissionRole != null && !request.isUserInRole(writePermissionRole)) {
            response.sendError(HttpURLConnection.HTTP_FORBIDDEN);
            return;
        }

        File file = locateFile(request);

        if (file.exists()) {
            boolean success = file.delete(); // replace file if it exists
            if (!success) {
                response.sendError(HttpURLConnection.HTTP_INTERNAL_ERROR); // file
                                                                            // existed
                                                                            // and
                                                                            // could
                                                                            // not
                                                                            // be
                                                                            // deleted
                return;
            }
        }

        FileOutputStream out = new FileOutputStream(file);
        try {
            IOHelper.copyInputStream(request.getInputStream(), out);
        } catch (IOException e) {
            LOG.warn("Exception occured", e);
            throw e;
        } finally {
            out.close();
        }

        response.setStatus(HttpURLConnection.HTTP_NO_CONTENT); // we return no
                                                                // content
    }

    protected void doDelete(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("RESTful file access: DELETE request for " + request.getRequestURI());
        }

        if (writePermissionRole != null && !request.isUserInRole(writePermissionRole)) {
            response.sendError(HttpURLConnection.HTTP_FORBIDDEN);
            return;
        }

        File file = locateFile(request);

        if (!file.exists()) {
            response.sendError(HttpURLConnection.HTTP_NOT_FOUND); // file not
                                                                    // found
            return;
        }

        boolean success = IOHelper.deleteFile(file); // actual delete operation

        if (success) {
            response.setStatus(HttpURLConnection.HTTP_NO_CONTENT); // we return
                                                                    // no
                                                                    // content
        } else {
            response.sendError(HttpURLConnection.HTTP_INTERNAL_ERROR); // could
                                                                        // not
                                                                        // be
                                                                        // deleted
                                                                        // due
                                                                        // to
                                                                        // internal
                                                                        // error
        }
    }

    public void destroy() {
        // nothing to destroy
    }
}

过滤器中处理servlet文件上传,参考:https://www.cnblogs.com/qlqwjy/p/8722267.html

Httpclient上传文件代码如下:(PUT请求上传文件)

package upload;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.http.HttpEntity;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.mime.HttpMultipartMode;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.entity.mime.content.FileBody;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;

/**
 * httpclient上传文件(测试没问题)
 * 
 * @author Administrator
 *
 */
public class HttpClientUploadFile {
    public static void main(String[] args) throws ClassNotFoundException, ClientProtocolException, IOException {
        CloseableHttpClient httpclient = HttpClientBuilder.create().build();
        CloseableHttpResponse response = null;
        try {
            HttpPut httpput = new HttpPut(
                    "http://localhost:8080/fileserver/ID:MicroWin10-1535-54829-1554981858740-1:1:1:1:1");
            // 可以选择文件,也可以选择附加的参数
            HttpEntity req = MultipartEntityBuilder.create().setMode(HttpMultipartMode.BROWSER_COMPATIBLE)
                    .addPart("file", new FileBody(new File("G:/Exam.log")))// 上传文件,如果不需要上传文件注掉此行
                    .build();
            httpput.setEntity(req);

            System.out.println("executing request: " + httpput.getRequestLine());
            response = httpclient.execute(httpput);

            HttpEntity re = response.getEntity();
            System.out.println(response.getStatusLine());
            if (re != null) {
                System.out.println(
                        "Response content: " + new BufferedReader(new InputStreamReader(re.getContent())).readLine());
            }
            EntityUtils.consume(re);
        } finally {
            try {
                response.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}

效果如下:

git地址:https://github.com/qiao-zhi/JettyFileServer.git

内嵌Jetty的用法参考:http://wiki.eclipse.org/Jetty/Tutorial/Embedding_Jetty

2.生产者和消费者

 生产者

package cn.qlq.activemq.blob;

import java.io.File;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;

public class Producer {

    public static void main(String[] args) throws JMSException {
        // 创建链接工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616?jms.blobTransferPolicy.uploadUrl=http://localhost:8080/fileserver/");
        Connection connection = null;
        ActiveMQSession session = null;
        try {
            // 创建链接
            connection = factory.createConnection();
            // 启动链接
            connection.start();
            // 获取会话
            session = (ActiveMQSession) connection.createSession(Boolean.TRUE, session.AUTO_ACKNOWLEDGE);
            // 创建队列
            Destination queue = session.createQueue("blobQueue");
            // 创建生产者对象
            MessageProducer messageProducer = session.createProducer(queue);
            // 创建blob消息
            BlobMessage blobMessage = session.createBlobMessage(new File("pom.xml"));
            messageProducer.send(blobMessage);
            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }

    }
}

消费者代码:

package cn.qlq.activemq.blob;

import java.io.InputStream;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.BlobMessage;

public class Consumer {
    /**
     * @param args
     * @throws JMSException
     */
    public static void main(String[] args) throws JMSException {

        // 获取 ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

        // 创建 Connection
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // 创建 Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 创建 Destinatione
        Destination destination = session.createQueue("blobQueue");

        // 创建 Consumer
        MessageConsumer consumer = session.createConsumer(destination);

        //监听消息
        consumer.setMessageListener(new MessageListener() {
           @Override
           public void onMessage(Message message) {
               if (message instanceof BlobMessage) {
                   //监听BlobMessage
                   BlobMessage blobMessage = (BlobMessage) message;
                   try {
                       InputStream in = blobMessage.getInputStream();
                       byte[] bytes = new byte[in.available()];
                       in.read(bytes);
                       System.out.println(new String(bytes));
                   } catch (Exception e) {
                       e.printStackTrace();
                   }
               }
           }
       });
    }

}

4. Message Transformation 消息类型转换

  有时候需要JMS Producer内部进行message转换,从4.2版本起,ActiveMQ提供了一个Message Transform接口用于进行消息转换,也就是对消息的类型进行转换,可以在如下对象上调用:
    ActiveMQConnectionFactory,ActiveMQConnection,ActiveMQSession,ActiveMQMessageConsumer,ActiveMQMessageProducer.
  在消息被发送之前发送到JMS producer的消息总线前进行转换,通过producerTransform方法,在消息到达总线后,但是在consumer接收消息之前进行转换,通过consumerTransform方法,当然MessageTransfoemer接口的实现需要你自己来提供。

例如:下面的一个在生产的时候转换的例子

生产者:

package cn.qlq.activemq.topic;

import java.util.concurrent.CountDownLatch;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.MessageTransformer;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.leveldb.replicated.dto.Transfer;

/**
 * 主题模式的消息生产者
 * 
 * @author QiaoLiQiang
 * @time 2018年9月19日下午10:10:36
 */
public class MsgProducer {

    // 默认端口61616
    private static final String url = "tcp://localhost:61616/";
    private static final String topicName = "transTopic";

    public static void main(String[] args) throws JMSException, InterruptedException {
        // 1创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // 2.由connectionFactory创建connection
        Connection connection = connectionFactory.createConnection();

        // 3.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4.创建Destination(Queue继承Queue,Topic也继承Destination==这三个都是接口)
        Destination destination = session.createTopic(topicName);
        // 5.创建生产者producer
        ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
        // 6设置为持久模式(这个必须在下面开启connection之前)
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // 7.启动connection
        connection.start();

        // 8.设置消息转换类型
        producer.setTransformer(new MessageTransformer() {
            // 生产者实现这个方法
            @Override
            public Message producerTransform(Session session, MessageProducer producer, Message message)
                    throws JMSException {
                if (message instanceof TextMessage) {
                    MapMessage mapMessage = session.createMapMessage();
                    mapMessage.setString("key", ((TextMessage) message).getText());

                    return mapMessage;
                }
                return message;
            }

            // 消费者换实现这个方法
            @Override
            public Message consumerTransform(Session session, MessageConsumer consumer, Message message)
                    throws JMSException {
                return null;
            }
        });

        for (int i = 0; i < 3; i++) {
            // 9.创建Message,有好多类型,这里用最简单的TextMessage
            TextMessage tms = session.createTextMessage("textMessage:" + i);
            // 10.生产者发送消息
            producer.send(tms);

            System.out.println("send:" + tms.getText());
        }
        // 11.关闭connection
        connection.close();

    }

}

消费者:

package cn.qlq.activemq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 主题模式的消费消息
 * 
 * @author QiaoLiQiang
 * @time 2018年9月18日下午11:26:41
 */
public class MsgConsumer {

    // 默认端口61616
    private static final String url = "tcp://localhost:61616/";
    private static final String topicName = "transTopic";

    public static void main(String[] args) throws JMSException {
        // 1创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // 2.由connectionFactory创建connection
        Connection connection = connectionFactory.createConnection();
        // 设置链接的ID
        // 3.创建Session===第一个参数是是否事务管理,第二个参数是应答模式
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        // 4.创建Destination(Queue继承Queue,Topic也继承Destination==这三个都是接口)
        Topic destination = session.createTopic(topicName);
        // 5.启动connection
        connection.start();
        // 6.创建消费者consumer
        MessageConsumer consumer = session.createConsumer(destination);
        // 监听消息
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (message instanceof MapMessage) {
                    try {
                        MapMessage message2 = (MapMessage) message;
                        System.out.println(message2.getString("key"));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                } else {
                    System.out.println(message.getClass());
                }
            }
        });
/*        session.close();
        connection.close();*/
    }

}

结果:

WARN | path isn't a valid local location for TcpTransport to use
textMessage:0
textMessage:1
textMessage:2

原文地址:https://www.cnblogs.com/qlqwjy/p/10690285.html