Redis 集群断网后,Redis 连接会自动重连,但重连之后 StreamListener 无法再消费数据。
/**
 * Exposes a started stream listener container as a Spring bean.
 *
 * <p>The container is built by {@code streamContainer}, which registers the given
 * listener on it, and is started here before being handed to the application context.
 *
 * @param streamListener the listener that receives the stream records
 * @return the listener container, already started
 */
@Bean
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> consumerListener1(
        StreamMessageListener streamListener) {
    final StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer =
            streamContainer(streamListener);
    // Start polling right away; the returned bean is already running.
    listenerContainer.start();
    return listenerContainer;
}
/**
 * Builds a stream listener container and registers the given listener on it as
 * consumer {@code "consumer-1"} of the consumer group {@code groupId}.
 *
 * <p>The container is only configured here, not started; the caller starts it.
 *
 * <p>Errors raised while polling never cancel the subscription. Previously every error
 * other than {@code QueryTimeoutException} cancelled it, so a network outage (which is
 * reported as a connection failure, not as a query timeout) stopped the polling task
 * for good: the Redis connection recovered, but the listener never consumed again.
 * Errors are still passed to the error handler and logged; polling ends only when the
 * container itself is stopped.
 *
 * @param streamListener the listener to bind to the stream
 * @return the configured container with the listener registered, not yet started
 */
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamContainer(StreamListener<String, MapRecord<String, String, String>> streamListener) {
    // Container options.
    StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>>
            options =
            StreamMessageListenerContainer.
                    StreamMessageListenerContainerOptions
                    .builder()
                    // Maximum number of records fetched per poll.
                    .batchSize(1)
                    // Handler for errors raised while reading or consuming records.
                    .errorHandler(e -> log.error("错误信息为:" + e.getMessage(), e))
                    // NOTE(review): the original author intended ZERO to mean "no timeout".
                    // Confirm how the container maps a zero poll timeout onto the BLOCK
                    // option; if it polls without blocking, a small positive duration
                    // would be preferable.
                    .pollTimeout(Duration.ZERO)
                    // Keys and values are (de)serialized as plain strings.
                    .serializer(new StringRedisSerializer())
                    .build();

    // Create the listener container from the connection factory and the options.
    StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer =
            StreamMessageListenerContainer
                    .create(this.redisConnectionFactory, options);

    // Read from the last record consumed by the group.
    // NOTE(review): the first argument of StreamOffset.create is the stream key, yet the
    // constant is called PARAM_GROUP_NAME, while the group comes from groupId below.
    // Verify that both hold the intended values.
    StreamOffset<String> streamOffset = StreamOffset.create(PARAM_GROUP_NAME, ReadOffset.lastConsumed());
    Consumer consumer = Consumer.from(groupId, "consumer-1");

    StreamMessageListenerContainer.StreamReadRequest<String> readRequest =
            StreamMessageListenerContainer.StreamReadRequest
                    .builder(streamOffset)
                    // Never cancel the subscription on error: a cancelled subscription is
                    // not re-registered, so the listener would stay silent after Redis
                    // reconnects.
                    .cancelOnError(throwable -> false)
                    .consumer(consumer)
                    // Records are not acknowledged automatically; the listener must ack them.
                    .autoAcknowledge(false)
                    .build();

    streamMessageListenerContainer.register(readRequest, streamListener);
    return streamMessageListenerContainer;
}