女汉子学者 2023-11-01 14:45 采纳率: 20%
浏览 23
已结题

redisstream 断线重连

redis集群断网后,redis自动重连了。但是streamListener无法消费数据了。

/**
     * 主要做的是将StationStreamMessageListener监听绑定消费者,用于接收消息
     *
     * @param streamListener
     * @return
     */
    @Bean
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> consumerListener1(
            StreamMessageListener streamListener) {
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = streamContainer(streamListener);
        container.start();
        return container;
    }

    /**
     * @param streamListener 绑定的监听类
     * @return
     */
    private StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamContainer(StreamListener<String, MapRecord<String, String, String>> streamListener) {
        // 创建配置对象
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>>
                options =
                StreamMessageListenerContainer.
                        StreamMessageListenerContainerOptions
                        .builder()
                        // 一次性最多拉取多少条消息
                        .batchSize(1)
                        // 消息消费异常的handler
                        .errorHandler(e -> log.error("错误信息为:" + e.getMessage(), e))
                        // 超时时间,设置为0,表示不超时(超时后会抛出异常)
                        .pollTimeout(Duration.ZERO)
                        // 序列化器
                        .serializer(new StringRedisSerializer())
                        .build();

        // 根据配置对象创建监听容器对象
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer =
                StreamMessageListenerContainer
                        .create(this.redisConnectionFactory, options);

        // 在运行时抛出异常,不取消任务执行
        StreamOffset<String> streamOffset = StreamOffset.create(PARAM_GROUP_NAME, ReadOffset.lastConsumed());
        Consumer consumer = Consumer.from(groupId, "consumer-1");
        StreamMessageListenerContainer.StreamReadRequest<String> readRequest =
                StreamMessageListenerContainer.StreamReadRequest
                        .builder(streamOffset)
                        .cancelOnError(throwable -> {
                            // 查询超时,有可能时断网了,不能取消
                            return !(throwable instanceof QueryTimeoutException);
                        }).consumer(consumer)
                        .autoAcknowledge(false)
                        .build();
        streamMessageListenerContainer.register(readRequest, streamListener);
        return streamMessageListenerContainer;
    }
  • 写回答

1条回答 默认 最新

  • CSDN-Ada助手 CSDN-AI 官方账号 2023-11-01 16:04
    关注

    【以下回答由 GPT 生成】

    问题解决方案:

    在Redis断线重连后,为了使StreamListener能够继续消费数据,你可以使用Redis连接工厂的setEventListeners方法注册一个连接监听器(RedisConnectionListener),在连接断开和重新连上时执行相应的操作。

    具体步骤如下:

    1. 创建一个实现RedisConnectionListener接口的监听器类,例如MyRedisConnectionListener
    public class MyRedisConnectionListener implements RedisConnectionListener {
    
        private StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer;
        private StreamListener<String, MapRecord<String, String, String>> streamListener;
    
        public MyRedisConnectionListener(StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer,
                StreamListener<String, MapRecord<String, String, String>> streamListener) {
            this.streamMessageListenerContainer = streamMessageListenerContainer;
            this.streamListener = streamListener;
        }
    
        @Override
        public void onCommand(RedisConnection connection, ProtocolCommand command, byte[]... args) {
    
        }
    
        @Override
        public void onDisconnect(RedisConnection connection) {
            // 连接断开时,停止监听容器
            streamMessageListenerContainer.stop();
        }
    
        @Override
        public void onConnected(RedisConnection connection) {
            // 连接重新连上时,重新启动监听容器
            streamMessageListenerContainer.start();
            // 注册新的读取请求
            streamMessageListenerContainer.register(readRequest, streamListener);
        }
    }
    
    1. 修改streamContainer方法,在创建StreamMessageListenerContainer对象之后,通过setEventListeners方法为连接工厂设置连接监听器:
    private StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamContainer(StreamListener<String, MapRecord<String, String, String>> streamListener) {
        // 创建配置对象...
    
        // 根据配置对象创建监听容器对象...
    
        // 在运行时抛出异常...
    
        // 设置监听容器的读取请求
        StreamOffset<String> streamOffset = StreamOffset.create(PARAM_GROUP_NAME, ReadOffset.lastConsumed());
        Consumer consumer = Consumer.from(groupId, "consumer-1");
        StreamMessageListenerContainer.StreamReadRequest<String> readRequest
                = StreamMessageListenerContainer.StreamReadRequest
                .builder(streamOffset)
                .cancelOnError(throwable -> {
                    // 查询超时...
                }).consumer(consumer)
                .autoAcknowledge(false)
                .build();
    
        streamMessageListenerContainer.register(readRequest, streamListener);
    
        // 创建连接监听器
        MyRedisConnectionListener connectionListener = new MyRedisConnectionListener(streamMessageListenerContainer, streamListener);
    
        // 为连接工厂设置连接监听器
        this.redisConnectionFactory.setEventListeners(Arrays.asList(connectionListener));
    
        return streamMessageListenerContainer;
    }
    

    通过以上修改,当Redis断线重连后,StreamListener会继续消费数据。连接监听器在连接断开时停止监听容器,在连接重新连上时重新启动监听容器,并注册新的读取请求。这样就可以保证在Redis断线重连后,StreamListener能够继续消费数据了。



    【相关推荐】



    如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^
    评论

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 11月2日
  • 修改了问题 11月2日
  • 创建了问题 11月1日

悬赏问题

  • ¥20 关于线性结构的问题:希望能从头到尾完整地帮我改一下,困扰我很久了
  • ¥20 设计一个二极管稳压值检测电路
  • ¥15 内网办公电脑进行向日葵
  • ¥15 如何输入双曲线的参数a然后画出双曲线?我输入处理函数加上后就没有用了,不知道怎么回事去掉后双曲线可以画出来
  • ¥50 WPF Lidgren.Network.Core2连接问题
  • ¥15 soildworks装配体的尺寸问题
  • ¥100 有偿寻云闪付SDK转URL技术
  • ¥30 基于信创PC发布的QT应用如何跨用户启动后输入中文
  • ¥20 非root手机,如何精准控制手机流量消耗的大小,如20M
  • ¥15 远程安装一下vasp