女汉子学者 2024-03-05 16:36 采纳率: 20%
浏览 20

关于使用redis stream 导致cpu高的问题!

使用redis stream 导致cpu高的问题


```java
@PostConstruct
    public void startListener() {
        String serviceName = environment.getProperty("spring.application.name");
        String ip = IdWorker.getIdStr();
//        if (!StringUtils.hasText(ip)) {
//            ip = ip + "_" + IdWorker.getIdStr();
//        }
        String port = environment.getProperty("server.port");
        groupId = String.format("%s_%s_%s", serviceName, ip, port);
        log.info("开启监听,消费组:{},groupId:{}", PARAM_GROUP_NAME, groupId);
        String consumer = "consumer-1";
//        Executor executor = Executors.newSingleThreadExecutor();
        ThreadPoolExecutor executor = ThreadPoolConfig.getInstance();
        container = MultiStreamContainer.builder()
                .stringRedisTemplate(stringRedisTemplate)
                .custom(consumer)
                .group(groupId)
                .stream(PARAM_GROUP_NAME)
                .executor(executor)
                .listener(message -> {
                    RecordId id = message.getId();
                    Map<Object, Object> value = message.getValue();
                    log.info("监听到消息stream:{} message:{}, id:{}", message.getStream(), value, id);

                    // 通过RedisTemplate手动确认消息
                    //消费消息,异步刷新本地缓存
                    ParamValueNewCaChe valueNewCaChe = SpringContextUtils.getBean(ParamValueNewCaChe.class);
                    String orgIdFlag = null;
                    if (ObjectUtils.isNotEmpty(value.get("OrgIdFlag"))) {
                        orgIdFlag = value.get("OrgIdFlag").toString().replace("\"", "");
                    }
                    String paramCache = value.get("ParamCache").toString().replace("\"", "");

                    //key 转换
                    if (StringUtils.hasText(orgIdFlag)) {
                        valueNewCaChe.refresh(paramCache, orgIdFlag);
                    } else {
                        valueNewCaChe.refresh(paramCache, null);
                    }
                    stringRedisTemplate.opsForStream().acknowledge(groupId, consumer, id);
                })
                .build();
        container.start();
    }


@Slf4j
@Builder
@Data
public class MultiStreamContainer {

    private StringRedisTemplate stringRedisTemplate;
    private String group;
    private String custom;
    private Executor executor;

    @Singular
    private List<String> streams;

    private Listener listener;

    private ListenerRunnable runnable;

    private static void prepareStreamAndGroup(StreamOperations<String, ?, ?> ops, String stream, String group) {
        String status = "OK";
        try {
            StreamInfo.XInfoGroups groups = ops.groups(stream);
            if (groups.stream().noneMatch(xInfoGroup -> group.equals(xInfoGroup.groupName()))) {
                status = ops.createGroup(stream, group);
            }
        } catch (Exception exception) {
            RecordId initialRecord = ops.add(ObjectRecord.create(stream, "Initial Record"));
            Assert.notNull(initialRecord, "未初始化消费组 '" + stream + "'");
            status = ops.createGroup(stream, ReadOffset.from(initialRecord), group);
        } finally {
            Assert.isTrue("OK".equals(status), "无法创建组名为: '" + group + "'");
        }
    }

    public void stop() {
        if (runnable != null) {
            runnable.running.set(false);
            runnable = null;
        }
    }

    public void start() {
        stop();
        List<String> streamList = streams != null
                ? Lists.newArrayList(streams) : Lists.newArrayList();
        runnable = ListenerRunnable.builder()
                .streamList(streamList)
                .group(group)
                .custom(custom)
                .stringRedisTemplate(stringRedisTemplate)
                .listener(listener)
                .build();
        executor.execute(runnable);
    }

    public interface Listener {
        void onMessage(MapRecord<String, Object, Object> message);
    }

    @Builder
    @Data
    public static class ListenerRunnable implements Runnable {
        private final AtomicBoolean running = new AtomicBoolean(true);
        private final List<String> streamList;
        private final String group;
        private final String custom;
        private final StringRedisTemplate stringRedisTemplate;
        private final Listener listener;

        @Override
        public void run() {
            //1、初始化消费组
            List<StreamOffset<String>> offsetList = Lists.newArrayList();
            for (String readRecord : streamList) {
                offsetList.add(StreamOffset.create(readRecord, ReadOffset.lastConsumed()));
            }
            Consumer consumer = Consumer.from(group, custom);
            StreamReadOptions options = StreamReadOptions.empty().count(1);
            StreamOffset<String>[] offsets = new StreamOffset[offsetList.size()];
            for (int i = 0; i < offsetList.size(); i++) {
                offsets[i] = offsetList.get(i);
                prepareStreamAndGroup(stringRedisTemplate.opsForStream(), offsets[i].getKey(), group);
            }

            //2、监听中
            while (running.get()) {
                if (streamList.isEmpty()) {
                    log.info("不存在消息组!");
                    return;
                }
                List<MapRecord<String, Object, Object>> values = null;
                try {
                    values = stringRedisTemplate.opsForStream().read(consumer, options, offsets);
                    Thread.sleep(100);
                } catch (Exception exception) {
                    log.info("监听消息异常", exception);
                    try {
                        Thread.sleep(200);
                    } catch (Exception ee) {
                        ee.printStackTrace();
                    }
                }

                if (values != null && !values.isEmpty()) {
                    for (MapRecord val : values) {
                        listener.onMessage(val);
                    }
                }
            }
        }
    }
}


ublic class ThreadPoolConfig {
    /**
     * 核心线程数
     */
    private static final int corePoolSize = 1;
    /**
     * 最大线程数
     */
    private static final int maxPoolSize = 1;
    /**
     * 多余线程最大空闲时间
     */
    private static final int keepAlive = 20;
    /**
     * 线程池缓冲队列
     *  如果不手动指定容量,默认为Integer.MAX_VALUE,也就是无界队列。
     */
    private static final BlockingQueue poolQueue = new LinkedBlockingQueue(32);

    private static volatile ThreadPoolExecutor poolExecutor;

    private ThreadPoolConfig(){}

    public static ThreadPoolExecutor getInstance(){
        if (poolExecutor == null){
            synchronized (ThreadPoolConfig.class){
                if (poolExecutor == null){
                    poolExecutor = new ThreadPoolExecutor(corePoolSize,
                            maxPoolSize,
                            keepAlive,
                            TimeUnit.SECONDS,
                            poolQueue,
                            new ThreadPoolExecutor.DiscardOldestPolicy());
                }
            }
        }
        return poolExecutor;
    }
}

```

  • 写回答

