王野也不野 2022-03-20 06:23 采纳率: 0%
浏览 1303

kafka中出现:No type information in headers and no default type provided

kafka中突然报了No type information in headers and no default type provided
相关代码
    @KafkaListener(containerFactory = "manualImmediateListenerContainerFactory", topics = {"kafka-manualImmediate"})
    public void onMessageManualImmediate(List<Object> message, Acknowledgment ack) {
        log.info("manualImmediateListenerContainerFactory 处理数据量:{}", message.size());
        message.forEach(item -> {
            log.info("manualImmediateListenerContainerFactory 处理数据内容:{}", item);
        });
        ack.acknowledge();//直接提交offset
    }

相关配置
# producer相关配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.acks=all
# consumer配置
spring.kafka.consumer.group-id=consumer-g1
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.max-poll-records=1000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.xiaoxiong.dal.entities
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.apache.kafka.common.serialization.StringDeserializer

运行结果及报错内容
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition kafka-manualImmediate-1 at offset 7. If needed, please seek past the record to continue consumption.
    at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72) ~[spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
    at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1403) [spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1106) [spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_261]
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_261]
    at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_261]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_261]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition kafka-manualImmediate-1 at offset 7. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
    at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:483) ~[spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324) ~[kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129) ~[kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555) ~[kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391) ~[kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683) ~[kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634) ~[kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289) ~[kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243) ~[kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) ~[kafka-clients-2.5.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1244) [spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1144) [spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1057) [spring-kafka-2.5.10.RELEASE.jar:2.5.10.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_261]
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_261]
    at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_261]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_261]

请不吝赐教

展开全部

  • 写回答

2条回答 默认 最新

  • Haven.Liu 2024-06-27 12:42
    关注

    消费者配置调整成:props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 试试。

    一般是序列化引起的

    评论 编辑记录
  • 三千烦恼丝xzh 2022-03-20 06:36
    关注

    好像是序列化时拿不到类型信息,不清楚你的消息类型是什么样的,你可以尝试把List 换成List<Map<String, Object>>或者直接用List

    评论
编辑
预览

报告相同问题?

手机看
程序员都在用的中文IT技术交流社区

程序员都在用的中文IT技术交流社区

专业的中文 IT 技术社区,与千万技术人共成长

专业的中文 IT 技术社区,与千万技术人共成长

关注【CSDN】视频号,行业资讯、技术分享精彩不断,直播好礼送不停!

关注【CSDN】视频号,行业资讯、技术分享精彩不断,直播好礼送不停!

客服 返回
顶部