Kafka has suddenly started throwing the error "No type information in headers and no default type provided". The relevant code, configuration, and stack trace are below.
Relevant code
@KafkaListener(containerFactory = "manualImmediateListenerContainerFactory", topics = {"kafka-manualImmediate"})
public void onMessageManualImmediate(List<Object> message, Acknowledgment ack) {
    log.info("manualImmediateListenerContainerFactory - number of records in batch: {}", message.size());
    message.forEach(item -> {
        log.info("manualImmediateListenerContainerFactory - record content: {}", item);
    });
    ack.acknowledge();
}
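The manualImmediateListenerContainerFactory bean referenced by the listener is not shown in the question. For context, here is a minimal sketch of what such a factory might look like, assuming a batch listener with MANUAL_IMMEDIATE acknowledgement to match the List<Object> + Acknowledgment signature; the bean wiring below is an assumption, not the actual project code.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class KafkaConsumerConfig {

    // Hypothetical sketch; the real factory bean is not shown in the question.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> manualImmediateListenerContainerFactory(
            ConsumerFactory<Object, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Batch listening, so the listener method receives a List<Object>
        factory.setBatchListener(true);
        // Offsets are committed manually and immediately via the Acknowledgment argument
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}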
Relevant configuration
# producer configuration
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.acks=all
# consumer configuration
spring.kafka.consumer.group-id=consumer-g1
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.max-poll-records=1000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.xiaoxiong.dal.entities
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
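For reference, JsonDeserializer resolves a record's target type either from the type headers that JsonSerializer adds by default, or from a configured default type; the IllegalStateException in the log below is raised when neither is available. A hedged sketch of the consumer property involved follows, with the class name as a placeholder rather than something taken from the project:

# Illustrative only: one way to give JsonDeserializer a fallback target type when no type headers are present
# (SomeEntity is a placeholder class name, not from the project)
spring.kafka.consumer.properties.spring.json.value.default.type=com.xiaoxiong.dal.entities.SomeEntity
# Note: the spring.deserializer.*.delegate.class properties above are only read by
# org.springframework.kafka.support.serializer.ErrorHandlingDeserializer; a plain JsonDeserializer ignores them.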
Resulting error and stack trace
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition kafka-manualImmediate-1 at offset 7. If needed, please seek past the record to continue consumption.
at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72) ~[spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1403) [spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1106) [spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_261]
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_261]
at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_261]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_261]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition kafka-manualImmediate-1 at offset 7. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.2.12.RELEASE.jar:5.2.12.RELEASE]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:483) ~[spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) ~[kafka-clients-2.5.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1244) [spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1144) [spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1057) [spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_261]
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_261]
at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_261]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_261]
Any advice would be much appreciated.