DelayQueue 源码分析

DelayQueue

DelayQueue 是基于 PriorityQueue 实现的线程安全的无界优先级阻塞队列,
队列的头部元素必须在超时后才能移除,元素必须实现 Delayed 接口。

创建实例

    // 控制访问的互斥锁
    private final transient ReentrantLock lock = new ReentrantLock();
    // 持有元素的优先级队列
    private final PriorityQueue<E> q = new PriorityQueue<>();

    /**
     *  在延迟队列头部超时阻塞等待的线程,当有 leader 时,其他线程将无限期等待下去。
     *  leader 线程读取到元素之后,必须唤醒其他等待的线程。
     */
    private Thread leader;

    /**
     *  当队列为空或有 leader 在阻塞等待时,当前线程将在该条件上阻塞等待。
     */
    private final Condition available = lock.newCondition();

    /**
     *  创建一个空的延迟队列
     */
    public DelayQueue() {}

读取元素

    /**
     *  移除并获取延时队列的头部元素,如果没有元素超时,则阻塞等待
     */
    @Override
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 读取队列头部元素
                E first = q.peek();
                // 1)如果队列为空
                if (first == null) {
                    // 当前线程在 available 条件上阻塞等待
                    available.await();
                } else {
                    // 获取元素的超时时间
                    final long delay = first.getDelay(NANOSECONDS);
                    // 如果元素已经超时
                    if (delay <= 0L) {
                        // 则移除并返回头部元素
                        return q.poll();
                    }
                    first = null; // don't retain ref while waiting
                    // 1)如果已经有线程在阻塞等待
                    if (leader != null) {
                        // 当前线程在 available 条件上阻塞等待
                        available.await();
                    } else {
                        // 2)当前线程是第一个阻塞等待的线程
                        final Thread thisThread = Thread.currentThread();
                        // 写入 leader
                        leader = thisThread;
                        try {
                            // 阻塞等待指定的超时时间
                            available.awaitNanos(delay);
                        } finally {
                            // 线程被激活后尝试移除 leader
                            if (leader == thisThread) {
                                leader = null;
                            }
                        }
                    }
                }
            }
        } finally {
            // 如果没有 leader 并且队列中有元素存在 
            if (leader == null && q.peek() != null) {
                // 唤醒其他阻塞等待的线程来读取元素
                available.signal();
            }
            lock.unlock();
        }
    }

添加元素

    /**
     *  将目标元素 e 插入到延时队列中,由于是无界的,该操作不会被阻塞
     */
    @Override
    public void put(E e) {
        offer(e);
    }

    @Override
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 插入元素
            q.offer(e);
            // 如果当前元素是优先级最高的元素
            if (q.peek() == e) {
                // 清除 leader
                leader = null;
                // 唤醒在可用条件上等待的线程
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
原文地址:https://www.cnblogs.com/zhuxudong/p/10085246.html