处理多线程问题的工具类

线程安全的并发集合

java.util.concurrent包

ConcurrentHashMap ConcurrentLinkedDeque

CopyOnWriteArrayList 读取不会加锁,写入操作也不会阻塞读取,只是写入之间要实现同步,它的原理是当执行写入操作时,进行一次自我复制,把修改的数据写入副本,再用把副本和原来的数据替换。

它内部的array数组不会被修改,但是在写操作中被替换呈修改过后的副本。

注意一点,如果想要获取这些并发集合的大小,只能通过遍历的方法,或者用mappingCount方法确认。

AbstractQueuedSynchronizer 一个非常重要的基类

三个主要方向的方法:

tryAcquire  

tryAcquireShared

getQueuedThreads

内部用的是链式列表,双端队列

ThreadLocal 的内存泄露问题

引用链问题,用过元素后,要使用remove移除掉

并发集合的批处理:搜索,归约,迭代。

LockSupport类

定义了一组公共的静态方法,这些方法提供了最基本的线程阻塞和唤醒功能。

阻塞的方法以park为开头,唤醒的方法以unpark为开头。

 

它采用一种类似信号量机制,与Thread.suspend相比,弥补了由于resume在之前发生,导致线程无法继续执行;和wait相比,不需要获得某个对象的锁,也不会抛出中断异常。

 

原子类 

java.util.concurrent.atomic包

原子更新基本类型,原子更新数组,原子更新引用,原子更新属性

AtomicBoolean AtomicInteger AtomicLong

 

并发的工具类

等待多线程完成的CountDownLatch

CountDownLatch的构造器需要传入一个N的整数,大于0

CountDownLatch的对象调用countDown方法会使得N减去1 ;调用对象的 await 方法会阻塞当前线程,直到N的个数为0

 

同步屏障CyclicBarrier

让一组线程到达一个屏障时被阻塞,屏障也可称为同步点,直到最后一个线程到达同步点时,屏障才会开门。

 

默认的构造函数要传入一个整数N,代表有多少个线程需要到达屏障

调用CyclicBarrier对象的await方法,告知CyclicBarrier对象已经到达同步点。

 

高级的构造函数,需要传入一个N和一个Runnable对象,用于线程到达屏障时,优先执行这个

Runnable对象对象。

 

可以和其他的并发工具搭配,用于处理数据的计算。

 

控制并发线程数的Semaphore

Semaphore信号量 用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

流量控制

 

构造器传入的数量N,代表进入特定资源的线程个数

特定的资源前后要用对象的acquire release方法获取信号量和释放信号量

 

线程间交互数据的Exchanger

它提供一个同步点,在这个同步点,两个线程可以交互彼此的数据

通过exchange方法交换数据

如果一个线程先执行了该方法,那么它会一直等待第二个线程也只想该方法。

 

可以用于遗传算法 银行流水的校对工作

 

构造函数传入的类型是 交互的数据类型

exchange方法要传入交互的数据,同时返回另一个线程提供的数据

 

阻塞队列

对于多线程问题,如果是生产者和消费者问题,可以使用阻塞队列替代同步方法,使得数据同步。

生产者向队尾添加元素,消费者从队头取出元素,当队列满时添加元素,或队列空时取出元素,阻塞队列导致线程阻塞。

add,remove,element在队列满或空时操作会抛出异常,所以不要用这些方法。

而是阻塞队列当管理工具使用采用put take方法,多线程采用 offer poll peek方法。

操作抛出异常返回特殊值一直阻塞超时退出
插入方法 add offer put offer
移除方法 remove poll take poll
检查方法 element peel    

 

  1. ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。

  2. LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。

  3. PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。

  4. DelayQueue:一个使用优先级队列实现的无界阻塞队列。

  5. SynchronousQueue:一个不存储元素的阻塞队列。

  6. LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。

  7. LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列

 

fork join框架

实现并行计算,可以将一个大规模计算问题分解成为两个小规模的问题,如果需要合并最终的结果,那么就通过join方法合并最终的结果。

想要实现这个框架,需要继承其中的一个类RecursiveTask<T>类或者 RecursiveActicn类,RecursiveTask类 和RecursiveActicn类相比compute方法,有返回值,如果要解决的问题不需要返回值,那么就继承RecursiveActicn类。

那么,使用invokeAll 方法,将分解的两个子问题交给线程池处理,使用join方法而不是get方法获取返回值,get可能会抛出异常,但是compute方法不允许抛出异常。

