activemq应用总结(一)

1.mq发送消息准备

配置

        <spring:bean id="jmsFactory"
            class="org.apache.activemq.ActiveMQConnectionFactory">
            <spring:property name="brokerURL" value="${activemq.brokerURL}" />
            <spring:property name="userName" value="${activemq.username}" />
            <spring:property name="password" value="${activemq.password}" />
        </spring:bean>

        <spring:bean id="client" class="com.*.*.core.mq.Client" init-method="init" destroy-method="destroy">
            <spring:property name="jmsFactory">
                <spring:ref bean="jmsFactory" />
            </spring:property>
            <spring:property name="reqQueue">
                <spring:ref bean="reqQueneOne" />
            </spring:property>
            <spring:property name="resQueue">
                <spring:ref bean="resQueneOne" />
            </spring:property>
            <spring:property name="timeToLive" value="${activemq.timetolive}" />
            <spring:property name="hostName">
                <spring:ref bean="hostInfo.hostName"/>
            </spring:property>
        </spring:bean>

        <spring:bean id="service" class="com.*.*.core.mq.AMQService">
            <spring:property name="client">
                <spring:ref bean="client" />
            </spring:property>
            <spring:property name="readTime" value="${activemq.readtimeout}" />
        </spring:bean>

mq通过jmsFactory产生的connection

connection = jmsFactory.createConnection();

connection通过获取会话session

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

session产生producer.

producer = session.createProducer(reqQueue);

2.发送的消息体

ActiveMQBytesMessage byteMessage = new ActiveMQBytesMessage();

byteMessage设置JMSCorrelationID,Content(byteSequence = new ByteSequence(mqMessageEntity.getStrRecBuf().getBytes(MQGlobals.MQ_CHARSET));),property(name,value).JMSReplyTo(resQueue).

    public String send(MQMessageEntity mqMessageEntity)
    {
        String sRecBuf = mqMessageEntity.getStrRecBuf();
        String correlationID = String.valueOf(Thread.currentThread().getId())+"_"+createRandomString();
        ClientAMQ clientAMQ = new ClientAMQ();
        DataCache.instance().putData(correlationID, clientAMQ);
        mqMessageEntity.setCorrelationID(correlationID);
        client.sendMQ(mqMessageEntity);
        log.info("发送:correlationID="+correlationID+";["+sRecBuf+"]");
        String returnBuf = clientAMQ.waitRes(readTime);
        DataCache.instance().removeData(correlationID);
        //打印接收报文
        log.info("返回:correlationID="+correlationID+";["+returnBuf+"]");
        if (null == returnBuf) {
            returnBuf = "";
        }
        return returnBuf;
    }

3.接收消息配置

        <spring:bean id="reqQueueOne"
            class="org.apache.activemq.command.ActiveMQQueue">
            <spring:constructor-arg value="${lianpay.reqQueueOne}"></spring:constructor-arg>
        </spring:bean>
        <spring:bean id="resQueueOne"
            class="org.apache.activemq.command.ActiveMQQueue">
            <spring:constructor-arg value="${lianpay.resQueueOne}"></spring:constructor-arg>
        </spring:bean>
        <!-- YT_RESP消息监听 -->
        <spring:bean id="msgListener"
            class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
            <spring:constructor-arg>
                <spring:bean
                    class="com.*.*.core.mq.MsgListener">
                </spring:bean>
            </spring:constructor-arg>
            <spring:property name="defaultListenerMethod" value="handleMessage" />
            <spring:property name="messageConverter" ref="messageConverter" />
        </spring:bean>
        <!-- 交易监听容器 -->
        <spring:bean id="YTMsgContainer"
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <spring:property name="concurrentConsumers" value="${activemq.consumers}" />
            <spring:property name="connectionFactory" ref="jmsFactory" />
            <spring:property name="destination" ref="resQueueOne" />
            <spring:property name="messageListener" ref="msgListener" />
            <spring:property name="messageSelector" value="${selector.resp}"/>
        </spring:bean>

4.接收消息

    public void handleMessage(ActiveMQBytesMessage activeMQBytesMessage) {
        String correlationID = activeMQBytesMessage.getCorrelationId();
        log.info("监听到消息:correlationID=" + correlationID + "];"+ activeMQBytesMessage.getDestination());
        String returnBuf = null;
        try {
            returnBuf = new String(activeMQBytesMessage.getContent().getData(),MQGlobals.MQ_CHARSET);
        } catch (UnsupportedEncodingException e) {
            log.error("字符转换异常," + e.getMessage());
        }
        // 判断内存中是否存在发送对象
        Object obj = DataCache.instance().getData(correlationID);
        if (null != obj) {
            ClientAMQ clientAmq = (ClientAMQ) obj;
            clientAmq.setReturnBuf(returnBuf);
            clientAmq.startNotify(correlationID);
        } else {
            log.info("correlationID=" + correlationID + ";[" + returnBuf + "]");
        }
    }

5,接受消息的消息体

public class ClientAMQ {
    private String returnBuf;//返回结果
    
    /**
     * 开始等待
     * @param waitTine
     * @return
     */
    public synchronized String waitRes(long waitTime)
    {
        if(null!=returnBuf)
            return returnBuf;
        try {
            wait(waitTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return returnBuf;
    }
    
    /**
     * 开始唤醒
     * @param correlationID
     */
    public synchronized void startNotify(String correlationID)
    {
        notify();
    }

    public String getReturnBuf() {
        return returnBuf;
    }

    public void setReturnBuf(String returnBuf) {
        this.returnBuf = returnBuf;
    }
}
原文地址:https://www.cnblogs.com/hzcxy/p/2982149.html