一个改进 LRU 算法的缓冲池 update 2013.7.15

首先,这里的缓冲池指的是 Cache,而不是 Buffer,就是指将代价较大的对象先存储起来,以备以后需要的时候可以直接拿来用,能够节约一些时间或空间。

当缓冲池中的对象过多时,就需要删掉一些“不会再用”的对象来节约内存。但是没人能够知道某个对象什么时候会再用,因此这就涉及到缓存替换算法了,好的缓存替换算法可以有更大的概率删掉“不会再用”的对象,能够保留“很可能再用”的对象。

现在发明的缓存替换算法有很多,除了只有理论意义的 OPT(Optimal,最优替换,也就是上帝模式)和听天由命的随机替换,常用的有 LRU(Least recently used,最近最少使用),LFU(Least Frequently Used,最不经常使用)和 FIFO(First in first out,先进先出)以及很多改进算法。其中 LRU 算法经常被使用,我也打算使用 LRU 算法来实现一个缓冲池。

一、改进的 LRU 算法

LRU 算法认为最近被访问的对象,以后也很可能被访问。因此,在实现时,一般使用一个哈希表类存储被缓存的对象,和一个双向链表类存储对象被使用的情况:如果一个对象被访问了,就将它提升到链表的头部,如图1所示,这样就能保证最近被访问的对象总是更接近头部。当添加新对象时,直接添加到链表的头部;当需要淘汰旧对象时,就从链表的尾部(最久没有被访问)开始淘汰。

图1 K 被访问了,提升到链表的头部。

LRU 算法的实现还是比较容易的,但是由于每个操作都会有链表的修改,因此在并发访问时都每个操作需要加锁(双向链表没有无锁版的,只有单向链表有),不太利于并发访问。

同时,LRU 只将对象“最近是否被访问”作为度量指标,并不准确,偶发性和周期性的操作会导致 LRU 的命中率急剧下降,因此也提出了很多 LRU 的改进算法,例如 LRU-K,2Q,以及命中率更高的 LIRS 算法(它的论文是这么说的)。

后来,无意间看到了《OCP知识点讲解 之 LRU链与脏LRU链》这篇文章,里面介绍了一种 Oracle 改进的 LRU 算法,感觉效果会不错,实现起来也比较简单,因此使用该算法来实现缓冲池。

这个改进的算法将 LRU 链从中间分成了两半,前一半是热端,后一半是冷端,如图2所示。热端记录的是访问次数大于等于两次的对象,它们被认为可能会被经常访问到,冷端记录的是访问次数为一次的对象。区分热端和冷端,就可以保留访问次数较大的对象,同时更快的淘汰掉只访问了一次的对象,使命中情况更好,2Q 等改进算法使用的也是类似的思想。如果热端和冷端并不平均分配,就可以更快或更慢的淘汰掉只访问了一次的对象。

图2 改进的 LRU 链表,图片来自文章《OCP知识点讲解 之 LRU链与脏LRU链》

改进算法中还为每个对象的添加了访问次数,这样可以将冷端被经常访问的对象添加到热端,热端的对象也会被“降级”到冷端。下面则是具体的实现分析,为了方便起见,无论是热端还是冷端,都将左边作为头,右边作为尾。

1. 添加新对象

  • 缓存空间还未满:与标准的 LRU 一样,总是插入到热端的头。因为此时缓存空间还足够,没必要区分对象是否被多次访问,之后在淘汰旧对象时自然可以区分出来。
  • 缓存空间已满:这时先淘汰旧的对象,再将新对象添加到冷端的头。因为新添加对象的访问次数总是 1,所以应当放到冷端。

2. 访问对象

这里直接递增对象的访问计数即可,因此这个算法的并发访问效率可以提高一些。

3. 淘汰旧对象

淘汰旧对象是从冷端的末尾开始的,但并不是顺序进行淘汰,而是如果冷端末尾的对象的访问次数大于等于2,就认为可能会被经常访问,则将它移动到热端的头,并将访问次数清零,同时热端的末尾被淘汰到冷端的头。然后继续尝试淘汰,直到对象的访问次数小于2。清零访问次数保证了一定可以找到被淘汰的对象。

