多线程实际应用踩坑

  多线程实操

  近日,在做大数据量处理时,因单线程耗时过程,需添加多线程处理,实操中碰到诸多问题,现提交总结如下:

  注 : 什么场景下适合采用多线程 ? 

     代码执行耗时较长的时候 , 如数据库交互/远程调接口/流操作/循环次数高, 这时候采用多线程并发处理将有效降低代码工作时长.

  1.注意成员变量与局部变量

  单线程时,遇到公共变量/集合等等,多构建一个成员类型的变量用以多方法间共同操作,但在多线程时却是个大问题.

  多线程运行期间,多个线程在对同一个对象的成员变量进行操作时,其对该成员变量是彼此影响的,即某一线程对成员变量的改变会影响到另一线程.

  这时,需根据业务逻辑,将成员变量的作用压缩到局部变量中来操作,我的做法是在类中提取一个公共方法.

  为什么?

  因为每个线程都会有一个该局部变量的拷贝(即便是同一个对象中的方法的局部变量,也会对每一个线程有一个拷贝),一个线程对该局部变量的改变不会影响到其他线程.

  实操中,存储关键帧的集合置为成员变量,运行期间,在第二次校验中,返回结果十分随机,不合预期;修改业务代码,在公共方法中操作,问题解决.

  2.线程池的使用/最大线程数

  现在多数个人电脑为4核8线程,服务器为8核16线程.为更优的用好多线程,避免无关的线程阻塞和资源浪费,需在使用线程池时预设最大线程数,并为实际进行多线程操作的数据集做好分工.实操如下:

                // split 起始数据集
                int dataSize = split.size(); // 总数据条数
                int threadNum = VideoSearchConstant.ServerCPUMaxThreadSize ; // 线程数
                // 计算获取最大数据条数
                int threadSize = dataSize % threadNum == 0 ? dataSize / threadNum : dataSize / threadNum + 1;
                threadNum = dataSize % threadSize == 0 ? dataSize / threadSize : dataSize / threadSize + 1 ;
                log.info("first query from hbase used threads num : " + threadNum);
                // 创建线程池
                ExecutorService exec = Executors.newFixedThreadPool(threadNum);
                // 定义一个任务集合
                List<Callable<Integer>> tasks = new ArrayList<>();
                Callable<Integer> task;
                List<List<String>> cutList ;
                for (int i = 0 ; i < threadNum ; i++){
                    if (i == threadNum - 1) {
                        cutList = split.subList(threadSize * i , dataSize);
                    } else {
                        cutList = split.subList(threadSize * i , threadSize * (i + 1));
                    }

                    List<List<String>> listThread = cutList ;

                    task = new Callable<Integer>() {
                        @Override
                        public Integer call() throws Exception {
                            //log.info(Thread.currentThread().getName() + "线程 : " + listThread.size());
                            for (List<String> srcList :
                                    listThread) {
                                // ... 业务代码
                            }
                            return 1;
                        }
                    };
                    tasks.add(task);
                }
                List<Future<Integer>> futures = exec.invokeAll(tasks);
                if (futures.size() == threadNum) {
                    exec.shutdown();
                }
                

  其中,Callable任务task的声明采用的lambda表达式,后续代码中Java 8 stream流接口的使用也很多,相关学习,参见github代码: 

  git@github.com:CoolPicker/test-webflux.git

  test-webflux/src/main/java/com/example/demo/jdk8/

  3.多线程的一般使用示例

  目前多数博客总结的多线程的两种实现方式,继承Thread类或者实现Runnable接口,给出的示例也多是新建一个类来实现多线程.实操如此就纯粹耽误事,实用demo如下:

  这里通过实现一个Runnable ---- 定义一个 void no-args 功能接口,如下:

    Runnable task = () -> {
        String threadName = Thread.currentThread().getName();
        System.out.println("Hello " + threadName);
    };

    task.run();

    Thread thread = new Thread(task);
    thread.start();
    
    System.out.println("Done!");

  测试用例如下:

  public static void main(String[] args) {
        long aa = System.currentTimeMillis();
        Runnable task = () -> {
            String name = Thread.currentThread().getName();
            try {
                for(int i = 0 ; i < 10 ; i++) {
                    TimeUnit.SECONDS.sleep(1); // 睡一秒
                    System.out.println("heihei " + name);
                }
            } catch (Exception e) {
                System.out.println("error second thread");
            }
        };
        Thread thread = new Thread(task);
        thread.start();
        try {
            String name = Thread.currentThread().getName();
            for (int i = 0 ; i < 5 ; i++) {
                TimeUnit.SECONDS.sleep(2); // 睡两秒
                System.out.println("main " + name);
            }
        } catch (Exception e) {
            System.out.println("main thread error");
        }
        long bb = System.currentTimeMillis();
        System.out.println("cost : " + (bb - aa) / 1000);
    }

  此处可用于单个线程的并发,多个的话仍推荐采用线程池ExecutorService.

  4.ConcurrentHashMap的size()避坑

  ConcurrentHashMap是如何判断在统计的时候容器是否发生了变化呢?使用modCount变量,在put , remove和clean方法里操作元素前都会将变量modCount进行加1,那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化。

  实际应用中 , 避免频繁获取size , 否则将造成线程阻塞 , 尤其是在用作缓存的时候 . 

原文地址:https://www.cnblogs.com/nyatom/p/9493181.html