4条回答 默认 最新

  • 关注

    女汉子学者 下午好🌅🌅🌅
    本答案参考ChatGPT-3.5

    使用Redis Stream导致CPU高的问题可能有多种原因。以下是可能的解决方案:

    1. 优化消费者代码:检查消费者代码中是否存在性能问题。可能存在的问题包括循环处理消息时的性能瓶颈、不必要的计算、数据库查询等。通过优化消费者代码,可以减少CPU的使用量。

    2. 调整消费者的线程数:根据实际情况调整消费者的线程数。如果当前线程数过多,可能会导致CPU过高。可以尝试减少线程数以降低CPU的使用率。

    3. 调整消息的处理速度:如果消息的处理速度过快,可能会导致CPU高的问题。可以尝试减慢消息的处理速度,例如增加消息处理之间的延迟或者限制并发处理的消息数。

    4. 使用合适的硬件配置:如果Redis服务器的硬件配置不足,可能无法处理大量的消息流并导致CPU高的问题。可以考虑增加硬件资源,例如增加CPU核数或者增加内存容量。

    5. 关闭不必要的Redis功能:如果Redis实例运行了不必要的功能,例如Redis集群、持久化等,可以尝试关闭这些功能以减少CPU的使用量。

    6. 升级Redis版本:可能存在Redis版本中的bug或者性能问题。可以尝试将Redis升级到最新版本以解决问题。

    7. 使用分布式消息队列:如果上述解决方案都无效,可以考虑从Redis Stream切换到分布式消息队列,例如Kafka、RabbitMQ等。这些消息队列具有更好的性能和可扩展性,可以更好地处理大量的消息流。

    以上是一些可能的解决方案,具体解决方法需要根据实际情况进行调整和优化。

    评论

报告相同问题?

问题事件

  • 修改了问题 3月5日
  • 创建了问题 3月5日

悬赏问题

  • ¥20 关于线性结构的问题:希望能从头到尾完整地帮我改一下,困扰我很久了
  • ¥30 3D多模态医疗数据集-视觉问答
  • ¥20 设计一个二极管稳压值检测电路
  • ¥15 内网办公电脑进行向日葵
  • ¥15 如何输入双曲线的参数a然后画出双曲线?我输入处理函数加上后就没有用了,不知道怎么回事去掉后双曲线可以画出来
  • ¥50 WPF Lidgren.Network.Core2连接问题
  • ¥15 soildworks装配体的尺寸问题
  • ¥100 有偿寻云闪付SDK转URL技术
  • ¥30 基于信创PC发布的QT应用如何跨用户启动后输入中文
  • ¥20 非root手机,如何精准控制手机流量消耗的大小,如20M