在淘汰旧对象时,如果访问次数大于等于2,并不需要真的每次都移除末尾节点再将它移动到热端的头。因为双向链表可以以环形存储(System.Collections.Generic.LinkedList<T> 就是这么干的),也就是说 Head.Prev = Tail && Tail.Next = Head,所以这个移动过程就相当于将 head 变为 Head.Prev(也就是前移一位),如图3所示:淘汰从 H 开始,但是 H 的访问次数大于 2,就将 Head 从 A 移动到 H,H 自然就被添加到热端的头了,下一次尝试也就从 G 继续进行。

图3 尝试淘汰访问次数大于等于 2 的 H。

淘汰旧对象这里感觉很像 Clock 算法,不过 Clock 总是存储到刚被淘汰的位置,改进的 LRU 算法则是存储到冷端的头。

二、缓冲池接口定义

缓冲池接口的定义如下,是像 Dictioary 一样利用键来检索对象。在添加对象时,总是会覆盖旧对象;在检索时,由于必须要对对象的有效性进行判断(说不定什么时候就被替换掉了),因此提供的是 TryGet 方法而不是抛异常。为了方便使用缓冲池,加入了 GetOrAdd,这样就不用每次都自己判断对象是否存在了。

using System;

namespace Cyjb.Utility {
	/// <summary>
	/// 表示缓冲池的接口。
	/// </summary>
	/// <typeparam name="TKey">缓冲对象的键的类型。</typeparam>
	/// <typeparam name="TValue">缓冲对象的类型。</typeparam>
	public interface ICache<TKey, TValue> {
		/// <summary>
		/// 将指定的键和对象添加到缓存中,无论键是否存在。
		/// </summary>
		/// <param name="key">要添加的对象的键。</param>
		/// <param name="value">要添加的对象。</param>
		/// <exception cref="System.ArgumentNullException"><paramref name="key"/> 为 <c>null</c>。</exception>
		void Add(TKey key, TValue value);
		/// <summary>
		/// 清空缓存中的所有对象。
		/// </summary>
		void Clear();
		/// <summary>
		/// 确定缓存中是否包含指定的键。
		/// </summary>
		/// <param name="key">要在缓存中查找的键。</param>
		/// <returns>如果缓存中包含具有指定键的元素,则为 <c>true</c>;否则为 <c>false</c>。</returns>
		/// <exception cref="System.ArgumentNullException"><paramref name="key"/> 为 <c>null</c>。</exception>
		bool Contains(TKey key);
		/// <summary>
		/// 从缓存中获取与指定的键关联的对象,如果不存在则将对象添加到缓存中。
		/// </summary>
		/// <param name="key">要获取的对象的键。</param>
		/// <param name="valueFactory">用于为键生成对象的函数。</param>
		/// <returns>如果在缓存中找到该键,则为对应的对象;否则为 <paramref name="valueFactory"/> 返回的新对象。</returns>
		/// <exception cref="System.ArgumentNullException"><paramref name="key"/> 为 <c>null</c>。</exception>
		TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory);
		/// <summary>
		/// 从缓存中移除并返回具有指定键的对象。
		/// </summary>
		/// <param name="key">要移除并返回的对象的键。</param>
		/// <exception cref="System.ArgumentNullException"><paramref name="key"/> 为 <c>null</c>。</exception>
		void Remove(TKey key);
		/// <summary>
		/// 尝试从缓存中获取与指定的键关联的对象。
		/// </summary>
		/// <param name="key">要获取的对象的键。</param>
		/// <param name="value">此方法返回时,<paramref name="value"/> 包含缓存中具有指定键的对象;
		/// 如果操作失败,则包含默认值。</param>
		/// <returns>如果在缓存中找到该键,则为 <c>true</c>;否则为 <c>false</c>。</returns>
		/// <exception cref="System.ArgumentNullException"><paramref name="key"/> 为 <c>null</c>。</exception>
		bool TryGet(TKey key, out TValue value);
	}
}

