背景介绍

我们使用SpringCache框架 + Redis来实现项目中的缓存实现,它能实现自动对数据缓存,也可以自动清理过期的缓存。大多数情况下,它都运行非常好。

这是因为我们需要缓存的数据,通常都是可序列化的,但是我们迟早会遇到不可序列化的对象。那么我们只能选择SpringCache中的ConcurrentMapCache才能缓存这些不可序列化的对象,但是ConcurrentMapCache呢又不提供自动清理缓存的功能。

于是我开始自己设计一个本地的、高效的、能自动清理缓存扩展,同样它能支持SpringCache。

为了高效的清理缓存,我采用分桶策略,这一设计思想来源于ZooKeeper的Session管理。分桶策略也是本文的精彩内容。

SpringCache的使用

SpringCache + Redis自动清理缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@EnableCaching
@Configuration
public class RedisCacheAutoConfiguration {

@Autowired
private RedisConnectionFactory redisConnectionFactory;

@Primary
@Bean("redisCacheManager")
public CacheManager cacheManager() {
RedisCacheManager cacheManager = new RedisCacheManager(
RedisCacheWriter.nonLockingRedisCacheWriter(redisConnectionFactory),
getTtlRedisCacheConfiguration(CacheNameEnum.DEFAULT),
getCustomizeTtlRedisCacheConfigurationMap());
return cacheManager;
}


private Map<String, RedisCacheConfiguration> getCustomizeTtlRedisCacheConfigurationMap() {
Map<String, RedisCacheConfiguration> redisCacheConfigurationMap = new HashMap<>();
for (CacheNameEnum cacheNameEnum : CacheNameEnum.values()) {
redisCacheConfigurationMap.put(cacheNameEnum.name(), getTtlRedisCacheConfiguration(cacheNameEnum));
}
return redisCacheConfigurationMap;
}

private RedisCacheConfiguration getTtlRedisCacheConfiguration(CacheNameEnum cacheNameEnum) {
GenericFastJsonRedisSerializer fastJsonRedisSerializer = new GenericFastJsonRedisSerializer();
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

RedisSerializationContext.SerializationPair<Object> objectSerializationPair = RedisSerializationContext.SerializationPair.fromSerializer(fastJsonRedisSerializer);
RedisSerializationContext.SerializationPair<String> stringSerializationPair = RedisSerializationContext.SerializationPair.fromSerializer(stringRedisSerializer);

RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig();
redisCacheConfiguration = redisCacheConfiguration.serializeKeysWith(stringSerializationPair)
.serializeValuesWith(objectSerializationPair)
.entryTtl(Duration.ofSeconds(cacheNameEnum.getTtl()));
return redisCacheConfiguration;
}

enum CacheNameEnum {
DEFAULT(60);
private int ttl;

CacheNameEnum(int ttl) {
this.ttl = ttl;
}

public int getTtl() {
return ttl;
}
}
}

那么使用的时候,就只需要增加注解就行了

1
2
3
4
 @Cacheable(cacheManager = "redisCacheManager", cacheNames = "DEFAULT", key = "'nft:transafer:' + #mnemonic")
public Transafer recover(String mnemonic) {
return new Transafer(mnemonic, wenchangChainPropertity);
}

SpringCache + Map本地缓存

1
2
3
4
5
6
7
8
9
10
11
// 记得加上@EnableCaching,开启缓存
@Bean("localCacheManager")
public CacheManager localCacheManager() {
ConcurrentMapCache publicKeyCache = new ConcurrentMapCache("localCache");
Set<Cache> caches = new HashSet<>();
caches.add(publicKeyCache);

SimpleCacheManager cacheManager = new SimpleCacheManager();
cacheManager.setCaches(caches);
return cacheManager;
}

那么使用的时候,就只需要增加注解就行了

1
2
3
4
 @Cacheable(cacheManager = "localCacheManager", cacheNames = "localCache", key = "'nft:transafer:' + #mnemonic")
public Transafer recover(String mnemonic) {
return new Transafer(mnemonic, wenchangChainPropertity);
}

SpringCache + Map自动清理本地缓存

为了实现自动清理缓存,我继承了ConcurrentMapCache,采用分桶策略,定时清理。

  • expirationInterval,桶的估计范围,如果为1分钟,那么1分钟内创建的缓存都存在一个桶,例如16:11:20和16:11:01,都会存放在16:12:00这个桶中。
  • roundToNextInterval,用于根据当前时间计算,下一个桶的时间。
  • executorService,用于清理缓存,仅仅在创建桶时,调用其该线程,并不会实时运行,占用CPU资源。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public class LocalExpiryCache extends ConcurrentMapCache {
private static Logger log = LoggerFactory.getLogger(LocalExpiryCache.class);
/**
* 桶的范围
*/
public final long expirationInterval;
private static ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5, NftThreadFactory.create("cache-cleara", true));
private static final Map<Long, Set<Object>> expiryMap = new ConcurrentHashMap<>();

public LocalExpiryCache(String name, long expirationInterval) {
super(name);
this.expirationInterval = expirationInterval;
}


@Override
public void put(Object key, Object value) {
log.info("=======put=======");
super.put(key, value);
long now = System.currentTimeMillis();
long expires = roundToNextInterval(now);
log.info("expires: " + DateUtil.formatDate(new Date(expires), DateUtil.FORMAT_DATETIME_NORMAL));
if (!expiryMap.containsKey(expires)) {
synchronized (this) {
if (!expiryMap.containsKey(expires)) {
expiryMap.put(expires, new ConcurrentHashSet<>());
executorService.schedule((Runnable) this::expiry, expires - now + 100
, TimeUnit.MILLISECONDS);
}
}
}
Set<Object> objects = expiryMap.get(expires);
objects.add(key);
}

@Override
public ValueWrapper putIfAbsent(Object key, Object value) {
log.info("=======putIfAbsent=======");
return super.putIfAbsent(key, value);
}

private long roundToNextInterval(long time) {
return (time / expirationInterval + 1) * expirationInterval;
}

public Set expiry() {
log.info("-------------------------------------");
long now = System.currentTimeMillis();
Set<Long> ttls = expiryMap.keySet();
if (CollectionUtils.isEmpty(ttls)) {
return Collections.emptySet();
}
Iterator<Long> iterator = ttls.iterator();
Set result = new HashSet();
while (iterator.hasNext()) {
Long expirationTime = iterator.next();
if (now < expirationTime) {
break;
}
result.addAll(expiryMap.get(expirationTime));
iterator.remove();
}
for (Object key : result) {
super.evict(key);
}
log.info("evict size: " + result.size());
return result;
}

public static void main(String[] args) throws Exception {

LocalExpiryCache localCache = new LocalExpiryCache("", 1 * 60 * 1000);
localCache.put("1", "");
localCache.put("2", "");

log.info(localCache.getNativeCache().size() + "");
Thread.sleep(1 * 60 * 1000);
log.info(localCache.getNativeCache().size() + "");

localCache.put("2", "");
Thread.sleep(1 * 60 * 1000);
log.info(localCache.getNativeCache().size() + "");

System.in.read();
}
}

使用时,用LocalExpiryCache替换掉ConcurrentMapCache即可

1
2
3
4
5
6
7
8
9
10
@Bean("localCacheManager")
public CacheManager localCacheManager() {
LocalExpiryCache publicKeyCache = new LocalExpiryCache("localCache", 1 * 60 * 1000);
Set<Cache> caches = new HashSet<>();
caches.add(publicKeyCache);

SimpleCacheManager cacheManager = new SimpleCacheManager();
cacheManager.setCaches(caches);
return cacheManager;
}