十万级百万级数据量的Excel文件导入并写入数据库

一.需求分析

  最近接到一个需求,导入十万级,甚至可能百万数据量的记录了车辆黑名单的Excel文件,借此机会分析下编码过程;

  首先将这个需求拆解,发现有三个比较复杂的问题:

  问题一:Excel文件导入后首先要被解析为存放对象的列表,数据量大的情况下可能会导致内存溢出,解析时间过长;

  问题二:插入数据库的时候,数据量大,写入的时间长

  问题三:要对数据库中的现有数据进项判断,不仅仅要做插入动作,还要将数据库的数据与导入的数据对比,判断是否做更新操作

  

  其中:

  问题一和问题三,可以看做同一类,因为主要涉及内存计算导致的性能问题,以及内存占用过大的溢出问题,

  关于这两个问题,现在线上的机器基本上是4核8G的配置集群部署,内存并不是关键,我会在另一篇文章中给出我的方案,

  今天主要针对问题二,写入的数据库的问题给出我的方案,

  

  问题二主要是多次写入数据库的问题,显然,如果有几十万条数据,那么是不可能连续写几十万次的,不然要写到后年马月才能全部入库,

  解决方案:

    这里我主要采用了多线程的写入方式,十万条数据,2000条写一次(可以自己定义),用线程池提交多个线程任务同时写入,提高性能

二.代码环境

  Springboot2.1.3+POI+PGSQL

  controller层代码

    @PostMapping("/upload")
    public void upload1(MultipartFile file, @Validated UploadReq req) throws Exception {
        //从数据库查询出现有的数据,根据去重的字段分组去构建成一个HashMap,通过containsKey()判断
        //将需要更新的数据放到updateList中
        List<User> updateList=new ArrayList<>();

        //已取值的行数
        int rowNum = 0;
        //列号
        int colNum = 0;
        //真正有数据的行数
        int realRowCount = 0;
        //得到工作空间
        Workbook workbook = null;

        try {
            workbook = ExcelUtil.getWorkbookByInputStream(file.getInputStream(), file.getOriginalFilename());
        } catch (IOException e) {
            e.printStackTrace();
        }
        //得到工作表
        int numberOfSheets = workbook.getNumberOfSheets();
        for (int i = 0; i < numberOfSheets; i++) {
            Sheet sheet = ExcelUtil.getSheetByWorkbook(workbook, i)
            realRowCount = sheet.getPhysicalNumberOfRows();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            List<User> list = new ArrayList<>();
            User user = null;

            for(Row row:sheet) {
                if(realRowCount == rowNum) {
                    break;
                }
                //空行跳过
                if(ExcelUtil.isBlankRow(row)) {
                    continue;
                }
                if(row.getRowNum() == -1) {
                    continue;
                }else {
                    //第一行表头跳过
                    if(row.getRowNum() == 0) {
                        continue;
                    }
                }
                rowNum ++;
                colNum = 1;
                user = new User();
                ExcelUtil.validCellValue(sheet, row, colNum, "id");
                user.setId(Integer.valueOf(ExcelUtil.getCellValue(sheet, row, colNum - 1)));
                ExcelUtil.validCellValue(sheet, row, ++ colNum, "name");
                user.setId(Integer.valueOf(ExcelUtil.getCellValue(sheet, row, colNum - 1)));
                //判断是否是已存在的数据,如果是就更新,不是就新增
                //updateList.add(user);
                list.add(user);
                
            }
            
            //新增的逻辑
            userService.saveBatch(list);
            System.out.println(list);
        }
    }

  service层代码

@Service
public class UserServiceImpl implements IUserService {

    @Autowired
    private UserMapper userMapper;


    @Override
    public void saveBatch(List<User> list) throws Exception {
        //一个线程处理200条数据
        int count = 200;
        //数据集合大小
        int listSize = list.size();
        //开启的线程数
        int runSize = (listSize / count) + 1;
        //存放每个线程的执行数据
        List<User> newlist = null;

        //创建一个线程池,数量和开启线程的数量一样
        //Executors 的写法
        // ExecutorService executor = Executors.newFixedThreadPool(runSize);

        //ThreadPoolExecutor的写法
        ThreadPoolExecutor executor = new ThreadPoolExecutor(runSize, runSize, 1,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        //创建两个个计数器
        CountDownLatch begin = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(runSize);
        //循环创建线程
        for (int i = 0; i < runSize; i++) {
            //计算每个线程执行的数据
            if ((i + 1) == runSize) {
                int startIndex = (i * count);
                int endIndex = list.size();
                newlist = list.subList(startIndex, endIndex);
            } else {
                int startIndex = (i * count);
                int endIndex = (i + 1) * count;
                newlist = list.subList(startIndex, endIndex);
            }
            //线程类
            ImportThread mythead = new ImportThread(newlist, begin, end,userMapper);
            //这里执行线程的方式是调用线程池里的executor.execute(mythead)方法。
            executor.execute(mythead);
        }
        begin.countDown();
        end.await();
        //执行完关闭线程池
        executor.shutdown();
    }

  线程类

public class ImportThread implements Runnable {


    public ImportThread() {
    }

    UserMapper userMapper;
    private List<User> list;
    private CountDownLatch begin;
    private CountDownLatch end;

    /**
     * 方法名: ImportThread
     * 方法描述: 创建个构造函数初始化 list,和其他用到的参数
     * @throws
     */
    public ImportThread(List<User> list, CountDownLatch begin, CountDownLatch end,UserMapper userMapper) {
        this.list = list;
        this.begin = begin;
        this.end = end;
        this.userMapper=userMapper;
    }

    @Override
    public void run() {
        try {
            //执行完让线程直接进入等待
            userMapper.saveBatch(list);
            begin.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //这里要主要了,当一个线程执行完 了计数要减一不然这个线程会被一直挂起
            //这个方法就是直接把计数器减一的
            end.countDown();
        }
    }

}

  

  

  

原文地址:https://www.cnblogs.com/july-sunny/p/11761482.html