三、缓冲池的实现

缓冲池中,使用 ReaderWriterLockSlim 读写锁来实现并发访问,自己实现了一个双向环形链表来将对象链起来。如果对象实现了 System.IDisposable 接口,会在移除或替换对象时自动调用 Dispose 方法,这里需要注意的是 Dispose 最好在锁之外调用,防止 Dispose 方法导致锁不能尽快被释放掉。

添加对象的核心代码是:

LruNode<TKey, TValue> node;
IDisposable disposable = null;
cacheLock.EnterWriteLock();
try {
	if (cacheDict.TryGetValue(key, out node)) {
		// 更新节点。
		node.Value = value;
		// 写锁互斥,这里不用 Interlocked。
		node.VisitCount++;
		return;
	} else {
		if (count < maxSize) {
			// 将节点添加到热端起始。
			node = new LruNode<TKey, TValue>(key, value);
			AddHotFirst(node);
			// 写锁互斥,这里不用 Interlocked。
			count++;
			if (count == hotSize + 1) {
				codeHead = head.Prev;
			}
			cacheDict.Add(key, node);
			return;
		} else {
			// 从冷端末尾尝试淘汰旧节点,将访问次数大于 1 的移动到热端的头。
			// 由于双向链表是环形存储的,就相当于将 head 前移。
			while (head.Prev.VisitCount >= 2) {
				// 清零访问计数。
				head.Prev.VisitCount = 0;
				head = head.Prev;
				codeHead = codeHead.Prev;
			}
			// 将 node 移除,并添加到冷端的头。
			node = head.Prev;
			disposable = node.Value as IDisposable;
			this.cacheDict.Remove(node.Key);
			this.Remove(node);
			// 这里直接重用旧节点。
			node.Key = key;
			node.Value = value;
			node.VisitCount = 1;
			this.AddCodeFirst(node);
			cacheDict.Add(key, node);
		}
	}
} finally {
	cacheLock.ExitWriteLock();
}
if (disposable != null) {
	disposable.Dispose();
}

可以看到,如果要添加的对象的键已存在,是直接替换旧对象的。双向链表的节点也都是重用的,所以没必要使用对象池(应该不会经常删除缓存项把……)。

在下面的 GetOrAdd 方法中,实际上就是分别调用了 TryGet 方法和 AddInternal 方法,这么做可能会导致不必要的对象创建,但是为了保证 valueFactory 不会长时间的占用写锁,因此只好这么做了。

public TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory)
{
	ExceptionHelper.CheckArgumentNull(key, "key");
	ExceptionHelper.CheckArgumentNull(valueFactory, "valueFactory");
	TValue value;
	if (this.TryGet(key, out value))
	{
		return value;
	}
	value = valueFactory(key);
	this.AddInternal(key, value);
	return value;
}

2013.7.15 更新:这里可以利用 System.Lazy<T> 类来防止多次调用 valueFactory,即将对象用 Lazy<TValue> 包装起来,在获取对象时只会获取到 Lazy<TValue> 对象,实际对象的初始化则被 Lazy<TValue> 推迟了,同时也不会导致长时间占用写锁(只要得到了 Lazy<TValue> 对象就可以释放锁了),防止 valueFactory 多次被调用则由 Lazy<TValue> 完成。

缓冲池的实现基本就这样了,具体的代码可以见 Cyjb.Utility

四、命中率的测试

最后,稍微写了一段代码来测试算法的命中率,测试的四个算法分别是 OPT,改进的 LRU,普通的 LRU 和 FIFO。下面先附上测试代码:

int maxCacheSize = 200;
// 初始化数据。
int[] data = new int[100000];
Random random = new Random();
for (int i = 0; i < data.Length; i++) {
	if (random.Next(10) < 1) {
		data[i] = random.Next(10000);
	} else {
		data[i] = random.Next(10000);
	}
}