ForkJoinPool 是 专门处理 fork 框架的线程池。

class SumTask extends RecursiveTask<Integer> {

    private  int threshold ;
    private static final int segmentation = 10;

    private int[] src;

    private int fromIndex;
    private int toIndex;

    public SumTask(int formIndex,int toIndex,int[] src){
        this.fromIndex = formIndex;
        this.toIndex = toIndex;
        this.src = src;
        this.threshold = src.length/segmentation;
    }

    @Override
    protected Integer compute() {
        //如果该条件满足,那么直接把执行任务,得到结果
        if((toIndex - fromIndex)<threshold ){
            int count = 0;
            System.out.println(" from index = "+fromIndex
                    +" toIndex="+toIndex);
            for(int i = fromIndex;i<=toIndex;i++){
                count+=src[i];
            }
            return count; //join得到的结果
        }else{
            //条件不满足,拆分任务得到两个子任务
            int mid = (fromIndex+toIndex)/2;
            SumTask left =  new SumTask(fromIndex,mid,src);
            SumTask right = new SumTask(mid+1,toIndex,src);
            //将子任务插入线程池,执行子任务
            invokeAll(left,right);
            //返回子任务的结果
            return left.join()+right.join();
        }
    }

    public static void main(String[] args) {
        int[]  array = new int[40];
        Arrays.fill(array,10);
        //专门的线程池
        ForkJoinPool forkJoinPool= new ForkJoinPool();
        SumTask sumTask  = new SumTask(0,array.length-1,array);

        long start = System.currentTimeMillis();
        //提交任务
        forkJoinPool.invoke(sumTask);
        System.out.println("The count is "+sumTask.join()
                +" spend time:"+(System.currentTimeMillis()-start)+"ms");
    }
}

 

线程池

 三个接口,一个包装器,一个线程池接口

Runnable  Callable Future   FutureTask  ExecutorService

 

FutureTask包装器,可以将Callable转换成Runnable 和Future

 

Executor类有许多的静态工厂方法用来构建线程池

 

ScheduledExecutorService 接口,具有用于预定任务或者重复执行的任务的方法

 

ExecutorService 接口的submit方法,返回一个future接口;execute方法提交没有返回值的线程

 

invokeAny 提交一个 Callable 对象的集合给线程池 , 并返回某个已经完成了的任务的结果。 无法知道返回的究竟是哪个任务的结果 ,也许是最先完成的那个任务的结果。对于搜索问题 , 如果你愿意接受任何一种解决方案的话,你就可以使用这个方法 。

invokeAll 同上方法,只不过返回所有任务的结果。

 

注意一点,创建线程池 一般使用的是ThreadPoolExecutor 类

new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,millseconds,runnableTaskQueue,handler);
    
 //**
runnableTaskQueue任务队列,用于保存等待执行的任务的阻塞队列,可以旋转4种阻塞队列
handler参数指明了拒绝策略,及线程池超载了,已经塞不下新的任务了,该如何处理新的任务既是拒绝策略。JDK内部有4种策略,一般采用的是不做任何处理,允许丢失DiscardPolicy策略。
*/

 自定义线程池

ThreadFactory接口,只有一个方法,那就是创建线程  Thread newThread(Runnable r) 可以在这个方法内部对线程进行设置改造,这个接口可以作为参数传入 ThreadPoolExecutor 实例对象,但需要实现ThreadFactory接口。

ThreadPoolExecutor 也是一个可扩展的线程池,它提供了三个方法,对线程池运行之前,之后,消耗进行控制。

beforeExecute afterExecute terminated

ThreadPoolExecutor .Worker 是ThreadPoolExecutor 的内部类,它实现了 Runnable接口,ThreadPoolExecutor 线程池的工作线程就是Worker的实例对象 

Worker.runTask()方法会以多线程异步的形式调用

合理地选择线程池的线程数量

线程池需要考虑CPU,内存的大小

线程数量= CPU数量 *  目标CPU使用率 *(1+ 等待时间/计算时间)

CPU数量= Runtime.getRuntime().availableProcessors()

线程池中寻找堆栈信息

如何向线程池讨要堆栈信息

一个简单的方法就是 使用 execute 放弃 submit

另外的办法就是 继承ThreadPoolExecutor ,让它在任务调度前,保持堆栈信息。wrap方法

 多线程框架

Future框架

无锁缓存的Disruptor框架

新的并发模型 Actor ,Akka框架

 

原文地址:https://www.cnblogs.com/lin7155/p/13709277.html