Fun肆编程 2021-12-31 13:50 采纳率: 0%
浏览 57
已结题

应用系统使用SpringCache的缓存管理器如何正确使用redis的pipeline解决一次请求过程中频繁连接redis的问题

问题遇到的现象和发生背景

应用系统可以通过配置支持开启ehcache缓存或redis缓存,使用spring cache的缓存管理器去实现。redis cache应用场景是业务系统做集群,问题是使用redis cache时系统中大量这样的代码(仅示例代码,没有这么low,实际是带有业务场景的频繁请求redis):

    @Cacheable(key = "#root.caches[0].name + #corpId", unless = "#result == null")
    public Corp getCorpById(String corpId) {
        return corpDao.getCorp(corpId);
    }

    public List<Corp> getCorpByIds(List<String> corpIds) {
        List<Corp> corps = new ArrayList<>();
        corpIds.forEach(corpId -> corps.add(getCorpById(corpId)));
        return corps;
    }

当调用getCorpByIds方法时会频繁请求redis造成响应很慢

我的解答思路和尝试过的方法

我自己封装了一个支持pipeline请求的方法,整体逻辑是调用这个getBatch时优先以pipeline方式去redis获取缓存,redis中没有的部门会去批量查库,最后再将查到的结果调用putBatch以pipeline的方式存入redis,代码如下:

    /**
     * 批量获取缓存数据, 如不存在则通过 valueLoader 获取数据, 并存入缓存中
     * 如果缓存中存在 null,则视为不存在,仍然通过 valueLoader 加载,如需要防止缓存穿透,建议存入空对象,而非 null
     *
     * @param <K>         key 的类型
     * @param <V>         value 的类型
     * @param keys        key
     * @param valueLoader 数据加载器
     * @param keyMapper   根据value获取key 映射器
     * @param vClass      返回数据类型
     * @param isListValue value是否为list类型,即一个key对应一个List<V>
     * @param prefix      缓存前缀
     * @return 缓存列表
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <K, V> List<V> getBatch(List<K> keys, Function<List<K>, Collection<V>> valueLoader, Function<V, K> keyMapper,
                                   Class<V> vClass, boolean isListValue, String prefix) {
        Objects.requireNonNull(redisOperations, "redisOperations required not null");
        List resultList = Collections.emptyList();
        try {
            resultList = redisOperations.executePipelined((RedisCallback<Object>) connection -> {
                RedisSerializer keySerializer = redisOperations.getKeySerializer();
                connection.openPipeline();
                for (K k : keys) {
                    byte[] key = keySerializer.serialize(keyPrefix + prefix + k.toString());
                    if (key !=  null) {
                        connection.get(key);
                    } else {
                        log.warn("CustomizedRedisCache 批量操作序列化失败,key={}", k);
                    }
                }
                return null;
            });
        } catch (Exception e) {
            log.error("CustomizedRedisCache 异常", e);
        }

        int keysSize = keys.size();
        // 筛选出缓存中不存在的key
        List<K> dbKeys = new ArrayList<>(keysSize);
        List<V> values = new ArrayList<>();
        if (CollectionUtils.isEmpty(resultList)) {
            dbKeys.addAll(keys);
        } else {
            for (int i = 0; i < resultList.size(); i++) {
                Object o = resultList.get(i);
                if (o == null) {
                    dbKeys.add(keys.get(i));
                    continue;
                }
                if (o instanceof NullObject) {
                    continue;
                }
                if (isListValue) {
                    values.addAll((Collection<V>)o);
                    continue;
                }
                values.add((V) o);
            }
        }
        // 缓存中没有就从持久层中查询(需要注意分批次查询)
        if (!CollectionUtils.isEmpty(dbKeys)) {
            Collection<V> dbValue = valueLoader.apply(dbKeys);
            Map dbMap;
            if (isListValue) {
                dbMap = dbValue.stream().filter(Objects::nonNull).collect(Collectors.groupingBy(keyMapper));
            } else {
                dbMap = dbValue.stream().filter(Objects::nonNull).collect(Collectors.toMap(keyMapper, Function.identity()));
            }
            for(K key : dbKeys){
                if(dbMap.containsKey(key)){
                    continue;
                }
                dbMap.put(key, new NullObject());
            }
            putBatch(dbMap, prefix);
            values.addAll(dbValue);
        }
        return values;
    }

    /**
     * 批量存入缓存
     *
     * @param map    需要存入的数据
     * @param <K>    数据的 key 的类型
     * @param <V>    数据的 value 的类型
     * @param prefix 缓存前缀
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <K, V> void putBatch(Map<K, V> map, String prefix) {
        if (map.isEmpty()) {
            return;
        }
        Objects.requireNonNull(redisOperations, "redisTemplate required not null");
        try {
            redisOperations.executePipelined((RedisCallback<Object>) connection -> {
                RedisSerializer keySerializer = redisOperations.getKeySerializer();
                RedisSerializer valueSerializer = redisOperations.getValueSerializer();
                connection.openPipeline();
                for (Map.Entry<K, V> entry : map.entrySet()) {
                    byte[] key = keySerializer.serialize(keyPrefix + prefix + entry.getKey().toString());
                    byte[] value = valueSerializer.serialize(entry.getValue());
                    if (key != null) {
                        connection.set(key, value);
                    } else {
                        log.warn("CustomizedRedisCache 批量操作序列化失败,entry={}", entry);
                    }
                }
                return null;
            });
        } catch (Exception e) {
            log.error("CustomizedRedisCache 异常", e);
        }
    }

    /**
     * 批量删除缓存
     *
     * @param keys 需要传入的删除的缓存key集合 map key:cache name,map value:要删除的key集合,*删除所有
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void deleteBatch(Map<String, Set<String>> keys) {
        Objects.requireNonNull(redisOperations, "redisTemplate required not null");
        try {
            RedisSerializer keySerializer = redisOperations.getKeySerializer();
            // 查出目录下所有的key(redis异步,不支持pipeline)
            redisOperations.execute((RedisCallback<Object>) connection -> {
                keys.entrySet().forEach(e -> {

                    if(e.getValue().contains("*")){

                        ScanOptions options = ScanOptions.scanOptions().match(getKeyPrefix(e.getKey()) + "*").count(Integer.MAX_VALUE).build();
                        Cursor cursor = connection.scan(options);
                        while (cursor.hasNext()) {
                            connection.del((byte[]) cursor.next());
                        }
                    } else {
                        e.getValue().forEach( k -> {
                            connection.del(keySerializer.serialize(getKeyPrefix(e.getKey()) + k));
                        });
                    }
                });
                return null;
            });
        } catch (Exception e) {
            log.error("CustomizedRedisCache 异常", e);
        }
    }

    /**
     * 用于防止缓存穿透,建议存入这个空对象,而非 null,后期可集成布隆过滤器或布谷鸟过滤器
     */
    public static class NullObject {
    }

