java大并发数据保存方案

做了几年.net,如今终于要做java了。

  • 需求:

    线下终端会定时上传gps位置到服务端,服务端收到数据保存到mysql数据库,当线下终端过多时,问题出现了,首当其冲的是数据库连接池经常会崩溃,单个tomcat到100并发就会抛出异常。

  • 解决思路:

    原来是收到一条数据就保存一条数据,现在改为将收到的数据暂存到一个数据池,当满100条数据时再用saveBatch一次性保存,这样终端上传100次其实只建立了一次数据库连接,减轻数据库压力。如果只是这样还有一个不足,就是当数据池的数据一直都不满100条时,永远都不会保存到数据库,所以再加一个守护线程,每2分钟检查一次,如果数据池有数据,就全部保存到数据库。

  • 该方案执行效率,如下图,共1000次请求,100次并发的情况

  • 具体实现:

1、数据池代码

public class GpsPool {
    private static final Logger log = Logger.getLogger(GpsPool.class.getName());
    public static Queue<ClientGPS> queGps=new LinkedList<ClientGPS>();//暂存数据,等数据超过100条或者超过时间后自动保存 
    private static Object lock=new Object();
    //private ExecutorService s=Executors.newSingleThreadExecutor(); 
    private  ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 10, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(1),new ThreadPoolExecutor.DiscardPolicy());
    /**
     * 一次性最多取出100个数据
     * @return
     */
    public static List<ClientGPS> poll100(){
        List<ClientGPS> gpss=new ArrayList<ClientGPS>();
        synchronized (lock) { 
            int length=100;
            if(queGps.size()<100)
                length=queGps.size();
            for (int i = 0; i < length; i++) { 
                 
                gpss.add(queGps.poll()); 
            }
        }
        return gpss;
    }
    public  void saveGps(ClientGPS gps){
        if (null==gps||queGps.size()>2000) {//大于2000暂不处理
            return;
        }
        if (queGps.size()>100) {
            queGps.offer(gps);  
            threadPool.execute(new GpsSaveThread()); 
            System.out.println("add2pool");
        }else{
            queGps.offer(gps); 
        }
        
    }
}
View Code

2、数据保存线程

class GpsSaveThread implements Runnable{
    private static final Logger log = Logger.getLogger(GpsTask.class.getName());
    ClientGPSService gpsService;
    @Override
    public void run() {
        gpsService=ServiceLocator.getBean(ClientGPSService.class); 
        try {
            List<ClientGPS> gpss; 
            while (GpsPool.queGps.size() > 100) {
                gpss = GpsPool.poll100();
                if (gpss.size() > 0) {
                    try {
                        gpsService.saveBatch(gpss.toArray(), gpss.size());
                    } catch (Exception e) {
                        saveError(e);
                        try {//失败后再试一次
                            gpsService.saveBatch(gpss.toArray(), gpss.size());
                        } catch (Exception ex) { 
                        }
                    }
                }
                
            }
        } catch (Exception e) {
            saveError(e);
        }
        
    }
    
}
View Code

3、守护线程代码

public class GpsTask implements Runnable{

    private static final Logger log = Logger.getLogger(GpsTask.class.getName());
    ClientGPSService gpsService;
    @Override
    public void run() {
        gpsService=ServiceLocator.getBean(ClientGPSService.class);
        while (true) { 
            try {
                Thread.sleep(1000*60*2);//2分钟检查一次
                List<ClientGPS> gpss= GpsPool.poll100();
                GpsPool.queGps.clear();
                if(gpss.size()>0){
                     try {
                        gpsService.saveBatch(gpss.toArray(),gpss.size());
                    } catch (Exception e) {
                        saveError(e);
                    }
                } 
            } catch (InterruptedException e) { 
                saveError(e);
            }catch(Exception e){
                saveError(e);
            }
        }
        
    }
}
守护线程代码
  • 抛砖引玉

目前这个方案有点问题,就是并发过千时,cpu会达到100%,期待有更好的方案。

原文地址:https://www.cnblogs.com/liqiao/p/6256378.html