jiaji's blog

Guava Cache

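Introduction to Guava Cache

A local cache is a commonly used backend "component", and the Cache in Guava is a very practical local cache implementation. It supports the following features: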

  • automatic loading of entries into the cache
  • least-recently-used eviction when a maximum size is exceeded
  • time-based expiration of entries, measured since last access or last write
  • keys automatically wrapped in weak references
  • values automatically wrapped in weak or soft references
  • notification of evicted (or otherwise removed) entries
  • accumulation of cache access statistics

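All of these features are optional and are configured through the builder when the cache is created.
This post walks through the implementation of Guava Cache (19.0) based on its source code. For usage details, see the official documentation.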
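
As a rough illustration of that builder-style configuration, here is a minimal sketch. It assumes Guava is on the classpath; the class name BuilderSketch, the loadFromBackend helper, and all sizes and durations are made up for this example, and only the CacheBuilder calls come from Guava.

```java
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.concurrent.TimeUnit;

public class BuilderSketch {
    // Hypothetical loader: stands in for a remote cache or database lookup.
    static String loadFromBackend(String key) {
        return "value-of-" + key;
    }

    public static LoadingCache<String, String> newCache() {
        return CacheBuilder.newBuilder()
                .maximumSize(1000)                       // size-based eviction
                .expireAfterWrite(10, TimeUnit.MINUTES)  // expire 10 minutes after the last write
                .refreshAfterWrite(1, TimeUnit.MINUTES)  // eligible for refresh 1 minute after the last write
                .recordStats()                           // collect hit/miss statistics
                .build(new CacheLoader<String, String>() {
                    @Override
                    public String load(String key) {
                        return loadFromBackend(key);
                    }
                });
    }

    public static void main(String[] args) throws Exception {
        LoadingCache<String, String> cache = newCache();
        System.out.println(cache.get("a"));  // first call goes through the loader
        System.out.println(cache.get("a"));  // second call is served from the cache
        System.out.println(cache.stats());   // hit and miss counters
    }
}
```

Every option in the chain can be dropped independently, which is what "optional" means in practice here.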

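Source Code Analysis

The “Expiration” Policies

Before going through the source, here are two commonly used expiration policies: expireAfterWrite and refreshAfterWrite. Both guarantee that when a locally cached value becomes stale, only one thread goes to the backend (a remote cache or the database) to load it. The difference is what the other threads do during the load: with expireAfterWrite, all threads block until the new value arrives and then return it; with refreshAfterWrite, one thread fetches the new value while the other threads return the old value immediately, without blocking.

The get method implements most of the logic described above.

Analysis of get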

// loading
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
  checkNotNull(key);
  checkNotNull(loader);
  try {
    // count is the number of live entries in this segment; it is 0 on the very first access
    if (count != 0) { // read-volatile
      // don't call getLiveEntry, which would ignore loading values
      ReferenceEntry<K, V> e = getEntry(key, hash);
      if (e != null) {
        long now = map.ticker.read();
        // fetch the value only if it is still valid and has not expired
        V value = getLiveValue(e, now);
        if (value != null) {
          // record the read and the hit (CacheStats)
          recordRead(e, now);
          statsCounter.recordHits(1);
          // refresh logic: return the old value first, without blocking
          return scheduleRefresh(e, key, hash, value, now, loader);
        }
        ValueReference<K, V> valueReference = e.getValueReference();
        if (valueReference.isLoading()) {
          // another thread is already loading; wait synchronously for the new value
          return waitForLoadingValue(e, key, valueReference);
        }
      }
    }
    // at this point e is either null or expired;
    // take the lock and return the existing value, or perform a synchronous load
    return lockedGetOrLoad(key, hash, loader);
  } catch (ExecutionException ee) {
    Throwable cause = ee.getCause();
    if (cause instanceof Error) {
      throw new ExecutionError((Error) cause);
    } else if (cause instanceof RuntimeException) {
      throw new UncheckedExecutionException(cause);
    }
    throw ee;
  } finally {
    postReadCleanup();
  }
}

scheduleRefresh, as called from get:

V scheduleRefresh(ReferenceEntry<K, V> entry, K key, int hash, V oldValue, long now,
    CacheLoader<? super K, V> loader) {
  if (map.refreshes() && (now - entry.getWriteTime() > map.refreshNanos)
      && !entry.getValueReference().isLoading()) {
    // no thread is loading yet, so go and load the new value
    V newValue = refresh(key, hash, loader, true);
    if (newValue != null) {
      return newValue;
    }
  }
  // otherwise return the old value directly
  return oldValue;
}
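
Whether the request that triggers the refresh gets the old or the new value depends on how CacheLoader.reload is implemented: the default reload calls load synchronously, so refresh comes back with the new value, while an asynchronous reload leaves newValue null and the old value is returned. The sketch below tries to make that visible with CacheLoader.asyncReloading. It assumes Guava is on the classpath; the class name, the fake clock, and the executor that merely queues tasks are all hypothetical helpers invented for this demo.

```java
import com.google.common.base.Ticker;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class AsyncRefreshSketch {
    public static String run() {
        final AtomicLong nanos = new AtomicLong();          // fake clock, advanced by hand
        final AtomicInteger version = new AtomicInteger();  // counts backend loads
        final Queue<Runnable> pending = new ArrayDeque<Runnable>();

        Ticker ticker = new Ticker() {
            @Override public long read() { return nanos.get(); }
        };
        // Executor that only queues work, so the demo decides when a reload runs.
        Executor manual = new Executor() {
            @Override public void execute(Runnable r) { pending.add(r); }
        };
        CacheLoader<String, String> loader = new CacheLoader<String, String>() {
            @Override public String load(String key) {
                return key + "-v" + version.incrementAndGet();
            }
        };
        LoadingCache<String, String> cache = CacheBuilder.newBuilder()
                .ticker(ticker)
                .refreshAfterWrite(1, TimeUnit.MINUTES)
                .build(CacheLoader.asyncReloading(loader, manual));

        String first = cache.getUnchecked("k");           // initial, blocking load
        nanos.addAndGet(TimeUnit.MINUTES.toNanos(2));     // entry becomes eligible for refresh
        String stale = cache.getUnchecked("k");           // schedules the reload, returns the old value
        while (!pending.isEmpty()) {
            pending.poll().run();                         // let the queued reload complete
        }
        String fresh = cache.getUnchecked("k");           // now sees the reloaded value
        return first + " " + stale + " " + fresh;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In a real service the executor would be a thread pool rather than a queue drained by hand; the queue is only there to keep the demo deterministic.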

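lockedGetOrLoad, as called from get:
Execution usually reaches this function when the value for a key is loaded for the first time, or when a value has expired under the expireAfterWrite policy.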
V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader)
    throws ExecutionException {
  ReferenceEntry<K, V> e;
  ValueReference<K, V> valueReference = null;
  LoadingValueReference<K, V> loadingValueReference = null;
  boolean createNewEntry = true;
  lock();
  try {
    // re-read ticker once inside the lock
    long now = map.ticker.read();
    preWriteCleanup(now);
    int newCount = this.count - 1;
    AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
    int index = hash & (table.length() - 1);
    ReferenceEntry<K, V> first = table.get(index);
    // find the entry and check whether it has expired; return it or clean it up
    for (e = first; e != null; e = e.getNext()) {
      K entryKey = e.getKey();
      if (e.getHash() == hash && entryKey != null
          && map.keyEquivalence.equivalent(key, entryKey)) {
        valueReference = e.getValueReference();
        if (valueReference.isLoading()) {
          createNewEntry = false;
        } else {
          V value = valueReference.get();
          if (value == null) {
            enqueueNotification(entryKey, hash, valueReference, RemovalCause.COLLECTED);
          } else if (map.isExpired(e, now)) {
            // This is a duplicate check, as preWriteCleanup already purged expired
            // entries, but let's accomodate an incorrect expiration queue.
            enqueueNotification(entryKey, hash, valueReference, RemovalCause.EXPIRED);
          } else {
            recordLockedRead(e, now);
            statsCounter.recordHits(1);
            // we were concurrent with loading; don't consider refresh
            return value;
          }
          // immediately reuse invalid entries
          writeQueue.remove(e);
          accessQueue.remove(e);
          this.count = newCount; // write-volatile
        }
        break;
      }
    }
    // a new load is needed: do some initialization first
    if (createNewEntry) {
      loadingValueReference = new LoadingValueReference<K, V>();
      if (e == null) {
        e = newEntry(key, hash, first);
        e.setValueReference(loadingValueReference);
        table.set(index, e);
      } else {
        e.setValueReference(loadingValueReference);
      }
    }
  } finally {
    unlock();
    postWriteCleanup();
  }
  // a new load is needed: load the value from the backend synchronously
  if (createNewEntry) {
    try {
      // Synchronizes on the entry to allow failing fast when a recursive load is
      // detected. This may be circumvented when an entry is copied, but will fail fast most
      // of the time.
      synchronized (e) {
        return loadSync(key, hash, loadingValueReference, loader);
      }
    } finally {
      statsCounter.recordMisses(1);
    }
  } else {
    // The entry already exists. Wait for loading.
    // otherwise block and wait for the other thread's load
    return waitForLoadingValue(e, key, valueReference);
  }
}
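
The core idea in lockedGetOrLoad, that the first thread installs a "loading" placeholder and everyone else waits on it, can be modeled with plain JDK classes. The sketch below is a deliberately simplified model and not Guava's code: it uses putIfAbsent on a ConcurrentHashMap instead of a segment lock, a FutureTask in place of LoadingValueReference, and it has no expiry at all. All names in it are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

public class SingleFlightSketch {
    private final ConcurrentMap<String, FutureTask<String>> slots =
            new ConcurrentHashMap<String, FutureTask<String>>();
    private final AtomicInteger loads = new AtomicInteger();

    // Hypothetical backend call; counts how often it is really executed.
    private String loadFromBackend(String key) {
        loads.incrementAndGet();
        return "value-of-" + key;
    }

    public String get(final String key) throws ExecutionException, InterruptedException {
        FutureTask<String> mine = new FutureTask<String>(new Callable<String>() {
            @Override public String call() {
                return loadFromBackend(key);
            }
        });
        // Atomically install the placeholder; only one thread can win.
        FutureTask<String> existing = slots.putIfAbsent(key, mine);
        if (existing == null) {
            mine.run();          // the winner loads on its own thread
            return mine.get();
        }
        return existing.get();   // everyone else blocks until the winner is done
    }

    // Starts several threads asking for the same key; returns the number of backend loads.
    public static int demo() {
        final SingleFlightSketch cache = new SingleFlightSketch();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new Runnable() {
                @Override public void run() {
                    try {
                        cache.get("k");
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        return cache.loads.get();
    }

    public static void main(String[] args) {
        System.out.println("backend loads: " + demo());
    }
}
```

The else branch here plays the role of waitForLoadingValue, and the winner's mine.run() plays the role of loadSync.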

waitForLoadingValue, as called from get:

V waitForLoadingValue(ReferenceEntry<K, V> e, K key, ValueReference<K, V> valueReference)
    throws ExecutionException {
  if (!valueReference.isLoading()) {
    throw new AssertionError();
  }
  checkState(!Thread.holdsLock(e), "Recursive load of: %s", key);
  // don't consider expiration as we're concurrent with loading
  try {
    // blocks synchronously on the result via getUninterruptibly
    V value = valueReference.waitForValue();
    if (value == null) {
      throw new InvalidCacheLoadException("CacheLoader returned null for key " + key + ".");
    }
    // re-read ticker now that loading has completed
    long now = map.ticker.read();
    recordRead(e, now);
    return value;
  } finally {
    statsCounter.recordMisses(1);
  }
}

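Note that in Guava Cache, if the load from the backend returns null, get does not return null; it throws InvalidCacheLoadException instead. Loads that return null are very common with database lookups, so be sure to handle this case.

My current workaround is to check whether the database result is null when it comes back and, if it is, return an empty object instead. The calling function then reads one attribute of the object it gets from get, for example the id of a DO. If that attribute is null it returns null, otherwise it returns the object itself.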
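
A related way to handle this, different from the empty-object approach described above, is to cache an Optional so that "not found" becomes an ordinary cacheable value. The sketch below is only an illustration of that alternative. It assumes Guava is on the classpath; the class name, the in-memory DB map standing in for a database, and the find helper are all hypothetical.

```java
import com.google.common.base.Optional;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.HashMap;
import java.util.Map;

public class NullValueSketch {
    // Hypothetical stand-in for a database table.
    static final Map<String, String> DB = new HashMap<String, String>();
    static {
        DB.put("alice", "Alice");
    }

    static final LoadingCache<String, Optional<String>> CACHE = CacheBuilder.newBuilder()
            .maximumSize(100)
            .build(new CacheLoader<String, Optional<String>>() {
                @Override
                public Optional<String> load(String key) {
                    // Never returns null: a missing row becomes Optional.absent().
                    return Optional.fromNullable(DB.get(key));
                }
            });

    // Callers get null for a missing row instead of an InvalidCacheLoadException.
    public static String find(String key) {
        return CACHE.getUnchecked(key).orNull();
    }

    public static void main(String[] args) {
        System.out.println(find("alice"));
        System.out.println(find("bob"));
    }
}
```

One consequence of either approach is that misses are cached too, so a row inserted later stays invisible until the entry expires or is invalidated.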

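Thoughts and Summary

As the code shows, whichever expiration policy is used, Guava Cache is trigger-driven: every expiration and every reload depends on an external request. The so-called refresh does not start a thread of its own that keeps reloading. The isExpired check that getLiveValue relies on to detect expired entries does not take the refresh policy into account either: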

boolean isExpired(ReferenceEntry<K, V> entry, long now) {
  checkNotNull(entry);
  if (expiresAfterAccess()
      && (now - entry.getAccessTime() >= expireAfterAccessNanos)) {
    return true;
  }
  if (expiresAfterWrite()
      && (now - entry.getWriteTime() >= expireAfterWriteNanos)) {
    return true;
  }
  return false;
}

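Consequently, if you use refreshAfterWrite and a burst of concurrent requests arrives long after a value was loaded, most of those requests will get the old value, which may cause problems at the business level. That said, this scenario is a bit unusual and not very common. So when using any of these features, always consider your own business requirements, and if you are unsure, write a small demo first. One last point: expireAfterWrite and refreshAfterWrite can be used together, as the official documentation also notes: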

In contrast to expireAfterWrite, refreshAfterWrite will make a key eligible for refresh after the specified duration, but a refresh will only be actually initiated when the entry is queried. (If CacheLoader.reload is implemented to be asynchronous, then the query will not be slowed down by the refresh.) So, for example, you can specify both refreshAfterWrite and expireAfterWrite on the same cache, so that the expiration timer on an entry isn’t blindly reset whenever an entry becomes eligible for a refresh, so if an entry isn’t queried after it comes eligible for refreshing, it is allowed to expire.
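
To see what combining the two policies buys you, the following sketch compares what the first request after a long idle period receives with and without expireAfterWrite. It assumes Guava is on the classpath; the class name, the fake clock, the durations, and the executor that silently drops reload tasks (modeling a background reload that is still in flight) are all hypothetical choices made for this demo.

```java
import com.google.common.base.Ticker;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ExpireVsRefreshSketch {
    // Returns the value seen by the first request after a 30-minute idle period.
    public static String firstValueAfterIdle(boolean withExpiry) {
        final AtomicLong nanos = new AtomicLong();          // fake clock, advanced by hand
        final AtomicInteger version = new AtomicInteger();  // counts backend loads

        Ticker ticker = new Ticker() {
            @Override public long read() { return nanos.get(); }
        };
        // Drops reload tasks: models an asynchronous reload that has not finished yet.
        // Demo only; a real executor must actually run its tasks.
        Executor neverRuns = new Executor() {
            @Override public void execute(Runnable r) { }
        };
        CacheLoader<String, String> loader = new CacheLoader<String, String>() {
            @Override public String load(String key) {
                return key + "-v" + version.incrementAndGet();
            }
        };

        CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder()
                .ticker(ticker)
                .refreshAfterWrite(1, TimeUnit.MINUTES);
        if (withExpiry) {
            builder.expireAfterWrite(10, TimeUnit.MINUTES);
        }
        LoadingCache<String, String> cache =
                builder.build(CacheLoader.asyncReloading(loader, neverRuns));

        cache.getUnchecked("k");                         // initial load
        nanos.addAndGet(TimeUnit.MINUTES.toNanos(30));   // nobody touches the key for 30 minutes
        return cache.getUnchecked("k");
    }

    public static void main(String[] args) {
        System.out.println("refresh only:     " + firstValueAfterIdle(false));
        System.out.println("refresh + expire: " + firstValueAfterIdle(true));
    }
}
```

With refresh alone, the request is answered from the 30-minute-old entry. With expiry added, the entry is treated as expired, so the request goes through lockedGetOrLoad and blocks for a fresh load.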

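Guava Cache is well designed, but the implementation does not feel very “elegant” and the code is somewhat painful to read; the whole learning process was guess -> verify -> understand. Questions and discussion are welcome~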