通过 Netty 通信,采集设备现场 GPS 数据,并存放在 Redis 服务器。

主程序代码如下:

/*
 * Copyright (c) 2013 北京软秀科技有限公司 (www.inforwms.com). All rights reserved.
 * mail: meslog@qq.com
 */
package com.softshow.product.digi;

import java.io.IOException;
import java.io.InputStream;
import java.sql.SQLException;
import java.text.DateFormat;
import java.text.FieldPosition;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.InvalidPropertiesFormatException;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.ConsoleHandler;
import java.util.logging.FileHandler;
import java.util.logging.Formatter;
import java.util.logging.LogRecord;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import org.jboss.netty.util.internal.ConcurrentHashMap;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import com.softshow.product.cache.DataCache;
import com.softshow.product.db.DatabaseDataManager;
import com.softshow.product.handler.IdentifyChannelDecoder;
import com.softshow.product.handler.InitInfoChannelDecoder;
import com.softshow.product.helper.DateUtils;
import com.softshow.product.helper.GlobalTimer;
import com.softshow.product.helper.HttpClientHelper;
import com.softshow.product.helper.Log;
import com.softshow.product.model.CarInfo;
import com.softshow.product.timer.AutoUpgradTimer;
import com.softshow.product.timer.AutoOrderTimer;
import com.softshow.product.timer.WSAutoOrderTimer;

/**
 * Central lifecycle manager for the tracker service.
 *
 * <p>{@link #init()} loads the configuration, sets up logging, the database-backed
 * {@code DataManager}, the Redis connection factory and the data cache, and registers the
 * Netty tracker server. {@link #start()} starts the registered servers and the periodic
 * jobs; {@link #stop()} stops them and releases shared resources.
 *
 * <p>The data manager, properties and Redis factory are held in static fields, so they are
 * shared by every instance of this class.
 *
 * @author <a href="mailto:meslog@qq.com">meslog</a>
 * @version 1.0.0.2013-10-22
 */
public class ServerManager {

    /** Classpath location of the XML properties file read by {@link #loadProperties()}. */
    private final String cfgPath = "/server.cfg";

    /** Servers started by {@link #start()} and stopped by {@link #stop()}. */
    private final List<TrackerServer> serverList = new LinkedList<TrackerServer>();

    /** Servers created by {@link #initServer(String)}, keyed by the port they report. */
    private final static ConcurrentMap<Integer, TrackerServer> trackerServerMap =
            new ConcurrentHashMap<Integer, TrackerServer>();

    /** Executors created by {@link #startTimer()}; kept so {@link #stop()} can shut them down. */
    private final List<ScheduledExecutorService> timerExecutors =
            new LinkedList<ScheduledExecutorService>();

    /** Timer driving the daily GPS cache cleanup; {@code null} until scheduled. */
    private Timer gpsCleanupTimer;

    private boolean loggerEnabled;

    private static DataManager dataManager;

    private static Properties properties;

    private static JedisConnectionFactory redisFactory;

    /**
     * Returns the shared port-to-server map.
     *
     * @return the live map populated by {@code initServer}
     */
    public static ConcurrentMap<Integer, TrackerServer> getTrackerServerMap() {
        return trackerServerMap;
    }

    /**
     * Registers an additional server to be started and stopped with this manager.
     * The server is only added to the start/stop list, not to the port map.
     *
     * @param trackerServer the server to register
     */
    public void addTrackerServer(TrackerServer trackerServer) {
        serverList.add(trackerServer);
    }

    /**
     * @return whether file/console logging was enabled by the {@code logger.enable} property
     */
    public boolean isLoggerEnabled() {
        return loggerEnabled;
    }

    /**
     * @return the shared data manager, or {@code null} before {@link #init()} has run
     */
    public static DataManager getDataManager() {
        return dataManager;
    }

    /**
     * @return the shared configuration, or {@code null} before {@link #init()} has run
     */
    public Properties getProperties() {
        return properties;
    }

    /**
     * @return the shared Redis connection factory, or {@code null} before {@link #init()} has run
     */
    public static JedisConnectionFactory getRedisFactory() {
        return redisFactory;
    }

