JStorm TimeCacheMap Source Code Analysis

package com.alibaba.jstorm.utils;

import com.alibaba.jstorm.callback.AsyncLoopRunnable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;

public class TimeCacheMap<K, V> implements TimeOutMap<K, V> {
	private static final int DEFAULT_NUM_BUCKETS = 3;
	// Buckets are ordered newest-first; the cleaner thread rotates them periodically
	private LinkedList<HashMap<K, V>> _buckets;
	private final Object _lock;
	private Thread _cleaner;
	private ExpiredCallback<K, V> _callback;

	public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
		this._lock = new Object();

		if (numBuckets < 2) {
			throw new IllegalArgumentException("numBuckets must be >= 2");
		}
		this._buckets = new LinkedList<HashMap<K, V>>();
		for (int i = 0; i < numBuckets; ++i) {
			this._buckets.add(new HashMap<K, V>());
		}

		this._callback = callback;
		long expirationMillis = expirationSecs * 1000L;
		// Rotating every expirationMillis / (numBuckets - 1) ms guarantees that an entry
		// lives at least expirationSecs and at most expirationSecs * numBuckets / (numBuckets - 1)
		final long sleepTime = expirationMillis / (numBuckets - 1);
		this._cleaner = new Thread(new Runnable() {
			public void run() {
				while (!(AsyncLoopRunnable.getShutdown().get())) {
					Map<K, V> dead = null;
					JStormUtils.sleepMs(sleepTime);
					synchronized (TimeCacheMap.this._lock) {
						// Drop the oldest bucket and push a fresh, empty one to the head
						dead = TimeCacheMap.this._buckets.removeLast();
						TimeCacheMap.this._buckets.addFirst(new HashMap<K, V>());
					}
					// Fire the expiration callback outside of the lock
					if (TimeCacheMap.this._callback != null) {
						for (Entry<K, V> entry : dead.entrySet()) {
							TimeCacheMap.this._callback.expire(entry.getKey(), entry.getValue());
						}
					}
				}
			}
		});
		this._cleaner.setDaemon(true);
		this._cleaner.start();
	}

	public TimeCacheMap(int expirationSecs, ExpiredCallback<K, V> callback) {
		this(expirationSecs, DEFAULT_NUM_BUCKETS, callback);
	}

	public TimeCacheMap(int expirationSecs) {
		this(expirationSecs, DEFAULT_NUM_BUCKETS);
	}

	public TimeCacheMap(int expirationSecs, int numBuckets) {
		this(expirationSecs, numBuckets, null);
	}

	public boolean containsKey(K key) {
		synchronized (this._lock) {
			for (HashMap<K, V> bucket : this._buckets) {
				if (bucket.containsKey(key)) {
					return true;
				}
			}
			return false;
		}
	}

	public V get(K key) {
		synchronized (this._lock) {
			// Scan every bucket, newest first; a key lives in at most one bucket
			for (HashMap<K, V> bucket : this._buckets) {
				if (bucket.containsKey(key)) {
					return bucket.get(key);
				}
			}
			return null;
		}
	}

	public void putHead(K key, V value) {
		synchronized (this._lock) {
			this._buckets.getFirst().put(key, value);
		}
	}

	public void put(K key, V value) {
		synchronized (this._lock) {
			Iterator<HashMap<K, V>> it = this._buckets.iterator();
			HashMap<K, V> bucket = it.next();
			// Insert into the newest bucket, which resets the entry's lifetime
			bucket.put(key, value);
			// Remove any stale copy of the key from the older buckets
			while (it.hasNext()) {
				bucket = it.next();
				bucket.remove(key);
			}
		}
	}

	public Object remove(K key) {
		synchronized (this._lock) {
			for (HashMap<K, V> bucket : this._buckets) {
				if (bucket.containsKey(key)) {
					return bucket.remove(key);
				}
			}
			return null;
		}
	}

	public int size() {
		synchronized (this._lock) {
			int size = 0;
			for (HashMap<K, V> bucket : this._buckets) {
				size += bucket.size();
			}
			return size;
		}
	}

	public void cleanup() {
		this._cleaner.interrupt();
	}

	public Map<K, V> buildMap() {
		Map<K, V> ret = new HashMap<K, V>();
		synchronized (this._lock) {
			for (HashMap<K, V> bucket : this._buckets) {
				ret.putAll(bucket);
			}
			return ret;
		}
	}
}

The overall idea: the LinkedList holds three HashMap buckets by default. Each newly added entry goes into the first HashMap, and the same key is removed from the buckets behind it. A background thread cleans out expired data on a schedule: after each sleep it removes the last HashMap in the list and puts a fresh, empty HashMap at the head. Data added during the next time window goes into that new bucket, the old first bucket becomes the second, the old second becomes the third, and on the next pass the last bucket is dropped again, and so on in a cycle.
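As a rough usage sketch of that behavior: the demo class below, the 5-second expiration, and the key/value names are all invented for illustration (not from the original post), and I am assuming ExpiredCallback is resolvable as it is referenced in the class above (in some versions it is a nested interface).

import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeCacheMap;

public class TimeCacheMapDemo {
	public static void main(String[] args) {
		// 5-second expiration, default 3 buckets: the cleaner rotates every 2.5s,
		// so an entry lives between 5s and 7.5s before the callback fires
		TimeCacheMap<String, Integer> cache = new TimeCacheMap<String, Integer>(5,
				new ExpiredCallback<String, Integer>() {   // assumed visible as in the class above
					public void expire(String key, Integer val) {
						// Invoked by the cleaner thread when the oldest bucket is dropped
						System.out.println("expired: " + key + " -> " + val);
					}
				});

		cache.put("msgId-1", 42);                   // written into the newest (first) bucket
		System.out.println(cache.get("msgId-1"));   // 42

		JStormUtils.sleepMs(10 * 1000L);            // wait past the maximum lifetime
		System.out.println(cache.get("msgId-1"));   // null, the entry has been expired
	}
}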

Sleep time: with a default expiration parameter of 30 seconds, the formula in the code gives a window-rotation interval of 15 seconds. Data added in the last second of the first window is deleted about 30 seconds later; data added in the first second of the first window is not deleted until about 45 seconds later. In general, an entry survives between expirationSecs and expirationSecs * numBuckets / (numBuckets - 1) seconds.

long expirationMillis = expirationSecs * 1000L;
long sleepTime = expirationMillis / (numBuckets - 1);
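
To make the window arithmetic concrete, here is a tiny standalone sketch that just evaluates these two lines for the default 30-second / 3-bucket case described above (the class and variable names are mine):

public class ExpirationMath {
	public static void main(String[] args) {
		int expirationSecs = 30;
		int numBuckets = 3;

		long expirationMillis = expirationSecs * 1000L;
		long sleepTime = expirationMillis / (numBuckets - 1);  // 15000 ms between rotations

		// An entry is dropped after (numBuckets - 1) to numBuckets rotations,
		// depending on when inside the current window it was inserted
		long minLifetimeMs = sleepTime * (numBuckets - 1);     // 30000 ms
		long maxLifetimeMs = sleepTime * numBuckets;           // 45000 ms

		System.out.println("rotation every " + sleepTime + " ms");
		System.out.println("entry lifetime between " + minLifetimeMs + " and " + maxLifetimeMs + " ms");
	}
}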

Reads simply traverse all the HashMaps in the list until the key is found.
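A small sketch of what that means in practice (the 4-second expiration and the key are made up): an entry put before a rotation has already moved to an older bucket, but get() still finds it because every bucket is scanned.

import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeCacheMap;

public class TimeCacheMapGetDemo {
	public static void main(String[] args) {
		// 4-second expiration, default 3 buckets: the cleaner rotates every 2 seconds
		TimeCacheMap<String, String> cache = new TimeCacheMap<String, String>(4);

		cache.put("tupleId", "pending");
		JStormUtils.sleepMs(3 * 1000L);  // at least one rotation has happened,
		                                 // so the entry now sits in an older bucket

		// get() scans every bucket under the lock, so the value is still returned
		System.out.println(cache.get("tupleId"));  // "pending" (not yet expired)
	}
}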

Original article: https://www.cnblogs.com/chengxin1982/p/6425522.html