JAVA并发工具类

一、分而治之

fork/join     二叉树  二分查找  快速排序  归并排序  mapreduce  动态规划

1、fork/join(工作密取)

  RecursiveTask要有返回值

  RecursiveAction没有返回值

  invoke(同步)

  submit(有返回结果异步)

  execute(没有返回结果异步)

2、countDownLatch(闭锁)只能使用一次

作用:直指一个线程等待其他的线程执行完后再执行,(就是一个加强版的join())

用一个线程可以有多个扣除点,扣除点可以在程序的中间,await可以有多个。

3、CyclicBarrier可以使用一次

作用:一组工作线程之间需要进行协作

4、Semaphore(控制同时访问某个特定资源的线程数量,例如用于流量控制)

Semaphore的大坑就是:release方法可以无限增加;所以一般会定义两个Semaphore变量,结合来使用。

Semaphore只是控制流量,获取许可证;需要和锁配合使用才能完成总的并发控制

5、Exchanger用于两个线程间的数据交换

6、Future

  isDone()不管正常或者异常结束又或者自己取消,都会返回true

  isCancelled如果任务在完成前被取消,返回true;其他情况返回false

  cancel()(1)如果任务已经完成,或者已经取消或者由于某些原因不能取消,则返回false;

       (2)如果任务还没有执行,则会返回true,并且异步任务不会执行

     (3)如果任务已经开始,但是还没有执行完成,则返回true(mayInterrupIfRunning为true时,会中断线程)

FutureTask  treiber  AQS实现

二、原子类(CAS原理)乐观锁的思想

悲观锁容易发生死锁

原子操作实现方式:1、synchronized;2、原子类

1、cas的原理:

cpu硬件指令集的支持

包括三个运算参数:内存地址V,期望值A,新值B

不能更新成功的话,会一直自旋

2、cas的问题

ABA问题

开销问题

只能保证一个共享变量的原子操作

3、AtomicBoolean,AtomicInteger,AtomicLong

4、AtomicReference(多个变量同时进行更新,把多个变量包装成引用类),AtomicStampedReference()(有具体的版本号),AtomicMarkableReference(boolean标志版本)(只关心有没有变化)

5、AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray[原子数组类实质上又另外创建了一个和原数组完全独立,但一模一样的数组]

6、源自更新字段类AtomicIntegerFieldUpdate,AtomicLongFieldUpdate,AtomicReferenceFieldUpdate

原子操作底层用的是CAS操作

三、显示锁

1、ReentrantLock(优先使用synchronized)

排它锁

线程退出时(执行完时),它所拥有的资源都会被释放

Synchronized进入调锁的过程中后:Synchronized不提供中断和超时机制

Synchronized消耗更少一点,Synchronized是语言特性,而显示锁是一种类

Synchronized方法块内抛异常,代码脱离方法快,锁会自动释放

Lock获取锁可以被中断,超时获取锁,尝试获取锁

非公平锁充分的利用了线程被唤醒的这段时间

公平锁(排队)与非公平锁(可以插队,效率更高)

synchronized尽量使用notifyAll;ReentrantLock尽量使用signal

一个Lock对应一个一个Condition

一个Lock对应多个Condition

lockInterruptibly

tryLock

tryLock(timeout)

2、读写锁ReadWriteLock

读是共享的,写是排它锁,最适合于读多写少的场景

四、LockSupport(基础工具)

五、AQS(抽象队列同步器)

        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

这个方法是ReentrantLock.NonfairSync.lock()方法

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

这个方法就是尝试获取独占所同步状态,这是AQS的方法

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

 这个方法是由ReentrantLock.NonfairSync.tryAcquire实现

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

这个方法是由ReentrantLock.Sync.nonfairTryAcquire实现(尝试获取独占锁,或者重入独占锁)

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

这个方法是AQS的方法,这个方法是(1)先设置尾节点compareAndSetTail(2)如果前面设置没成功,就进入死循环enq方法,这个方法里会判断为节点是否为空,如果为空,就先设置head指向New Node()空节点,然后再循环;如果不为空,就设置compareAndSetTail,如果设置成功就可以退出了,否则继续循环。

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

这个就是AQS的enq方法

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 这个方法就是加入到AQS队列以后发生的等待操作

Springtemplate就是模板设计模式

state有三个方法(getState,setState,compareAndSetState)

state为0时,可以直接获取锁,不需要加入AQS队列

state不为0时,需要尝试获取锁

模板方法设计模式

重入锁:

读写锁:

分布式锁:抢到本地锁的线程再去抢分布式锁;redis性能好;zookeeper稳定性好。

读写锁的实现

读锁的实现

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

AQS的模板方法:

       protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

