JAVA使用多线程进行数据处理

import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * 以下是伪代码,要根据自己的实际逻辑进行整合
 */
@Service
public class PushProcessServiceImpl implements PushProcessService {


    private final static Logger logger = LoggerFactory.getLogger(PushProcessServiceImpl.class);

    /**
     *每个线程更新的条数
     * 这表示每次执行五千条数据的推送操作
     */
    private static final Integer LIMIT = 5000;

    /**
     * 起的线程数
     */
    private static final Integer THREAD_NUM = 5;

    /**
     * 创建线程池
     *
     * -  corePoolSize:线程核心参数选择了5
     *
     * - maximumPoolSize:最大线程数选择了核心线程数2倍数
     *
     * - keepAliveTime:非核心闲置线程存活时间直接置为0
     *
     * - unit:非核心线程保持存活的时间选择了 TimeUnit.SECONDS 秒
     *
     * - workQueue:线程池等待队列,使用 容量初始为100的 LinkedBlockingQueue阻塞队列
     *
     * 线程池拒绝策略,采用了默认AbortPolicy:直接丢弃任务,抛出异常。
     *
     */
    ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_NUM, THREAD_NUM * 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));


    /**
     * 执行推送任务
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public void pushData() throws ExecutionException, InterruptedException {
        //计数器,需要保证线程安全
        int count = 0;

        //这里从数据库查询出要推送数据总数,根据自己实际的来
        Integer total = pushProcessMapper.getCountByState(0);


        logger.info("未推送数据条数:{}", total);
        //计算需要循环多少轮
        int num = total / (LIMIT * THREAD_NUM) + 1;
        logger.info("要经过的轮数:{}", num);

        //统计总共推送成功的数据条数
        int totalSuccessCount = 0;
        for (int i = 0; i < num; i++) {
            //使用集合来接收线程的运行结果,防止阻塞,接收线程返回结果
            List<Future<Integer>> futureList = new ArrayList<>(32);

            //起THREAD_NUM个线程并行查询更新库,加锁
            for (int j = 0; j < THREAD_NUM; j++) {
                //使用 synchronized 来保证线程安全,保证计数器的增加是有序的
                synchronized (PushProcessServiceImpl.class) {
                    int start = count * LIMIT;
                    count++;
                    /**
                     * 提交线程,用数据起始位置标识线程
                     * 这里前两个参数start和limit参数相当于执行sql
                     *  limit start,limit
                     *
                     */

                    Future<Integer> future = pool.submit(new PushDataTask(start, LIMIT, start));
                    //先不取值,防止阻塞,放进集合
                    futureList.add(future);
                }
            }
            //统计本轮推送成功数据
            for (Future f : futureList) {
                totalSuccessCount = totalSuccessCount + (int) f.get();
            }
        }
        //把数据库的推送标识更新为已推送(已推送!=推送成功),可以根据自己实际的来
        pushProcessMapper.updateAllState(1);

        logger.info("推送数据完成,需推送数据:{},推送成功:{}", total, totalSuccessCount);
    }

    /**
     * 推送数据线程类
     */
    class PushDataTask implements Callable<Integer> {
        int start;
        int limit;
        int threadNo;   //线程编号

        PushDataTask(int start, int limit, int threadNo) {
            this.start = start;
            this.limit = limit;
            this.threadNo = threadNo;
        }

        @Override
        public Integer call() throws Exception {
            int count = 0;
            //分页查询每次执行的推送的数据,查询数据
            List<PushProcess> pushProcessList = pushProcessMapper.findPushRecordsByStateLimit(0, start, limit);
            if (CollectionUtils.isEmpty(pushProcessList)) {
                return count;
            }
            logger.info("线程{}开始推送数据", threadNo);

            /**
             * 遍历需要更新的数据实体类
             */
            for (PushProcess process : pushProcessList) {
                //这里是执行推送请求,根据自己实际的来,也可以要处理的任务
                boolean isSuccess = pushUtil.sendRecord(process);

                //根据主键更新推送是否成功状态标识
                if (isSuccess) {
                    //推送成功
                    pushProcessMapper.updateFlagById(process.getId(), 1);
                    count++;
                } else {
                    //推送失败
                    pushProcessMapper.updateFlagById(process.getId(), 2);
                }
            }
            logger.info("线程{}推送成功{}条", threadNo, count);
            return count;
        }
    }
}
-----------------------有任何问题可以在评论区评论,也可以私信我,我看到的话会进行回复,欢迎大家指教------------------------ (蓝奏云官网有些地址失效了,需要把请求地址lanzous改成lanzoux才可以)
原文地址:https://www.cnblogs.com/pxblog/p/14499149.html