《转》精巧好用的DelayQueue

该文章转自:http://www.cnblogs.com/jobs/archive/2007/04/27/730255.html

我们谈一下实际的场景吧。我们在开发中,有如下场景

a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。
c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。

一种笨笨的办法就是,使用一个后台线程,遍历所有对象,挨个检查。这种笨笨的办法简单好用,但是对象数量过多时,可能存在性能问题,检查间隔时间不好设置,间隔时间过大,影响精确度,多小则存在效率问题。而且做不到按超时的时间顺序处理。 

这场景,使用DelayQueue最适合了。

DelayQueue是java.util.concurrent中提供的一个很有意思的类。很巧妙,非常棒!但是java doc和Java SE 5.0的source中都没有提供Sample。我最初在阅读ScheduledThreadPoolExecutor源码时,发现DelayQueue的妙用。随后在实际工作中,应用在session超时管理,网络应答通讯协议的请求超时处理。

本文将会对DelayQueue做一个介绍,然后列举应用场景。并且提供一个Delayed接口的实现和Sample代码。

DelayQueue是一个BlockingQueue,其特化的参数是Delayed。(不了解BlockingQueue的同学,先去了解BlockingQueue再看本文)
Delayed扩展了Comparable接口,比较的基准为延时的时间值,Delayed接口的实现类getDelay的返回值应为固定值(final)。DelayQueue内部是使用PriorityQueue实现的。

DelayQueue = BlockingQueue + PriorityQueue + Delayed

DelayQueue的关键元素BlockingQueue、PriorityQueue、Delayed。可以这么说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。

他们的基本定义如下

public interface Comparable<T> {
    public int compareTo(T o);
}
public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

  

public class DelayQueue<E extends Delayed> implements BlockingQueue<E> { 
    private final PriorityQueue<E> q = new PriorityQueue<E>();
}

DelayQueue内部的实现使用了一个优先队列。当调用DelayQueue的offer方法时,把Delayed对象加入到优先队列q中。如下:

 1 public boolean offer(E e) {
 2     final ReentrantLock lock = this.lock;
 3     lock.lock();
 4     try {
 5         E first = q.peek();
 6         q.offer(e);
 7         if (first == null || e.compareTo(first) < 0)
 8             available.signalAll();
 9         return true;
10     } finally {
11         lock.unlock();
12     }
13 }

DelayQueue的take方法,把优先队列q的first拿出来(peek),如果没有达到延时阀值,则进行await处理。如下:

 1 public E take() throws InterruptedException {
 2     final ReentrantLock lock = this.lock;
 3     lock.lockInterruptibly();
 4     try {
 5         for (;;) {
 6             E first = q.peek();
 7             if (first == null) {
 8                 available.await();
 9             } else {
10                 long delay =  first.getDelay(TimeUnit.NANOSECONDS);
11                 if (delay > 0) {
12                     long tl = available.awaitNanos(delay);
13                 } else {
14                     E x = q.poll();
15                     assert x != null;
16                     if (q.size() != 0)
17                         available.signalAll(); // wake up other takers
18                     return x;
19 
20                 }
21             }
22         }
23     } finally {
24         lock.unlock();
25     }
26 }

以下是Sample,是一个缓存的简单实现。共包括三个类Pair、DelayItem、Cache。如下:

 1 public class Pair<K, V> {
 2     public K first;
 3 
 4     public V second;
 5     
 6     public Pair() {}
 7     
 8     public Pair(K first, V second) {
 9         this.first = first;
10         this.second = second;
11     }
12 }

