多线程Callable处理数据

1.数据拆分多线程Callable处理

1.定义一个等于CPU核心数的线程池
2.根据数据 List 处理每个线程可以分到的数量List
3.Callable 线程处理数据
4.Future 获取Callcable线程处理后的数据
5.把 Future 获取的数据重新 addAll 进 List
6.返回数据

代码如下:

public List<String> packageStrings(List<String> list) throws InterruptedException, ExecutionException{
        // 开始时间
        long start = System.currentTimeMillis();
        // 线程数量=CPU核心数
        int threadNum = Runtime.getRuntime().availableProcessors();
        // 每条线程处理的数据
        int count = list.size() / threadNum;
        if (0 != list.size() % count) {
            threadNum++;
        }
        // 创建一个大小等于CPU核心数的线程池
        ExecutorService exec = Executors.newFixedThreadPool(threadNum);
        try {
            // 定义一个任务集合
            List<Callable<List<String>>> tasks = new ArrayList<Callable<List<String>>>();
            int end = 0;
            for (int i = 0; i < threadNum; i++) {
                if (i * count + count > list.size()) {
                    end = list.size();
                } else {
                    end = i * count + count;
                }
                // 确定每条线程的数据
                List<String> cutList = list.subList(i * count, end);
                final List<String> listStr = cutList;
                final List<String> newList = new ArrayList<String>();
                Callable<List<String>> task = new Callable<List<String>>() {
                    @Override
                    public List<String> call() throws Exception {
                        for (String str : listStr) {
                            // 封装处理数据
                            str = packageString(str);
                            newList.add(str);
                        }
                        return newList;
                    }
                };
                // 这里提交的任务容器列表和返回的Future列表存在顺序对应的关系
                tasks.add(task);
            }
            // 获取数据并重新set
            List<Future<List<String>>> results = exec.invokeAll(tasks);
            list.clear();
            for (Future<List<String>> future : results) {
                list.addAll(future.get());
            } 
        } finally {
            // 关闭线程池
            exec.shutdown();
            // if all tasks have completed following shut down
            while(!exec.isTerminated());
        }
        //System.out.println(list.toString());
        System.err.println("
执行任务消耗了 :" + (System.currentTimeMillis() - start) + "毫秒");
        return list;
    } 

 2.数据未拆分多线程Callable处理数据

1.定义一个等于CPU核心数的线程池池

2.定义 Callable 的任务集合

3.遍历List处理数据

4.把 Callable 添加到 Callable 的任务集合中

5.把通过 Future 获取的数据并重新 add 进 List

6.返回数据

代码如下:

  public List<String> CallablePackageStrings(List<String> list) throws InterruptedException, ExecutionException{
        // 开始时间
        long start = System.currentTimeMillis();
        // 创建一个大小等于CPU核心数的线程池
        ExecutorService exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        // 定义一个任务集合
        try {
            List<Callable<String>> tasks = new ArrayList<Callable<String>>();
            for (final String str : list) {
                Callable<String> task = new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        // 封装处理数据
                        return packageString(str);
                    }
                };
                // 这里提交的任务容器列表和返回的Future列表存在顺序对应的关系
                tasks.add(task);
            }
            List<Future<String>> results = exec.invokeAll(tasks);
            list.clear();
            for (Future<String> future : results) {
                list.add(future.get());
            } 
        } finally {
            // 关闭线程池
            exec.shutdown();
            // if all tasks have completed following shut down
            while(!exec.isTerminated());
        }
        //System.out.println(list.toString());
        System.err.println("
执行任务消耗了 :" + (System.currentTimeMillis() - start) + "毫秒");
        return list;
    } 

 3.数据未拆分多线程Callable处理数据精简代码

1.定义一个等于CPU核心数的线程池

2.定义Future的任务集合

3.遍历List处理数据

4.把 Future 添加到 Future 的任务集合中

5.把通过 Future 获取的数据并重新 add 进 List

6.返回数据

代码如下:

 public List<String> packageStrings(List<String> list) throws InterruptedException, ExecutionException{
        // 开始时间
        long start = System.currentTimeMillis();
        // 创建一个大小等于CPU核心数的线程池
        ExecutorService exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        // 定义FutureTask任务集合
        try {
            List<Future<String>> futures = new ArrayList<Future<String>>();
            for (final String str : list) {
                Future<String> future = exec.submit(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        // 封装处理数据
                        return packageString(str);
                    }
                });
                futures.add(future);
            }
            list.clear();
            for (Future<String> future : futures) {
                list.add(future.get());
            } 
        } finally {
            // 关闭线程池
            exec.shutdown();
            // if all tasks have completed following shut down
            while(!exec.isTerminated());
        }
        //System.out.println(list.toString());
        System.err.println("
执行任务消耗了 :" + (System.currentTimeMillis() - start) + "毫秒");
        return list;
   }

以上三种推荐第三种:

第二种是在第一种基础上进行优化

第三种是在第二种基础上进行冗余代码去除

原文地址:https://www.cnblogs.com/mjtabu/p/12793293.html