尝试获取读锁:

  1、如果当前线程是写线程,直接返回-1(加入等待队列)

  2、如果当前头结点的后继节点是读线程直接返回1获取到所

  3、如果当前头结点的后继节点是写线程,直接执行fullTryAcquireShared方法(这个策略完全是为了照顾写线程,或者说是防止写线程过分的饥渴)

        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }

as

    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }

as

        final int fullTryAcquireShared(Thread current) {
            /*
             * This code is in part redundant with that in
             * tryAcquireShared but is simpler overall by not
             * complicating tryAcquireShared with interactions between
             * retries and lazily reading hold counts.
             */
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

循环尝试获取读锁:

  1、如果当前线程是写线程,直接返回-1(加入等待队列)

  2、如果当前头结点的后继节点是写线程且不是锁的重入状态,那么就直接退出-1(加入等待队列)

  3、如果写线程超量,直接抛出异常

  4、如果修改状态成功,直接返回1,获取读锁成功

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 写锁的实现

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

as

        protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

1、如果当前锁是共享锁,那么直接返回false

2、如果可重入写线程数量太多超过最大值,抛出异常

3、可重入线程正常直接返回true

4、如果当前没有锁,且自己抢到了锁,直接返回true

5、如果当前没有锁,且没有抢到锁,直接返回false

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

as

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

as

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

五、并发容器

位运算的关键【a%2^n==a&(2^n-1)】

hashmap会造成死循环:扩容时头插法惹的祸

1、ConcurrentHashMap

get   put   putIfAbsent

putIfAbsent返回原来那个key所对应的值

 取模操作相当于位与操作

位运算用途:权限控制和商品属性

jdk1.7实现concurrenthashmap:segment数组(16缺省并发度这个数必须是2^n,这个数组也不允许扩容,使用hash值的高位部分进行hash)(ReentrantLock)创建segment元素时使用CAS保证线程安全  每一个segment下面包含一个hashentry数组table[]这个数组大小也必须是2^n,通过整个hash值进行hash,负载因子是0.75;①定位segment②定位hashentry③循环链表;get方法不加锁,只用了volatile{final hash;final key;volatile value;volatile next}      put方法 拿锁tryLock() 和lock()结合使用,提前new Node(key,value)   扩容也最好是2的倍数,迁移的时候会变更迁移方便,扩容会扩与扩原来的2倍   

这两个方法尽量少使用:(size方法首先算两遍然后可能会全部加锁   containsValue也需要全部加锁)

存在的问题:弱一致性,获取的过程中整个结构可能发生调整,因为get方法是没有加锁的

扩容:stride步长+forwarding

再设置sizeCtl

size()=basecount+countercells

强一致性:Collections.synchronizedMap(m),HashTable

jdk1.8实现concurrenthashmap:CAS+synchronized

数组+链表+红黑树(6~8)

元素是Node(final,final,volatile,volatile)不是Entry(TreeNode继承于node(树叶),TreeBin(树根),Node);treebin之中使用了读写锁;

forwardingnode扩容的时候需要用到占位

sizeCtl(-1正在初始化)(0是默认值)(-n表示有多少线程正在扩容)(正数相当于负载因子)

tabAt,setTabAt,casTabAt

构造函数什么也没做

get方法:数组+红黑树+链表

put方法:初始化table,只有一个线程能够初始化成功(sizectl开控制这个并发);CAS操作把sizectl置为-1的线程初始化table;sizectl设置为阈值;CAS操作向数组里放入值;正在扩容时,帮助扩容;synchronized(锁住树根)网链表或者红黑树里面插入值【尾插法】,树与链表的转换;扩容

负载因子仍然是0.75

sizeCtl

Node

get put

jkd1.8实现hashmap:treenode(继承于linkedhashmap.entry)

2、跳表

增加链表的快速查找性(redis,lucence)

3、ConcurrentLinkedQueue

无界非租塞队列,线程安全

4、CopyOnWriteArrayList

CopyOnWriteArraySet

最适应读多写少的并发场景(白名单,黑名单)

内存占用严重,只能保证最终一致性

批量提交

5、阻塞队列(尽量使用有界队列)(等待通知模式)

有界就是put会阻塞的队列

无界就是put不会阻塞的队列

生产者消费者模式

ArrayBlockingQueue(有界阻塞队列,先进先出原则,必须设定初始大小,只用一个锁,直接插入元素)【生产者和消费者是同一把锁】

LinkedBlockingQueue(有界阻塞队列,先进先出原则,可以不设定初始大小默认就是Integer.MAX_Value,用了两个锁,插入元素时需要转换)【生产者和消费者是两把锁】

PriorityBlockingQueue(无界阻塞队列)(默认,按照自然顺序,要么实现compareTo()方法,指定构造参数Comparator)

DelayQueue(无界阻塞队列)(实现自己的缓冲系统,订单到期,限时支付)(延时获取元素)消息中间件

SynchronousQueue(不存储元素)(每一个put操作都要等待一个take操作)(Exchanger)

LinkedTransferQueue(无界阻塞队列)相比LinkedBlockingQueue,多了两个方法transfer()必须要消费者消费了以后方法才会返回(先直接给消费者,消费者没有则放入队列阻塞);tryTransfer()无论消费者是否接收,方法都立即返回(先给消费者,消费者没有就直接返回)

LinkedBlockingDeque(双向阻塞队列(fork/join工作密取))可以从头和尾插入和删除元素

六、线程池

如果开启prestartAllCoreThreads,那么提交任务就会把线程直接放到queue里面;当queue里面满了的时候,就直接执行线程数直到线程最大值;如果队列设置太大;那么最大线程数和拒绝策略就没什么意义了。;队列的大小最好大于核心线程数,但是不能过大。

1、ThreadPoolExecutor

  keepalivetime不对coresthread起作用,请对其他线程起作用

  ThreadFactory(作用是创建线程名字)

  RejectedExecutionHandler(拒绝策略)

    AbortPolicy(直接抛出异常,默认策略)

    CallerRunsPolicy让提交的线程执行该线程

prestartAllCoreThreads设置一开始就会起来corepool线程

allowCoreThreadTimeOut是否允许核心线程数也超时退出

    DiscardOldestPolicy(丢弃队列里面最老的任务)

    DiscardPolicy直接抛弃

线程池的AOP模式

计算密集型:加密,大数分解,正则【线程数适当小一点,机器的cpu核心数+1,操作系统调用线程就会造成页缺失,+1为了防止也确实】

IO密集型:读取文件,数据库连接,网络通讯,rpc【线程数适当大一点:cpu的核心数×2】(NCPU*UCPU*(1+W/C))

混合型:尽量拆分成为IO密集型和计算密集型(如果使用的时间相差过大,就不需要拆分了)

FixedThreadPool(创建固定数量的线程池,适用于负载较重的服务器,使用了无界队列)

CachedThreadPool会根据需要创建新线程(执行很多短期异步任务的程序)使用了SynchronousQueue阻塞队列

SingleThreadPool创建单个线程,需要顺序保证执行任务,没有线程安全问题,使用了无界队列(保证任务串行执行)

WorkStealingPool工作密取(fork/join)

2、ScheduledThreadPoolExecutor

  (1)newSingleThreadScheduledExecutor只包含一个线程,只需要单个线程执行周期任务,保证顺序的执行各个任务

  (2)newScheduledThreadPool可以包含多个线程的,线程执行周期任务,适度控制后台线程数量的时候

  方法说明:

    schedule只执行一次,可以延时执行(抛出异常以后,任务直接中止执行;需要用try---catch)

    scheduleAtFixedRate任务超时怎么办?下一个任务马上开始执行(抛出异常以后,任务直接中止执行;需要用try---catch)

    scheduleWithFixedDelay(抛出异常以后,任务直接中止执行;需要用try---catch)

3、CompletionService

4、CompleteableFuture

七、线程安全性

栈封闭

无状态(无状态的类即没有任何成员变量的类)【servlet是线程不安全的】

让类不可变(final,String,包装类)只要不变,所有的成员变量都加上final关键字(Akka)

volatile保证类的可见性(最适合一个线程写,多个线程读的情景)(1)可见性(2)禁止重排序

加锁和CAS

安全的发布

ThreadLocal

Collections.synchronizedList()和使用synchronized包装普通类(继承和委托)

AtomicReference,CopyOnWriteArrayList

死锁是由于加锁造成的(争夺资源顺序,操作者>资源数)(数据库事务也可能产生死锁)jps -v    ;jstack 7412

解决办法:强制拿锁顺序必须一致,动态死锁原生hashcode解决(拿第三次锁);tryLock尝试加两个锁(但为了避免活锁,也要等待随机时间);

活锁

线程饥饿:读写锁的机制

性能:线程上下文切换,锁同步,页缺失

vmstat

内存屏障指令,对性能有影响

阻塞:将导致线程的挂起

解决办法:

同时锁两个锁

减少锁的范围

减少锁的力度(通过增加锁的数量)

锁分段

替换独占锁(读写锁的应用,CAS替换synchronized,并发容器)

避免多余的锁(就是把两把锁合成一把锁,锁的粗化)

    

原文地址:https://www.cnblogs.com/erdanyang/p/12333302.html