以下是Delayed的实现

 1 import java.util.concurrent.Delayed;
 2 import java.util.concurrent.TimeUnit;
 3 import java.util.concurrent.atomic.AtomicLong;
 4 
 5 public class DelayItem<T> implements Delayed {
 6     /** Base of nanosecond timings, to avoid wrapping */
 7     private static final long NANO_ORIGIN = System.nanoTime();
 8 
 9     /**
10      * Returns nanosecond time offset by origin
11      */
12     final static long now() {
13         return System.nanoTime() - NANO_ORIGIN;
14     }
15 
16     /**
17      * Sequence number to break scheduling ties, and in turn to guarantee FIFO order among tied
18      * entries.
19      */
20     private static final AtomicLong sequencer = new AtomicLong(0);
21 
22     /** Sequence number to break ties FIFO */
23     private final long sequenceNumber;
24 
25     /** The time the task is enabled to execute in nanoTime units */
26     private final long time;
27 
28     private final T item;
29 
30     public DelayItem(T submit, long timeout) {
31         this.time = now() + timeout;
32         this.item = submit;
33         this.sequenceNumber = sequencer.getAndIncrement();
34     }
35 
36     public T getItem() {
37         return this.item;
38     }
39 
40     public long getDelay(TimeUnit unit) {
41         long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
42         return d;
43     }
44 
45     public int compareTo(Delayed other) {
46         if (other == this) // compare zero ONLY if same object
47             return 0;
48         if (other instanceof DelayItem) {
49             DelayItem x = (DelayItem) other;
50             long diff = time - x.time;
51             if (diff < 0)
52                 return -1;
53             else if (diff > 0)
54                 return 1;
55             else if (sequenceNumber < x.sequenceNumber)
56                 return -1;
57             else
58                 return 1;
59         }
60         long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
61         return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
62     }
63 }

以下是Cache的实现,包括了put和get方法,还包括了可执行的main函数。

 1 import java.util.concurrent.ConcurrentHashMap;
 2 import java.util.concurrent.ConcurrentMap;
 3 import java.util.concurrent.DelayQueue;
 4 import java.util.concurrent.TimeUnit;
 5 import java.util.logging.Level;
 6 import java.util.logging.Logger;
 7 
 8 public class Cache<K, V> {
 9     private static final Logger LOG = Logger.getLogger(Cache.class.getName());
10 
11     private ConcurrentMap<K, V> cacheObjMap = new ConcurrentHashMap<K, V>();
12 
13     private DelayQueue<DelayItem<Pair<K, V>>> q = new DelayQueue<DelayItem<Pair<K, V>>>();
14 
15     private Thread daemonThread;
16 
17     public Cache() {
18 
19         Runnable daemonTask = new Runnable() {
20             public void run() {
21                 daemonCheck();
22             }
23         };
24 
25         daemonThread = new Thread(daemonTask);
26         daemonThread.setDaemon(true);
27         daemonThread.setName("Cache Daemon");
28         daemonThread.start();
29     }
30 
31     private void daemonCheck() {
32 
33         if (LOG.isLoggable(Level.INFO))
34             LOG.info("cache service started.");
35 
36         for (;;) {
37             try {
38                 DelayItem<Pair<K, V>> delayItem = q.take();
39                 if (delayItem != null) {
40                     // 超时对象处理
41                     Pair<K, V> pair = delayItem.getItem();
42                     cacheObjMap.remove(pair.first, pair.second); // compare and remove
43                 }
44             } catch (InterruptedException e) {
45                 if (LOG.isLoggable(Level.SEVERE))
46                     LOG.log(Level.SEVERE, e.getMessage(), e);
47                 break;
48             }
49         }
50 
51         if (LOG.isLoggable(Level.INFO))
52             LOG.info("cache service stopped.");
53     }
54 
55     // 添加缓存对象
56     public void put(K key, V value, long time, TimeUnit unit) {
57         V oldValue = cacheObjMap.put(key, value);
58         if (oldValue != null)
59             q.remove(key);
60 
61         long nanoTime = TimeUnit.NANOSECONDS.convert(time, unit);
62         q.put(new DelayItem<Pair<K, V>>(new Pair<K, V>(key, value), nanoTime));
63     }
64 
65     public V get(K key) {
66         return cacheObjMap.get(key);
67     }
68 
69     // 测试入口函数
70     public static void main(String[] args) throws Exception {
71         Cache<Integer, String> cache = new Cache<Integer, String>();
72         cache.put(1, "aaaa", 3, TimeUnit.SECONDS);
73 
74         Thread.sleep(1000 * 2);
75         {
76             String str = cache.get(1);
77             System.out.println(str);
78         }
79 
80         Thread.sleep(1000 * 2);
81         {
82             String str = cache.get(1);
83             System.out.println(str);
84         }
85     }
86 }

运行Sample,main函数执行的结果是输出两行,第一行为aaa,第二行为null。

原文地址:https://www.cnblogs.com/wubingshenyin/p/4497346.html