多线程操作数据库 异常抛出全部回滚的问题

package czc.superzig.modular.utils;

import java.util.ArrayList;
import java.util.List;
import java.util.TimerTask;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import com.google.common.collect.Lists;

import czc.superzig.common.operatingtable.base.entity.Result;
import czc.superzig.common.operatingtable.base.entity.Results;
import czc.superzig.modular.system.operatingtable.entity.DetectionIndicator;

/**
 *多线程操作数据库  其中一个线程发生异常则所有线程发生回滚 
 * 
 */

public abstract class ThreadUtil<T> {
    
    private DataSourceTransactionManager txManager;
    
    public abstract void run(T entity);
    
    public ThreadUtil(List<T> list,DataSourceTransactionManager txManager){
        this.txManager=txManager;
        createThread(list);
    }
    
    
    
    private Result createThread(List<T> list) {
        Result result = new Result();
       
        //每条线程最小处理任务数
        int perThreadHandleCount = 1;
        //线程池的最大线程数
        int nThreads = 10;
        //任务数
        int taskSize = list.size();

        if (taskSize > nThreads * perThreadHandleCount) {
            perThreadHandleCount = taskSize % nThreads == 0 ? taskSize / nThreads : taskSize / nThreads + 1;
            nThreads = taskSize % perThreadHandleCount == 0 ? taskSize / perThreadHandleCount : taskSize / perThreadHandleCount + 1;
        } else {
            nThreads = taskSize;
        }
        //主线程的同步计时器
        CountDownLatch mainLatch = new CountDownLatch(1);
        //监控子线程的同步计时器
        CountDownLatch threadLatch = new CountDownLatch(nThreads);
        //根据子线程执行结果判断是否需要回滚
        BlockingDeque<Boolean> resultList = new LinkedBlockingDeque<>(nThreads);
        //必须要使用对象,如果使用变量会造成线程之间不可共享变量值
        RollBack rollBack = new RollBack(false);
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(nThreads);

        List<Future<List<Object>>> futures = Lists.newArrayList();
        //返回数据的列表
        List<Object> returnDataList = Lists.newArrayList();
        //给每个线程分配任务
        for (int i = 0; i < nThreads; i++) {
            int lastIndex = (i + 1) * perThreadHandleCount;
            List<T> listVos = list.subList(i * perThreadHandleCount, lastIndex >= taskSize ? taskSize : lastIndex);
            FunctionThread functionThread = new FunctionThread(mainLatch, threadLatch, rollBack, resultList, listVos);
            Future<List<Object>> future = fixedThreadPool.submit(functionThread);
            futures.add(future);
        }

        /** 存放子线程返回结果. */
        List<Boolean> backUpResult = Lists.newArrayList();
        try {
            //等待所有子线程执行完毕
            boolean await = threadLatch.await(20, TimeUnit.SECONDS);
            //如果超时,直接回滚
            if (!await) {
                rollBack.setRollBack(true);
            } else {
                //查看执行情况,如果有存在需要回滚的线程,则全部回滚
                for (int i = 0; i < nThreads; i++) {
                    Boolean flag = resultList.take();
                    backUpResult.add(flag);
                    if (flag) {
                        /** 有线程执行异常,需要回滚子线程. */
                        rollBack.setRollBack(true);
                    }
                }
            }
        } catch (InterruptedException e) {
            result.setSuccess(false);
            result.setMsg("等待所有子线程执行完毕时,出现异常,整体回滚");
        } finally {
            //子线程再次开始执行
            mainLatch.countDown();
            fixedThreadPool.shutdown();
        }

        /** 检查子线程是否有异常,有异常整体回滚. */
        for (int i = 0; i < nThreads; i++) {
            if (CollectionUtils.isNotEmpty(backUpResult)) {
                Boolean flag = backUpResult.get(i);
                if (flag) {
                    result.setSuccess(false);
                    result.setMsg("运行失败");
                }
            } else {
                result.setSuccess(false);
                result.setMsg("运行失败");
            }
        }

        //拼接结果
        try {
            for (Future<List<Object>> future : futures) {
                returnDataList.addAll(future.get());
            }
        } catch (Exception e) {
            e.printStackTrace();
            result.setSuccess(false);
            result.setMsg("运行失败子线程正常创建参保人成功,主线程出现异常,回滚失败");
        }
        if(result.getSuccess()){
            result.setData(returnDataList);
        }
        return result;
    }

    public class FunctionThread implements Callable<List<Object>> {
        /**
         * 主线程监控
         */
        private CountDownLatch mainLatch;
        /**
         * 子线程监控
         */
        private CountDownLatch threadLatch;
        /**
         * 是否回滚
         */
        private RollBack rollBack;
        private BlockingDeque<Boolean> resultList;
        private List<T> taskList;

        public FunctionThread(CountDownLatch mainLatch, CountDownLatch threadLatch, RollBack rollBack, BlockingDeque<Boolean> resultList, List<T> taskList) {
            this.mainLatch = mainLatch;
            this.threadLatch = threadLatch;
            this.rollBack = rollBack;
            this.resultList = resultList;
            this.taskList = taskList;
        }

        @Override
        public List<Object> call() throws Exception {
            //为了保证事务不提交,此处只能调用一个有事务的方法,spring 中事务的颗粒度是方法,只有方法不退出,事务才不会提交
            return FunctionTask(mainLatch, threadLatch, rollBack, resultList, taskList);
        }

    }

    public class RollBack {
        private Boolean isRollBack;

        public Boolean getRollBack() {
            return isRollBack;
        }

        public void setRollBack(Boolean rollBack) {
            isRollBack = rollBack;
        }

        public RollBack(Boolean isRollBack) {
            this.isRollBack = isRollBack;
        }
    }

    public List<Object> FunctionTask(CountDownLatch mainLatch, CountDownLatch threadLatch, RollBack rollBack, BlockingDeque<Boolean> resultList, List<T> taskList) {
        //开启事务
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setName(java.util.UUID.randomUUID().toString());
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); // 事物隔离级别,开启新事务
        TransactionStatus status=txManager.getTransaction(def);
        
        List<Object> returnDataList = Lists.newArrayList();
        Boolean result = false;
        try {
            for (T entity : taskList) {
                //执行业务逻辑
                try {
                    run(entity);
                } catch (Exception e) {
                    result=true;
                }
                returnDataList.add(entity);
            }
            //Exception 和 Error 都需要抓
        } catch (Throwable throwable) {
            throwable.printStackTrace();
            result = true;
        }
        //队列中插入回滚的结果 并对计数器减1
        resultList.add(result);
        threadLatch.countDown();

        try {
            //等待主程序的计数器
            mainLatch.await();
        } catch (InterruptedException e) {
            System.err.println("批量操作线程InterruptedException异常");
        }

        if (rollBack.getRollBack()) {
             System.err.println("批量操作线程回滚");
             txManager.rollback(status);
        }else{
            txManager.commit(status);
        }
        return returnDataList;
    }

}

 1.

CountDownLatch 线程计数器 创建时指定计数的大小 和监控的线程数相同  是同步的
wait方法 等计数为0的时候才能继续执行下面代码 并可设置等待时间
countDown 计数减一
用于主线程和多个子线程之间的相互等待

2.阻塞队列 生产不足的时候 阻塞消费 消费不足的时候 阻塞生产 放置数据丢失的问题

原文地址:https://www.cnblogs.com/xiatc/p/12803212.html