女汉子学者 2023-11-02 10:06 采纳率: 20%
浏览 22

redis stream 断网重连

redis 集群断网了,程序封装的方法,对redis进行了重连,但是stream确连接不上了,无法消费新消息,,下面是我的监听类,该如何改造,可以实现不重启,即可消费消息呢?

/**
     * 主要做的是将StationStreamMessageListener监听绑定消费者,用于接收消息
     *
     * @param streamListener
     * @return
     */
    @Bean
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> consumerListener1(
            StreamMessageListener streamListener) {
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = streamContainer(streamListener);
        container.start();
        return container;
    }
 
    /**
     * @param streamListener 绑定的监听类
     * @return
     */
    private StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamContainer(StreamListener<String, MapRecord<String, String, String>> streamListener) {
        // 创建配置对象
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>>
                options =
                StreamMessageListenerContainer.
                        StreamMessageListenerContainerOptions
                        .builder()
                        // 一次性最多拉取多少条消息
                        .batchSize(1)
                        // 消息消费异常的handler
                        .errorHandler(e -> log.error("错误信息为:" + e.getMessage(), e))
                        // 超时时间,设置为0,表示不超时(超时后会抛出异常)
                        .pollTimeout(Duration.ZERO)
                        // 序列化器
                        .serializer(new StringRedisSerializer())
                        .build();
 
        // 根据配置对象创建监听容器对象
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer =
                StreamMessageListenerContainer
                        .create(this.redisConnectionFactory, options);
 
        // 在运行时抛出异常,不取消任务执行
        StreamOffset<String> streamOffset = StreamOffset.create(PARAM_GROUP_NAME, ReadOffset.lastConsumed());
        Consumer consumer = Consumer.from(groupId, "consumer-1");
        StreamMessageListenerContainer.StreamReadRequest<String> readRequest =
                StreamMessageListenerContainer.StreamReadRequest
                        .builder(streamOffset)
                        .cancelOnError(throwable -> {
                            // 查询超时,有可能时断网了,不能取消
                            return !(throwable instanceof QueryTimeoutException);
                        }).consumer(consumer)
                        .autoAcknowledge(false)
                        .build();
        streamMessageListenerContainer.register(readRequest, streamListener);
        return streamMessageListenerContainer;
    }

  • 写回答

1条回答 默认 最新

  • CSDN-Ada助手 CSDN-AI 官方账号 2023-11-02 12:06
    关注

    【以下回答由 GPT 生成】

    优化建议:

    1. 确保你已经正确配置了RedisConnectionFactory以连接到Redis集群。

    2. 为了实现断网重连的效果,你可以在streamContainer方法中添加重试逻辑。当连接断开时,你可以选择定时重试连接,直到成功连接到Redis集群。你可以使用一个循环或者定时任务来实现重试逻辑。

    3. 在重试过程中,可以添加一些日志记录,以便追踪重连操作的情况。

    以下是一个示例代码,演示了如何在重连时进行断网重连的逻辑:

    private StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamContainer(StreamListener<String, MapRecord<String, String, String>> streamListener) {
        // 创建配置对象
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
            StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                .builder()
                .batchSize(1)
                .errorHandler(e -> log.error("错误信息为:" + e.getMessage(), e))
                .timeout(Duration.ZERO)
                .build();
    
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
            StreamMessageListenerContainer.create(redisConnectionFactory, options);
    
        // 设置监听器
        container.receiveAutoAck(streamListener);
    
        // 添加重连逻辑
        RetryTemplate retryTemplate = new RetryTemplate();
        BackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(5000); // 重试间隔设置为5秒
        retryTemplate.setBackOffPolicy(backOffPolicy);
    
        RetryCallback<Void, Exception> retryCallback = context -> {
            container.start();
            return null;
        };
    
        RetryListener retryListener = new RetryListenerSupport() {
            @Override
            public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
                log.info("开始重试连接Redis集群");
                return true;
            }
    
            @Override
            public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
                log.error("重试连接Redis集群失败", throwable);
            }
    
            @Override
            public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
                log.info("成功连接到Redis集群");
            }
        };
    
        retryTemplate.registerListener(retryListener);
        retryTemplate.execute(retryCallback);
    
        return container;
    }
    

    这样,当连接断开后,程序会尝试每5秒重连一次,直到成功连接到Redis集群为止。

    请注意,以上示例中的重连逻辑仅供参考,你可以根据实际情况进行优化和调整。



    【相关推荐】


    • 看下这篇博客,也许你就懂了,链接:控制 Redis stream 的消息数量
    • 除此之外, 这篇博客: Redis实战篇中的 7.6 基于Redis的Stream结构作为消息队列,实现异步秒杀下单 部分也许能够解决你的问题。

    如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^
    评论

报告相同问题?

问题事件

  • 修改了问题 11月2日
  • 修改了问题 11月2日
  • 修改了问题 11月2日
  • 创建了问题 11月2日

悬赏问题

  • ¥20 关于线性结构的问题:希望能从头到尾完整地帮我改一下,困扰我很久了
  • ¥20 设计一个二极管稳压值检测电路
  • ¥15 内网办公电脑进行向日葵
  • ¥15 如何输入双曲线的参数a然后画出双曲线?我输入处理函数加上后就没有用了,不知道怎么回事去掉后双曲线可以画出来
  • ¥50 WPF Lidgren.Network.Core2连接问题
  • ¥15 soildworks装配体的尺寸问题
  • ¥100 有偿寻云闪付SDK转URL技术
  • ¥30 基于信创PC发布的QT应用如何跨用户启动后输入中文
  • ¥20 非root手机,如何精准控制手机流量消耗的大小,如20M
  • ¥15 远程安装一下vasp