Rabbit队列数据的传输与获取

public class Utils {
    public static Connection getRabbitConnection(String path) throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        Properties properties = new Properties();
        //设置RabbitMQ所在主机ip或者主机名
        properties.load(new FileInputStream(path + "\WEB-INF\classes\queue.properties"));
        String host = properties.getProperty("host");
        String username = properties.getProperty("username");
        String password = properties.getProperty("password");
        factory.setHost(host);
        factory.setUsername(username);
        factory.setPassword(password);
        //创建一个连接
        return factory.newConnection();
    }


    public static void sendMsg(String msg, String queueName, String exName, String luName,String path) throws IOException {
        /**
         * 创建连接连接到MabbitMQ
         */
        Connection connection = getRabbitConnection(path);
        //创建一个频道
        Channel channel = connection.createChannel();

        //声明一个队列 不持久化 不单独  不自动删除
        channel.queueDeclare(queueName, true, false, false, null);
        //创建一个EX1:交换器名称  direct:交换器类型  true:是否持久化
        channel.exchangeDeclare(exName, "direct", true);
        //创建绑定 queueName:队列名称 EX1:交换器 LU1:路由键
        channel.queueBind(queueName, exName, luName);
        //往队列中发出一条消息 EX1:交换器名称  LU1:路由键
        channel.basicPublish(exName, luName, MessageProperties.MINIMAL_PERSISTENT_BASIC, msg.getBytes());
        System.out.println("Sent '" + msg + "'");
        //关闭频道和连接
        channel.close();
        connection.close();
    }
}
String path = request.getSession().getServletContext().getRealPath("");
Utils.sendMsg(json,queueName,exName,luName,path);

获取数据

/**
 * BaseQueueListener.java
 * Created at 2018-2-5
 * Created by swsm
 * Copyright (C) 2018 BROADTEXT SOFTWARE, All rights reserved.
 */
package com.broadtext.collect.receive;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import com.broadtext.collect.util.DomParseQueueConfig;
import com.broadtext.collect.util.QueueColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;

import com.alibaba.fastjson.JSON;
import com.broadtext.collect.process.exception.UnSupportedMsgException;
import com.broadtext.collect.process.service.CommonService;
import com.broadtext.collect.util.ExceptionHandle;
import com.broadtext.collect.util.StringUtils;
import com.broadtext.common.utils.DateUtil;
import com.rabbitmq.client.Channel;

/**
 * <p>ClassName: BaseQueueListener</p>
 * <p>Description: 实现队列消费者监听的基类</p>
 * <p>Author: swsm</p>
 * <p>Date: 2018-2-5</p>
 */
public abstract class BaseQueueListener implements ChannelAwareMessageListener {
    
    private String dateFormat;
    
    @Autowired
    private CommonService commonService;
    
    /**
     * 日志
     */
    private Logger logger = LoggerFactory.getLogger(BaseQueueListener.class);

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String msg = null;
        String id = null;
        try {
            //1. 获取信息 并插入历史信息表
            msg = new String(message.getBody(), "UTF-8");
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            id = this.insertHistory(msg, channel);
            //2. 验证并处理信息
            @SuppressWarnings("unchecked")
            Map<String, Object> map = JSON.parseObject(msg, Map.class);
            if (checkNotNull(map, getNotNullCol())) {
                strToDateCol(map, getChangeStrToDateCol());
            } else {
                throw new UnSupportedMsgException(Arrays.toString(getNotNullCol()) + " 参数值不存在");
            }
            Map<String, Map<String, List<QueueColumn>>> collectNodeMap = DomParseQueueConfig.getSingleton(
                    this.getClass().getClassLoader().getResource("/").getPath() + "queue-config.xml").getCollectNodeArrayList();
            //3. 实际的处理信息
            this.handleMsg(map, collectNodeMap);
        } catch (Exception e) {
            String ex = ExceptionHandle.getExceptionSampleInfo(e);
            logger.error("处理" + this.getQueueName() + "信息" + msg + " 出现异常: " + ex);
            this.commonService.updateCollectHistory("1", id);
            this.commonService.insertCollectException(ex, id);
            e.printStackTrace();
        }
    }
    
    public abstract void handleMsg(Map<String, Object> map, Map<String, Map<String, List<QueueColumn>>> collectNodeMap);

    public void strToDateCol(Map<String, Object> map, String[] changeStrToDateCol) {
        //默认是yyyy-MM-dd HH:mm:ss
        String df = this.getDateFormat() == null ? "yyyy-MM-dd HH:mm:ss" : this.getDateFormat();
        for (String str : changeStrToDateCol) {
            map.put(str, DateUtil.getDate(String.valueOf(map.get(str)), df));
        }
    }

    /**
     * <p>Description: 验证不能为null</p>
     * @param map map对象
     * @param notNullCol 不能为null的字段
     * @return 都不为null true 否则返回 false
     */
    public boolean checkNotNull(Map<String, Object> map, String[] notNullCol) {
        for (String str : notNullCol) {
            if (map.get(str) == null) {
                logger.error(str + "不存在!" + JSON.toJSONString(map) + ",不能为null的字段:" + Arrays.toString(notNullCol));
                return false;
            }
        }
        return true;
    }

    /**
     * <p>Description: 获取信息 并插入历史信息表</p>
     * @param msg mq信息
     * @param channel 通道
     * @return 历史信息表id
     * @throws IOException 异常
     */
    public String insertHistory(String msg, Channel channel) throws IOException {
        String id = StringUtils.getUuid();
        this.commonService.insertHistory(msg, id, this.getQueueName());
        return id;
    }

    public abstract String getQueueName();

    public abstract String[] getNotNullCol();

    public abstract String[] getChangeStrToDateCol();

    public String getDateFormat() {
        return dateFormat;
    }

}
package com.broadtext.collect.receive;

import com.broadtext.collect.process.service.BmjService;
import com.broadtext.collect.process.variable.QueueName;
import com.broadtext.collect.util.QueueColumn;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

import java.util.List;
import java.util.Map;

public class BmjQueueListener extends BaseQueueListener {
    
    @Autowired
    @Qualifier("bmjService")
    private BmjService bmjService;
    
    @Override
    public void handleMsg(Map<String, Object> map, Map<String, Map<String, List<QueueColumn>>> collectNodeMap) {
        this.bmjService.insertOrUpdateCell(map, collectNodeMap);
    }
    
    @Override
    public String getQueueName() {
        return QueueName.BMJ_QUEUE;
    }

    @Override
    public String[] getNotNullCol() {
        return new String[]{"Time", "TestStartTime", "TestEndTime"};
    }

    @Override
    public String[] getChangeStrToDateCol() {
        return new String[]{"Time", "TestStartTime", "TestEndTime"};
    }

}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">


    <rabbit:connection-factory id="connectionFactory"
                               host="127.0.0.1" port="5672" username="guest" password="guest"/>
    

    <!--  <rabbit:connection-factory id="connectionFactory"
                                host="192.168.129.203" port="5672" username="root" password="root" />
 -->

    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- 设备状态  队列声明  -->
    <rabbit:queue id="equipStatusQueue" durable="true" auto-delete="false" exclusive="false" name="equipStatusQueue"/>
    <bean id="equipStatusQueueListener" class="com.broadtext.collect.receive.EquipStatusQueueListener"/>
    <rabbit:listener-container connection-factory="connectionFactory" concurrency="1" acknowledge="manual">
        <rabbit:listener queues="equipStatusQueue" ref="equipStatusQueueListener"/>
    </rabbit:listener-container>
</beans>
原文地址:https://www.cnblogs.com/zhuwenxia/p/9719796.html