王野也不野 2022-03-20 14:23 采纳率: 0%
浏览 982

kafka中出现:No type information in headers and no default type provided

kafka中突然报了No type information in headers and no default type provided
相关代码
    @KafkaListener(containerFactory = "manualImmediateListenerContainerFactory", topics = {"kafka-manualImmediate"})
    public void onMessageManualImmediate(List<Object> message, Acknowledgment ack) {
        log.info("manualImmediateListenerContainerFactory 处理数据量:{}", message.size());
        message.forEach(item -> {
            log.info("manualImmediateListenerContainerFactory 处理数据内容:{}", item);
        });
        ack.acknowledge();//直接提交offset
    }

相关配置
# producer相关配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.acks=all
# consumer配置
spring.kafka.consumer.group-id=consumer-g1
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.max-poll-records=1000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.xiaoxiong.dal.entities
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.apache.kafka.common.serialization.StringDeserializer

运行结果及报错内容
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition kafka-manualImmediate-1 at offset 7. If needed, please seek past the record to continue consumption.
    at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72) ~[spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
    at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1403) [spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1106) [spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_261]
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_261]
    at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_261]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_261]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition kafka-manualImmediate-1 at offset 7. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
    at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:483) ~[spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324) ~[kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129) ~[kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555) ~[kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391) ~[kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683) ~[kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634) ~[kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289) ~[kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243) ~[kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) ~[kafka-clients-2.5.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1244) [spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1144) [spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1057) [spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_261]
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_261]
    at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_261]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_261]

请不吝赐教
  • 写回答

1条回答 默认 最新

  • 三千烦恼丝xzh 2022-03-20 14:36
    关注

    好像是序列化时拿不到类型信息,不清楚你的消息类型是什么样的,你可以尝试把List 换成List<Map<String, Object>>或者直接用List

    评论

报告相同问题?

问题事件

  • 创建了问题 3月20日

悬赏问题

  • ¥15 使用C#,asp.net读取Excel文件并保存到Oracle数据库
  • ¥15 C# datagridview 单元格显示进度及值
  • ¥15 thinkphp6配合social login单点登录问题
  • ¥15 HFSS 中的 H 场图与 MATLAB 中绘制的 B1 场 部分对应不上
  • ¥15 如何在scanpy上做差异基因和通路富集?
  • ¥20 关于#硬件工程#的问题,请各位专家解答!
  • ¥15 关于#matlab#的问题:期望的系统闭环传递函数为G(s)=wn^2/s^2+2¢wn+wn^2阻尼系数¢=0.707,使系统具有较小的超调量
  • ¥15 FLUENT如何实现在堆积颗粒的上表面加载高斯热源
  • ¥30 截图中的mathematics程序转换成matlab
  • ¥15 动力学代码报错,维度不匹配