    /**
     * Initializes configuration, logging, the database connection, the Redis cache and the
     * Netty server, in that order.
     *
     * @throws IOException            if the configuration cannot be read or a log file cannot be opened
     * @throws ClassNotFoundException declared for callers; presumably raised by the database
     *                                manager when a driver class is missing (not visible here)
     * @throws SQLException           declared for callers; presumably raised during database or
     *                                server setup (not visible here)
     */
    public void init() throws IOException, ClassNotFoundException, SQLException {
        loadProperties(); // load configuration and initialize constants
        initLogger(properties);

        dataManager = new DatabaseDataManager(properties);
        Log.info(Constants.SERVER_MAIN, "**database connection initialized......**");

        redisFactory = initRedis(properties);
        initDataCache();
        Log.info(Constants.SERVER_MAIN, "**redis load compeleted......**");

        // The original called initELSServer(...), which is not defined in this class;
        // initServer(String) is the only server-initialization method available.
        initServer("fourfaith");
    }

    /**
     * Loads the XML properties file from the classpath, publishes the version string and
     * initializes URL constants.
     *
     * @throws InvalidPropertiesFormatException if the resource is not a valid XML properties document
     * @throws IOException                      if the resource is missing or cannot be read
     */
    private void loadProperties() throws InvalidPropertiesFormatException, IOException {
        Properties loaded = new Properties();
        // try-with-resources closes the stream even when parsing fails.
        try (InputStream in = getClass().getResourceAsStream(cfgPath)) {
            if (in == null) {
                throw new IOException("Configuration resource not found on classpath: " + cfgPath);
            }
            loaded.loadFromXML(in);
        }
        properties = loaded;

        // Print version information.
        Constants.VERSION_CODE = properties.getProperty("connection.version");
        Log.info(Constants.SERVER_MAIN,
                "===============================system version:[" + Constants.VERSION_CODE + "]");

        initConstant(properties);
    }

    /**
     * Schedules a daily job that deletes each car's "today GPS" Redis key.
     *
     * <p>The first run is at the time returned by {@code DateUtils.getTomorrowTimeByHour(0)}
     * and it repeats every 24 hours. The car list is re-read on every run so that cars added
     * after startup are cleaned as well.
     */
    private void scheduleDailyGpsCache() {
        final RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.afterPropertiesSet();

        TimerTask cleanupTask = new TimerTask() {
            @Override
            public void run() {
                // java.util.Timer terminates its thread on an uncaught exception, which would
                // silently disable all future cleanups; contain failures to a single run.
                try {
                    for (CarInfo car : DataCache.getAllCar()) {
                        redisTemplate.delete(
                                Constants.CACHE.TODAY_GPS_COLLECTION_PREFIX + "." + car.getId());
                    }
                } catch (RuntimeException e) {
                    Log.info(Constants.SERVER_MAIN, "**GPS cache cleanup failed: " + e + "**");
                }
            }
        };

        if (gpsCleanupTimer != null) {
            gpsCleanupTimer.cancel(); // avoid duplicate cleanups if timers are started twice
        }
        gpsCleanupTimer = new Timer(true);
        gpsCleanupTimer.scheduleAtFixedRate(
                cleanupTask, DateUtils.getTomorrowTimeByHour(0), TimeUnit.DAYS.toMillis(1));
    }

    /**
     * Builds a pooled Jedis connection factory from the {@code redis.port},
     * {@code redis.serverUrl} and {@code redis.database} properties.
     *
     * @param prop configuration source
     * @return the initialized factory (also stored in the shared static field)
     * @throws NumberFormatException if the port or database index is missing or not an integer
     */
    private JedisConnectionFactory initRedis(Properties prop) {
        JedisConnectionFactory factory = new JedisConnectionFactory();
        factory.setUsePool(true);
        factory.setPort(Integer.parseInt(prop.getProperty("redis.port")));
        factory.setHostName(prop.getProperty("redis.serverUrl"));
        factory.setDatabase(Integer.parseInt(prop.getProperty("redis.database")));
        factory.afterPropertiesSet();
        redisFactory = factory;
        return factory;
    }

    /**
     * Starts every registered Netty server and then the periodic jobs.
     */
    public void start() {
        for (TrackerServer server : serverList) {
            server.start();
        }
        startTimer();
    }

