JUC中的阻塞队列

阻塞队列介绍

    1,ArrayBlockingQueue:数组实现的有界阻塞队列, 此队列按照先进先出(FIFO)的原则 对元素进行排序。
    2,LinkedBlockingQueue:链表实现的有界阻塞队列, 此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。
 
    3,PriorityBlockingQueue:支持优先级排序的无界阻塞队列, 默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo()方法来指定元素 排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。
 
    4,DelayQueue:优先级队列实现的无界阻塞队列。
 
    5,SynchronousQueue:不存储元素的阻塞队列, 每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。
 
    6,LinkedTransferQueue:链表实现的无界阻塞队列。
 
    7,LinkedBlockingDeque:链表实现的双向阻塞队列。
 

    插入数据的方法

boolean add(E e);    //如果队列满了继续插入会 抛出异常
boolean offer(E e);    //    返回true或false表示插入成功或者失败
void put(E e);    //阻塞插入元素

    获取数据的方法

boolean remove(Object o);    //返回true或false表示获取成功或者失败,队列为空返回false
E poll();        //当队列中存在元素时,返回该元素,如果队列为空返回null
E take();    //阻塞获取元素

ArrayBlockingQueue原理分析

    1,构造方法

    int takeIndex;
    int putIndex;
    int count;
    // capacity 数组的长度
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    2,add方法

    public boolean add(E e) {
        //调动了offer 返回true和false
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");    //如果插入失败 抛出异常
    }
    
       public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)    //如果队列满了 返回false
                return false;
            else {
                enqueue(e);        //将元素添加到队列中
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;        //通过 putIndex 索引直接将元素添加到数组 items
        if (++putIndex == items.length)
            putIndex = 0;        // 如果下次存放的索引等于队列的长度,那么指向0,这就是循环队列
        count++;
        notEmpty.signal();    //唤醒处于等待的线程 例如take阻塞的线程
    }
    

    3,put方法,和add方法一样,不过当队列满了以后,会阻塞。

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //获得锁,和lock的区别是这个方法优先允许在等待时由其他线程调用等待线程的 interrupt 方法来中断等待直接返回。而 lock方法是尝试获得锁成功后才响应中断
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                //队列满了之后,当前线程被notFull条件对象挂起
                notFull.await();            
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    4,take方法阻塞获取队列中元素的方法。

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();    //队列中没有元素,阻塞
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    //删除队列头部的元素 返回给客户端
    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];        //默认获取位置为0的元素
        items[takeIndex] = null;        //将该位置元素置为空
        if (++takeIndex == items.length)
            takeIndex = 0;        //如果takeIndex达到最大值,则重新从0获取
        count--;    //元素个数递减
        if (itrs != null)
            itrs.elementDequeued();    //同时更新迭代器中的元素数据
        notFull.signal();    //触犯队列满了以后的阻塞
        return x;
    }

    5,remove方法,删除指定的一个元素

    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;    //获取下一个要添加元素的索引
                int i = takeIndex;    //获取当前要被移出的元素的索引
                do {
                    if (o.equals(items[i])) {
                        removeAt(i);
                        return true;
                    }
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

原子操作类

    JDK1.5之后,juc提供了Atomic包,提供了对于常用数据结构的原子操作。它提供了一个简单、高效以及线程安全的对变量更新的操作方式。按照变量类型的划分可以分为以下几类。
    1,原子更新基本类型:AtomicBoolean、AtomicInteger、AtomicLong
 
    2,原子更新数组:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
 
    3,原子更新引用:AtomicReference、 AtomicReferenceFieldUpdater、AtomicMarkableReference(更新带有标记位的引用类型)
    4,原子更新字段:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、 AtomicStampedReference
 

    AtomicInteger分析

    unsafe:unsafe类相当于一个后门,使得java可以像c语言的指针一样直接操作内存空间,实际上这个类在很多方面都有使用,例如Netty、Kafka等等。
    这个类提供了很多功能:包括多线程同步(MonitorEnter)、CAS操作(compareAndSwap)、线程的挂起和恢复(park和unpark)、内存屏障(loadFence和storeFence)、内存管理(内存分配,获取内存地址和释放内存等)。

    1,getAndIncrement()

    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }

    2,getAndAdd(int )

    public final int getAndAdd(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta);
    }

    public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }

    3,get()

   //value用volatile修饰
    public final int get() {
        return value;
    }



原文地址:https://www.cnblogs.com/gaojf/p/12837348.html