Java队列学习笔记(2)---DelayQueue

DelayQueue

DelayQueue 的性质:

  • 它是一个线程安全的队列。
  • 包含 PriorityQueue 的性质。
  • 放入该队列的元素必须实现 Delayed 接口
  • 从该队列取出对象时,需要询问对象的执行延迟。即队头不为 null 条件还不充分,还需要剩余延迟 delay <= 0,对象才能正常出队。这点比较特殊。

Delayed 接口

Delayed 用于标记在给定延迟后应执行操作的对象。

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);

getDelay 方法的实现中并不需要阻塞,只需要返回给定时间单位中与此对象关联的剩余延迟。

public interface ScheduledFuture<V> extends Delayed, Future<V> {}

比如实现 ScheduledFuture 接口的对象就可以做 DelayQueue 的元素。

他包含的关键成员变量

// 可重入锁, 不参与序列化和反序列化
private final transient  lock = new ReentrantLock();
// 依赖优先级队列,在此基础上封装线程安全的逻辑
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 指定在队列头等待元素的线程。
private Thread leader = null;
// 条件锁:当一个较新的元素在队列头变得可用或一个新线程可能需要成为 leader 时发出的状态信号。
private final Condition available = lock.newCondition();

poll 立即出队

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获取队首元素,但不从队列中移除
        E first = q.peek();
        // 条件一:first == null
        // 条件不成立:队首元素不为 null,队列中至少包含一个元素。进入 else 逻辑
        // 条件成立:队列中没有元素
        // 条件二:first.getDelay(NANOSECONDS) > 0
        // 条件成立:这是一个延迟执行的对象
        // 条件不成立:这不是一个延迟执行的对象
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            // 情况一:队列为空,且该队列中的元素不是延迟执行的对象
            // 情况二:队列不为空
            // 优先级队列的队首元素出队
            return q.poll();
    } finally {
        lock.unlock();
    }
}

take 阻塞式出队

public E take() throws InterruptedException {


    final ReentrantLock lock = this.lock;
    // 该锁可以响应打断
    lock.lockInterruptibly();
    try {
        // 自旋
        for (;;) {
            // 获取队头元素,但是不把它从队列中移走
            E first = q.peek();
            // 条件满足,队列为空
            if (first == null)
                // 陷入条件阻塞
                available.await();
            else {
                // 进入该分支,表示队头元素不为null,队列中至少包含一个元素
                // 获取的延迟执行时间
                long delay = first.getDelay(NANOSECONDS);
                // 如果延迟执行时间 <= 0, 不用延迟,立即出队
                if (delay <= 0)
                    return q.poll();
                // 释放队首元素的引用
                first = null; // don't retain ref while waiting
                // leader 表示正在等待队首元素的线程
                // 条件成立:已经有线程在等待队首元素了,当前线程条件阻塞
                // 条件不成立:当前线程是目前第一个等待队首元素的线程
                if (leader != null)
                    available.await();
                else {
                    // 多个线程执行 take 方法时,仅有一个线程来到此 else 分支
                    // 因此只有一个线程进行超时等待,其他线程则会因为 leader != null 而永久等待
                    // 获取当前线程
                    Thread thisThread = Thread.currentThread();
                    // 将当前线程引用赋值给首个等待队首元素的线程 leader
                    leader = thisThread;
                    try {
                        // 当前线程阻塞最多 delay 的延迟时间,单位纳秒
                        available.awaitNanos(delay);
                    } finally {
                        // 新的队首元素加入到队列中,或者已经超过等待的最大时长 delay
                        // 条件成立:当前唤醒的线程就是 leader 线程,leader 线程置为 null
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 返回优先级队列的队首元素时,或者发生打断时,跳出自旋,执行此处方法
        // 条件一:leader == null
        // 条件成立:表示目前所有线程都在进行“无限”等待
        // 条件二:q.peek() != null
        // 条件成立:队列中队头元素不为 null,队列中至少包含一个元素
        if (leader == null && q.peek() != null)
            // 通知等待条件的线程来竞选 leader
            available.signal();
        lock.unlock();
    }
}

offer 元素加入队尾

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    // 队列发生变化的过程,需要加锁
    lock.lock();
    try {
        // 调用优先级队列的 offer 方法,插入 e 元素到队尾
        q.offer(e);
        // 条件成立:e 成为了新的队首元素
        if (q.peek() == e) {
            // 因为新加入的元素,导致原先的 leader 无效了,重置 leader 为 null
            // 通知等待的线程重新竞选 leader
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

poll(timeout, unit) 超时等待出队

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 队首元素出队
            E first = q.peek();
            // 条件成立:队首元素为 null
            // 条件不成立:队首元素不为 null
            if (first == null) {
                // 条件成立:nanos <= 0, 即传入的参数 timeout <= 0
                // 条件不成立:走 else 分支,超时时长 > 0
                if (nanos <= 0)
                    return null;
                else
                   // 最多等待 nanos 纳秒,nanos 返回值为剩余延迟等待秒数
                    nanos = available.awaitNanos(nanos);
            } else {
                // 队首元素不为 null,但是也不能立即返回,还需要考虑对象的延迟执行特性
                long delay = first.getDelay(NANOSECONDS);
                // 条件成立:延迟等待时长<=0
                if (delay <= 0)
                    // 此时,优先级队列的队首元素出队
                    return q.poll();
                // 条件成立: nanos <= 0
                // 情况一:调用方法时,传入的参数 timeout <= 0
                // 情况二:因为第一轮自旋时,first == null,然后已经 awaitNanos 超时了,现在虽然代码已经运行到这一行
                if (nanos <= 0)
                    // 已经没有额外的时长来等待对象的延迟执行,所以这里返回 null
                    return null;
                // 释放 first 引用
                first = null; // don't retain ref while waiting
                // 条件一: nanos < delay 
                // 条件不成立:允许再等待至少 delay 的时长,也不会超过传入的参数延迟
                // 条件成立:允许等待的剩余延迟 nanos 小于对象执行的剩余延迟 delay
                // 条件二:隐含条件 nanos >= delay
                // 条件成立:等待队首元素的 leader 线程存在,当前线程直接进入超时等待
                // 条件不成立:进入 else 分支
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    // 允许等待的剩余延迟 nanos 大于等于对象执行的剩余延迟 delay,且 leader 线程为 null
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        long timeLeft = available.awaitNanos(delay);
                        // delay - timeLeft 表示本次从睡眠到被唤醒所用的时间
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 返回优先级队列的队首元素(即调用 q.poll 时),或者发生线程打断时,跳出自旋,执行此处方法
        // 条件一:leader == null
        // 条件成立:表示目前所有线程都在进行“无限”等待
        // 条件二:q.peek() != null
        // 条件成立:队列中队头元素不为 null,队列中至少包含一个元素
        if (leader == null && q.peek() != null)
            // 通知等待条件的线程来竞选 leader
            available.signal();
        lock.unlock();
    }
}
原文地址:https://www.cnblogs.com/kendoziyu/p/java_code_analysis_of_DelayQueue.html