    /**
     * Stops every registered server, shuts down the periodic jobs and releases shared
     * resources (channel factory, global timer, Redis connection factory).
     */
    public void stop() {
        for (TrackerServer server : serverList) {
            server.stop();
        }

        // Stop the jobs before their Redis/database dependencies are torn down.
        stopTimers();

        GlobalChannelFactory.release();
        GlobalTimer.release();

        if (redisFactory != null) {
            try {
                redisFactory.destroy();
            } catch (Exception e) {
                // Best effort during shutdown: report and continue.
                e.printStackTrace();
            }
        }
    }

    /**
     * Forgets all registered servers. Does not stop them; call {@link #stop()} first.
     */
    public void destroy() {
        serverList.clear();
    }

    /**
     * Configures the three application loggers when {@code logger.enable} is {@code true}.
     *
     * @param properties configuration source
     * @throws IOException if a log file cannot be opened
     */
    private void initLogger(Properties properties) throws IOException {
        loggerEnabled = Boolean.parseBoolean(properties.getProperty("logger.enable"));
        if (!loggerEnabled) {
            return;
        }
        initLog(properties.getProperty("logger.servermain"), Constants.SERVER_MAIN);
        initLog(properties.getProperty("logger.servergps"), Constants.GPS_LOGGER_SERVER);
        initLog(properties.getProperty("logger.serverheart"), Constants.HEART_LOGGER_SERVER);
    }

    /**
     * Prefixes the web-service URL constants with {@code http://<webIP>[/<webContext>]} and
     * sets the Mapbar API address.
     *
     * <p>The URL constants are rewritten in place, so calling this more than once would
     * prepend the prefix again.
     *
     * @param props configuration source
     * @throws IOException declared for callers; nothing in this method throws it
     */
    private void initConstant(Properties props) throws IOException {
        String webPrefix = "http://" + props.getProperty("webIP");
        String ctx = props.getProperty("webContext");
        if (ctx != null && !ctx.trim().isEmpty()) {
            webPrefix += "/" + ctx;
        }
        Constants.query_Order_url = webPrefix + Constants.query_Order_url;
        Constants.setRange_url = webPrefix + Constants.setRange_url;
        Constants.getDeviceParameter_url = webPrefix + Constants.getDeviceParameter_url;
        HttpClientHelper.MapbarAPIAddr = props.getProperty("mapBarAPIAddress");
    }

    /**
     * @param properties configuration source
     * @param protocol   protocol name used as the property prefix
     * @return {@code true} only if {@code <protocol>.enable} is present and equals "true"
     *         (case-insensitive)
     */
    private boolean isProtocolEnabled(Properties properties, String protocol) {
        // parseBoolean(null) is false, matching the "missing means disabled" rule.
        return Boolean.parseBoolean(properties.getProperty(protocol + ".enable"));
    }

    /**
     * Creates and registers the tracker server for {@code protocol} if that protocol is
     * enabled in the configuration.
     *
     * @param protocol protocol name, e.g. {@code "fourfaith"}
     * @throws SQLException declared for callers; not raised by code visible in this method
     */
    private void initServer(String protocol) throws SQLException {
        if (!isProtocolEnabled(properties, protocol)) {
            return;
        }
        TrackerServer trackerServer = new TrackerServer(this, new ServerBootstrap(), protocol) {
            @Override
            protected void addSpecificHandlers(ChannelPipeline pipeline) {
                // String decoding/encoding happens here: handlers before this point deal
                // with bytes, handlers after it deal with strings. The order must not change.
                pipeline.addLast("stringDecoder", new StringDecoder());
                pipeline.addLast("stringEncoder", new StringEncoder());
                pipeline.addLast("identifyDeviceDecoder", new IdentifyChannelDecoder(ServerManager.this));
                pipeline.addLast("initInfoChannelDecoder", new InitInfoChannelDecoder(ServerManager.this));
            }
        };
        trackerServerMap.put(trackerServer.getPort(), trackerServer);
        serverList.add(trackerServer);
    }