// OPT。
int miss = 0;
Dictionary<int, int> nextVisit = new Dictionary<int, int>();
HashSet<int> optCache = new HashSet<int>();
for (int i = 0; i < data.Length; i++) {
	if (!optCache.Contains(data[i])) {
		miss++;
		if (optCache.Count >= maxCacheSize) {
			int minI = 0, maxVisit = 0;
			foreach (int j in optCache) {
				if (!nextVisit.ContainsKey(j)) {
					nextVisit.Add(j, 0);
				}
				if (nextVisit[j] <= i) {
					nextVisit[j] = int.MaxValue;
					for (int k = i + 1; k < data.Length; k++) {
						if (data[k] == j) {
							nextVisit[j] = k;
							break;
						}
					}
				}
				if (nextVisit[j] > maxVisit) {
					maxVisit = nextVisit[j];
					minI = j;
					if (maxVisit == int.MaxValue) {
						break;
					}
				}
			}
			optCache.Remove(minI);
		}
		optCache.Add(data[i]);
	}
}
Console.WriteLine("OPT:{0}%", 100 - miss * 100.0 / data.Length);

// LRU。
miss = 0;
ICache<int, int> cache = new LruCache<int, int>(maxCacheSize);
for (int i = 0; i < data.Length; i++) {
	int value;
	if (!cache.TryGet(data[i], out value)) {
		cache.Add(data[i], data[i]);
		miss++;
	}
}
Console.WriteLine("LRU:{0}%", 100 - miss * 100.0 / data.Length);

// 普通 LRU。
miss = 0;
cache = new LruNormalCache<int, int>(maxCacheSize);
for (int i = 0; i < data.Length; i++) {
	int value;
	if (!cache.TryGet(data[i], out value))
	{
		cache.Add(data[i], data[i]);
		miss++;
	}
}
Console.WriteLine("LRU_Normal:{0}%", 100 - miss * 100.0 / data.Length);

// FIFO。
miss = 0;
Queue<int> queue = new Queue<int>();
HashSet<int> set = new HashSet<int>();
for (int i = 0; i < data.Length; i++) {
	if (!set.Contains(data[i])) {
		set.Add(data[i]);
		miss++;
		queue.Enqueue(data[i]);
		if (queue.Count > maxCacheSize) {
			set.Remove(queue.Dequeue());
		}
	}
}
Console.WriteLine("FIFO:{0}%", 100 - miss * 100.0 / data.Length);

下面是一些测试结果,缓冲池大小都是 200,数据个数是 10W,第一列表示数据的范围,50% 0~1000,50% 0~150 表示有 50% 的数据范围是 0~1000,50% 的数据范围是 0~150(这里给出的测试实际上不太严谨,数据范围应该是取 0~150 和 151~1000 的,现在的测试方法实际上有 57.5% 的数据范围是 0~150,42.5% 的数据范围是 151~1000)。表格中的数值表示命中率。

数据范围 OPT 改进LRU 普通LRU FIFO
50% 0~1000,50% 0~150 70.745% 50.966% 41.532% 37.325%
10% 0~1000,90% 0~150
93.851%
91.79% 89.941% 78.46%
10% 0~10000,90% 0~500 66.59% 34.056% 32.056% 31.342%
10% 0~10000,90% 0~5000 24.617% 3.705% 3.60599999999999% 3.605%
100% 0~10000 18.739% 2.04000000000001% 2.05500000000001% 2.063%

这些只是随意的测试,仅供参考,不过看起来改进的 LRU 还是比较给力的,可以比较好的保留经常被访问的数据。而当所有数据都是随机访问的时候,命中率反而会下降,甚至还不如 FIFO。

对于并发访问的测试,我不太了解怎么弄,所以就没有做,不过个人感觉 ReaderWriterLockSlim 应该就没有问题了。

原文地址:https://www.cnblogs.com/cyjb/p/LruCache.html