这里有个明显的问题,要调用这个方法还必须传入根据value获取key的映射方法用于查库之后存入redis,但有很多业务数据只能根据key查出来value,没办法根据value查key

我想要达到的结果

社区同仁们有什么更优雅的解决方案吗?感谢!

  • 写回答

1条回答 默认 最新

  • 有问必答小助手 2022-01-04 10:05
    关注

    你好,我是有问必答小助手,非常抱歉,本次您提出的有问必答问题,技术专家团超时未为您做出解答


    本次提问扣除的有问必答次数,将会以问答VIP体验卡(1次有问必答机会、商城购买实体图书享受95折优惠)的形式为您补发到账户。


    因为有问必答VIP体验卡有效期仅有1天,您在需要使用的时候【私信】联系我,我会为您补发。

    评论

报告相同问题?

问题事件

  • 系统已结题 1月8日
  • 创建了问题 12月31日

悬赏问题

  • ¥15 如何在scanpy上做差异基因和通路富集?
  • ¥20 关于#硬件工程#的问题,请各位专家解答!
  • ¥15 关于#matlab#的问题:期望的系统闭环传递函数为G(s)=wn^2/s^2+2¢wn+wn^2阻尼系数¢=0.707,使系统具有较小的超调量
  • ¥15 FLUENT如何实现在堆积颗粒的上表面加载高斯热源
  • ¥30 截图中的mathematics程序转换成matlab
  • ¥15 动力学代码报错,维度不匹配
  • ¥15 Power query添加列问题
  • ¥50 Kubernetes&Fission&Eleasticsearch
  • ¥15 報錯:Person is not mapped,如何解決?
  • ¥15 c++头文件不能识别CDialog