    /**
     * Initializes the data cache at startup. The {@code DataCache} instance is discarded;
     * construction is presumably what loads the cache (not visible here).
     */
    private void initDataCache() {
        new DataCache(dataManager);
    }

    /**
     * Attaches a rotating file handler and a console handler to the named logger.
     * Does nothing when {@code name} is {@code null}.
     *
     * @param name log file pattern; the file handler appends, rotating over 10 files of
     *             500 MiB each
     * @param sgin logger identifier passed to {@code Log.getLogger}
     * @throws IOException if the log file cannot be opened
     */
    private void initLog(String name, String sgin) throws IOException {
        if (name == null) {
            return;
        }
        FileHandler file = new FileHandler(name, 500 * 1024 * 1024, 10, true);
        file.setFormatter(new LineFormatter());
        Log.getLogger(sgin).addHandler(file);                 // file output
        Log.getLogger(sgin).addHandler(new ConsoleHandler()); // console output
    }

    /**
     * Starts the periodic background jobs: order retrieval (every 300 s), order dispatch
     * (every 180 s), the daily GPS cache cleanup and the upgrade-timeout check (every 60 s).
     *
     * <p>NOTE(review): the three job classes used below are not imported, so they must live
     * in this package, while the imported {@code com.softshow.product.timer.*} classes are
     * unused. Confirm which set is intended.
     */
    public void startTimer() {
        // A single periodic task never overlaps with itself, so one thread per job suffices.

        // Periodically fetch orders via the web service.
        newTimerExecutor().scheduleWithFixedDelay(
                new WSAutoRecieveOrderTimer(dataManager), 10, 300, TimeUnit.SECONDS);
        Log.info(Constants.SERVER_MAIN, "**Timer[receive order] running per 300s......**");

        // Periodically send orders.
        newTimerExecutor().scheduleWithFixedDelay(
                new AutoSendOrderTimer(dataManager), 10, 180, TimeUnit.SECONDS);
        Log.info(Constants.SERVER_MAIN, "**Timer[auto send order] running per 180s......**");

        // Daily GPS cache cleanup.
        scheduleDailyGpsCache();
        Log.info(Constants.SERVER_MAIN, "**Timer[GPS data cleaner] running per day......**");

        // Periodically check whether a device upgrade has timed out.
        newTimerExecutor().scheduleWithFixedDelay(
                new AutoCheckUpgradTimer(dataManager), 20, 60, TimeUnit.SECONDS);
        Log.info(Constants.SERVER_MAIN, "**Timer[auto check upgrade device] running per 60s......**");
    }

    /** Creates a scheduled executor and remembers it so {@link #stopTimers()} can shut it down. */
    private ScheduledExecutorService newTimerExecutor() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        timerExecutors.add(executor);
        return executor;
    }

    /** Shuts down every executor created by {@link #startTimer()} and cancels the GPS cleanup timer. */
    private void stopTimers() {
        for (ScheduledExecutorService executor : timerExecutors) {
            executor.shutdown(); // lets a run in progress finish; cancels future runs
        }
        timerExecutors.clear();

        if (gpsCleanupTimer != null) {
            gpsCleanupTimer.cancel();
            gpsCleanupTimer = null;
        }
    }

    /**
     * Formats a record as
     * {@code yyyy-MM-dd HH:mm:ss <class>.<method> <LEVEL>: <message><line separator>}.
     *
     * <p>Each instance owns its own {@link SimpleDateFormat}; instances must not be shared
     * between handlers because {@code SimpleDateFormat} is not thread-safe.
     */
    private static final class LineFormatter extends Formatter {

        private final String LINE_SEPARATOR = System.getProperty("line.separator", " ");

        private final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        @Override
        public String format(LogRecord record) {
            StringBuilder line = new StringBuilder();
            line.append(dateFormat.format(new Date(record.getMillis())));
            line.append(" ");
            line.append(record.getSourceClassName());
            line.append(".");
            line.append(record.getSourceMethodName());
            line.append(" ");
            line.append(record.getLevel().getName());
            line.append(": ");
            line.append(formatMessage(record));
            line.append(LINE_SEPARATOR);
            return line.toString();
        }
    }
}
原文地址:https://www.cnblogs.com/meslog/p/5180510.html