问题遇到的现象和发生背景
应用系统可以通过配置支持开启ehcache缓存或redis缓存,使用spring cache的缓存管理器去实现。redis cache应用场景是业务系统做集群,问题是使用redis cache时系统中大量这样的代码(仅示例代码,没有这么low,实际是带有业务场景的频繁请求redis):
@Cacheable(key = "#root.caches[0].name + #corpId", unless = "#result == null")
public Corp getCorpById(String corpId) {
return corpDao.getCorp(corpId);
}
public List<Corp> getCorpByIds(List<String> corpIds) {
List<Corp> corps = new ArrayList<>();
corpIds.forEach(corpId -> corps.add(getCorpById(corpId)));
return corps;
}
当调用getCorpByIds
方法时会频繁请求redis造成响应很慢
我的解答思路和尝试过的方法
我自己封装了一个支持pipeline请求的方法,整体逻辑是调用这个getBatch
时优先以pipeline方式去redis获取缓存,redis中没有的部门会去批量查库,最后再将查到的结果调用putBatch
以pipeline的方式存入redis,代码如下:
/**
* 批量获取缓存数据, 如不存在则通过 valueLoader 获取数据, 并存入缓存中
* 如果缓存中存在 null,则视为不存在,仍然通过 valueLoader 加载,如需要防止缓存穿透,建议存入空对象,而非 null
*
* @param <K> key 的类型
* @param <V> value 的类型
* @param keys key
* @param valueLoader 数据加载器
* @param keyMapper 根据value获取key 映射器
* @param vClass 返回数据类型
* @param isListValue value是否为list类型,即一个key对应一个List<V>
* @param prefix 缓存前缀
* @return 缓存列表
*/
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public <K, V> List<V> getBatch(List<K> keys, Function<List<K>, Collection<V>> valueLoader, Function<V, K> keyMapper,
Class<V> vClass, boolean isListValue, String prefix) {
Objects.requireNonNull(redisOperations, "redisOperations required not null");
List resultList = Collections.emptyList();
try {
resultList = redisOperations.executePipelined((RedisCallback<Object>) connection -> {
RedisSerializer keySerializer = redisOperations.getKeySerializer();
connection.openPipeline();
for (K k : keys) {
byte[] key = keySerializer.serialize(keyPrefix + prefix + k.toString());
if (key != null) {
connection.get(key);
} else {
log.warn("CustomizedRedisCache 批量操作序列化失败,key={}", k);
}
}
return null;
});
} catch (Exception e) {
log.error("CustomizedRedisCache 异常", e);
}
int keysSize = keys.size();
// 筛选出缓存中不存在的key
List<K> dbKeys = new ArrayList<>(keysSize);
List<V> values = new ArrayList<>();
if (CollectionUtils.isEmpty(resultList)) {
dbKeys.addAll(keys);
} else {
for (int i = 0; i < resultList.size(); i++) {
Object o = resultList.get(i);
if (o == null) {
dbKeys.add(keys.get(i));
continue;
}
if (o instanceof NullObject) {
continue;
}
if (isListValue) {
values.addAll((Collection<V>)o);
continue;
}
values.add((V) o);
}
}
// 缓存中没有就从持久层中查询(需要注意分批次查询)
if (!CollectionUtils.isEmpty(dbKeys)) {
Collection<V> dbValue = valueLoader.apply(dbKeys);
Map dbMap;
if (isListValue) {
dbMap = dbValue.stream().filter(Objects::nonNull).collect(Collectors.groupingBy(keyMapper));
} else {
dbMap = dbValue.stream().filter(Objects::nonNull).collect(Collectors.toMap(keyMapper, Function.identity()));
}
for(K key : dbKeys){
if(dbMap.containsKey(key)){
continue;
}
dbMap.put(key, new NullObject());
}
putBatch(dbMap, prefix);
values.addAll(dbValue);
}
return values;
}
/**
* 批量存入缓存
*
* @param map 需要存入的数据
* @param <K> 数据的 key 的类型
* @param <V> 数据的 value 的类型
* @param prefix 缓存前缀
*/
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public <K, V> void putBatch(Map<K, V> map, String prefix) {
if (map.isEmpty()) {
return;
}
Objects.requireNonNull(redisOperations, "redisTemplate required not null");
try {
redisOperations.executePipelined((RedisCallback<Object>) connection -> {
RedisSerializer keySerializer = redisOperations.getKeySerializer();
RedisSerializer valueSerializer = redisOperations.getValueSerializer();
connection.openPipeline();
for (Map.Entry<K, V> entry : map.entrySet()) {
byte[] key = keySerializer.serialize(keyPrefix + prefix + entry.getKey().toString());
byte[] value = valueSerializer.serialize(entry.getValue());
if (key != null) {
connection.set(key, value);
} else {
log.warn("CustomizedRedisCache 批量操作序列化失败,entry={}", entry);
}
}
return null;
});
} catch (Exception e) {
log.error("CustomizedRedisCache 异常", e);
}
}
/**
* 批量删除缓存
*
* @param keys 需要传入的删除的缓存key集合 map key:cache name,map value:要删除的key集合,*删除所有
*/
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public void deleteBatch(Map<String, Set<String>> keys) {
Objects.requireNonNull(redisOperations, "redisTemplate required not null");
try {
RedisSerializer keySerializer = redisOperations.getKeySerializer();
// 查出目录下所有的key(redis异步,不支持pipeline)
redisOperations.execute((RedisCallback<Object>) connection -> {
keys.entrySet().forEach(e -> {
if(e.getValue().contains("*")){
ScanOptions options = ScanOptions.scanOptions().match(getKeyPrefix(e.getKey()) + "*").count(Integer.MAX_VALUE).build();
Cursor cursor = connection.scan(options);
while (cursor.hasNext()) {
connection.del((byte[]) cursor.next());
}
} else {
e.getValue().forEach( k -> {
connection.del(keySerializer.serialize(getKeyPrefix(e.getKey()) + k));
});
}
});
return null;
});
} catch (Exception e) {
log.error("CustomizedRedisCache 异常", e);
}
}
/**
* 用于防止缓存穿透,建议存入这个空对象,而非 null,后期可集成布隆过滤器或布谷鸟过滤器
*/
public static class NullObject {
}
这里有个明显的问题,要调用这个方法还必须传入根据value获取key的映射方法用于查库之后存入redis,但有很多业务数据只能根据key查出来value,没办法根据value查key
我想要达到的结果
社区同仁们有什么更优雅的解决方案吗?感谢!