多线程数据查询统计

最近在做一个需求,就是根据选定的对账单查询该对账单下的所有交易明细,但是由于交易明细表过大——几百万——且没有做分表等操作,数据库用的是mysql,实时去查询的话,会比较慢。

虽然在交易明细表中给对账单id加了索引,但是由于查询的字段比较多且无需,肯定需要回表操作,另外还有对数据进行排序,结果造成每次导出一个对账单明细需要花费一两分钟的时间。

鉴于此,考虑使用多线程来解决该问题,因为交易明细表中存在交易时间字段,因此考虑通过交易时间进行拆分,并根据拆分出的时间段参数用多个线程进行查询,最后统计。

实施方案:

1、在交易时间上创建索引,或者跟对账单id一起创建联合索引

2、根据对账单下交易的最大和最小时间,并根据当前对账单的交易笔数进行合理拆分

 交易(时间)拆分主要集中在以下几个方面

 1> 交易的笔数

正常对账单以月为单位,即时间范围为一个月,以对账单交易笔数10W为例,如果每天的交易偏移量不大,即每天的交易笔数都在3000左右,这样就可以以天为单位进行拆分

非正常情况下,月对账单可能包含不属于当前月份的交易,且当前月份交易站90%以上,即交易偏移量很大,这样就可能需要细分:

  根据每天的交易量进行分组排序,按照尽可能平均的交易量进行日期拆分

  直接按照月份进行拆分,因为99%的情况下是一个月的交易稳定,而其他月份加到一起也不到一个查询时间周期的量,这时把所有不属于当前月份的拆成一次查询,当月再进行按天拆分

 2> 数据库服务器配置

主要是考虑数据库服务器性能,正常来说,肯定是线程越多查询效率越高,即木桶原理,总的时间取决于查询时间最长的一个线程,但对于数据库来讲,线程越多对数据库的配置要求就越高,如果数据库硬件配置不足,而同时执行的线程又太多,会导致线程阻塞(多线程效率降低),或者直接造成数据库死锁。

 3>应用服务器性能

因为数据统计出来之后,可能会存在对数据进行二次遍历重组的情况,如过滤一些不需要的数据,对一些特殊数据进行单独处理,另外本身一次性查询10W数据,即需要服务器内存处理10W的数据量,对服务器本身的压力也挺大,如果由于数据量过大造成应用服务器内存溢出,那就只能分段处理或者增加服务器内存了

技术选择

一般的多线程场景,前台发起请求(如下单)需要及时作出响应,但后台可能还需要各种其他操作,而这些操作可能会占用大量的时间,用户不可能一直等待这些后续处理都完成才收到反馈,这种情况一般都会在主线程先给用户(前台)返回一个成功操作的标识,然后后台再启一个或多个线程(Thread)去处理后续业务,用户提交成功后可以刷新查询当前提交业务的处理进度;或者是通过消息队列的方式,主线程返回前台成功标识并把当前订单信息放入消息队列,相应的消费线程读取队列中的订单并进行后续的操作。

上述场景为用户不需要等待最终处理,且后续处理非常耗时间。但像统计这种需要快速的返回想要的结果,因此用到了另一种多线程——Callable

Callable 

Callable和Future出现的原因

创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口。 
这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。 
如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。

而自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

Callable和Future介绍

Callable接口代表一段可以调用并返回结果的代码;Future接口表示异步任务,是还没有完成的任务给出的未来结果。所以说Callable用于产生结果,Future用于获取结果。

Callable接口使用泛型去定义它的返回类型。Executors类提供了一些有用的方法在线程池中执行Callable内的任务。由于Callable任务是并行的(并行就是整体看上去是并行的,其实在某个时间点只有一个线程在执行),我们必须等待它返回的结果。 
java.util.concurrent.Future对象为我们解决了这个问题。在线程池提交Callable任务后返回了一个Future对象,使用它可以知道Callable任务的状态和得到Callable返回的执行结果。Future提供了get()方法让我们可以等待Callable结束并获取它的执行结果。

代码实现,我这里最大限制一个月只能查3次,就是因为dba不允许有超过4个线程同时进行查询,说数据库性能不行,压力会很大

先查询对账单交易量,如果超过某个设定值(我这里设定1W)就开始拆分,后面如果有多个月份的,遍历去累加总的交易笔数,当超过设定值时,拆分出一个查询,重新做累加。

public List<StmtPolicyTransData> listPolicyTransDataByStatement(String statementId) {
        // 根据对账单拆分日期
        int record = 10000;
        // 1、 查询总记录数
        int totalCount = stmtService.countTransactionByStatement(statementId);
        logger.debug("对账单{}总交易记录数:{}", statementId, totalCount);
        if (totalCount < record) {
            // 不足1w,直接查询
            return stmtService.listPolicyTransDataByStatement(statementId, null, null);
        }
        // 2、查询每月记录数
        List<StmtMonthTransCount> monthCountList = stmtService.countStmtTransactionByMonth(statementId);
        // 3、统计查询参数  判断交易月份
        List<StmtTransDataParams> paramsList = Lists.newArrayList();
        if (monthCountList.size() == 1) {
            paramsList.addAll(splitTransactionTime(monthCountList.get(0)));
        } else {
            Date startTime = monthCountList.get(0).getMinTime();
            // 累加计数
            int sumCount = 0;
            for (StmtMonthTransCount data : monthCountList) {
                // 先判断之前是否有未计算的月份
                if (sumCount != 0) {
                    sumCount += data.getCount();
                    if (sumCount > record) {
                        StmtTransDataParams params = new StmtTransDataParams();
                        params.setStartTime(startTime);
                        params.setEndTime(data.getMinTime());
                        paramsList.add(params);
                        // 重置查询时间
                        startTime = data.getMinTime();
                        sumCount = data.getCount();
                    }
                    if (data.getCount() > record) {
                        paramsList.addAll(splitTransactionTime(data));
                        startTime = null;
                        sumCount = 0;
                    } else if (monthCountList.indexOf(data) == monthCountList.size() - 1) {
                        // 最后一个月份数据
                        StmtTransDataParams params = new StmtTransDataParams();
                        params.setStartTime(startTime);
                        params.setEndTime(DateUtils.addMinutes(data.getMaxTime(), 1));
                        paramsList.add(params);
                    }
                } else {
                    if (data.getCount() > record) {
                        paramsList.addAll(splitTransactionTime(data));
                    } else {
                        // 新计算月份,重置开始时间
                        startTime = data.getMinTime();
                        sumCount = data.getCount();
                        if (monthCountList.indexOf(data) == monthCountList.size() - 1) {
                            // 最后一个月份数据
                            StmtTransDataParams params = new StmtTransDataParams();
                            params.setStartTime(data.getMinTime());
                            params.setEndTime(DateUtils.addMinutes(data.getMaxTime(), 1));
                            paramsList.add(params);
                        }
                    }
                }
            }
        }

        // 遍历参数  创建线程查询
//        ExecutorService executorService = Executors.newFixedThreadPool(paramsList.size());

        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(paramsList.size(),
                new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
        List<Callable<List<StmtPolicyTransData>>> callableList = Lists.newArrayList();
        paramsList.forEach(params -> callableList.add(new StmtTransDataCall(stmtService, statementId, params.getStartTime(), params.getEndTime())));

        // 遍历统计交易数
        List<StmtPolicyTransData> dataList = Lists.newArrayList();

        try {
            List<Future<List<StmtPolicyTransData>>> futureList = executorService.invokeAll(callableList);
            if (!CollectionUtils.isEmpty(futureList)) {
                for (Future<List<StmtPolicyTransData>> future : futureList) {
                    dataList.addAll(future.get());
                }
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("多线程查询数据异常:" + e.getMessage());
        }
        // 关闭线程池
        executorService.shutdown();
        return dataList;
    }

    /**
     * 单月按照交易时间拆分
     *
     * @param stmtMonthTransCount
     * @return
     */
    private List<StmtTransDataParams> splitTransactionTime(StmtMonthTransCount stmtMonthTransCount) {
        List<StmtTransDataParams> paramsList = Lists.newArrayList();
        int record10 = 100000;
        int record6 = 60000;
        int record4 = 30000;
        int record2 = 15000;

        // 根据交易数拆分
        int count = stmtMonthTransCount.getCount();
        Date minTime = stmtMonthTransCount.getMinTime();
        Date maxTime = stmtMonthTransCount.getMaxTime();
        int days = 0;
        // 考虑数据库服务器性能问题,暂定最多开4个线程,另暂不考虑有其他月份情况
        if (count >= record10) {
            // 10w以上,3天查一次,最少查10次
            days = 11;
        } else if (count >= record6) {
            // 6w以上,5天查一次,最少查6次
            days = 11;
        } else if (count >= record4) {
            // 4w以上,查3次  10天查一次
            days = 11;
        } else if (count >= record2) {
            // 2w以上,16天查一次,最多查2次
            days = 16;
        } else {
            days = 31;
        }
        while (minTime.compareTo(maxTime) < 0) {
            Date endTime = DateUtils.addDays(minTime, days);
            if (endTime.compareTo(maxTime) > 0) {
                endTime = maxTime;
            }
            StmtTransDataParams params = new StmtTransDataParams();
            params.setStartTime(minTime);
            params.setEndTime(endTime);
            paramsList.add(params);
            minTime = endTime;
        }
        return paramsList;
    }
原文地址:https://www.cnblogs.com/flysand/p/10984894.html