qq_42584264 2022-05-06 12:49 Acceptance rate: 50%
Views: 144
Closed

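canal monitoring a database: exception thrown when newly inserted data sent to Kafka is parsed as JSON

Symptoms and background

I use canal to monitor a database and forward the changes to Kafka. When a row is inserted, the consumer throws an exception, although the data still arrives. Updating a row does not throw.

Relevant code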

@KafkaListener(topics = "canal", groupId = "test")
public void craneMessage(ConsumerRecord<?, ?> record) {
    // Replace every occurrence of the text null in the raw payload with "", then parse it
    JSONObject jsonObject = JSONObject.parseObject(record.value().toString().replaceAll("null", "\"\""), Feature.OrderedField);
    System.out.println(jsonObject);
}

Output and error message

The newly inserted data is received:

{"data":[{"id":"9","crane_id":"9","plan_id":"9","start_station":"9","end_station":"9","tank_type":"9","tank_number":"9","position_x":"","position_y":"","position_z":"","weight":"","left_x":"","right_x":"","remain_x":"","remain_y":"","create_by":"","create_time":"","update_by":"","update_time":""}]}

Exception:

12:10:54.412 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - [error,149] - Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.ruoyi.common.config.KafkaConsumer.craneMessage(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>)' threw exception; nested exception is com.alibaba.fastjson.JSONException: syntax error, position at 65, name sql; nested exception is com.alibaba.fastjson.JSONException: syntax error, position at 65, name sql
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:206)
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2371)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2248)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2150)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2032)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1705)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1276)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1268)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1163)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.ruoyi.common.config.KafkaConsumer.craneMessage(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>)' threw exception; nested exception is com.alibaba.fastjson.JSONException: syntax error, position at 65, name sql; nested exception is com.alibaba.fastjson.JSONException: syntax error, position at 65, name sql
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2383)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2354)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2315)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2237)
    ... 9 common frames omitted
    Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92)
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2334)
Caused by: com.alibaba.fastjson.JSONException: syntax error, position at 65, name sql
    at com.alibaba.fastjson.parser.DefaultJSONParser.parseObject(DefaultJSONParser.java:645)
    at com.alibaba.fastjson.parser.DefaultJSONParser.parse(DefaultJSONParser.java:1430)
    at com.alibaba.fastjson.parser.DefaultJSONParser.parse(DefaultJSONParser.java:1390)
    at com.alibaba.fastjson.JSON.parse(JSON.java:181)
    at com.alibaba.fastjson.JSON.parse(JSON.java:191)
    at com.alibaba.fastjson.JSON.parse(JSON.java:244)
    at com.alibaba.fastjson.JSON.parseObject(JSON.java:248)
    at com.ruoyi.common.config.KafkaConsumer.craneMessage(KafkaConsumer.java:43)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119)
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56)
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2334)
    ... 11 common frames omitted
12:10:55.048 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.a.k.c.c.KafkaConsumer - [seek,1587] - [Consumer clientId=consumer-crane-1, groupId=crane] Seeking to offset 111 for partition crane-0
12:10:55.049 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - [error,149] - Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.ruoyi.common.config.KafkaConsumer.craneMessage(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>)' threw exception; nested exception is com.alibaba.fastjson.JSONException: syntax error, position at 65, name sql; nested exception is com.alibaba.fastjson.JSONException: syntax error, position at 65, name sql
    ... (remaining frames omitted: they repeat the stack trace above, and the pasted copy is cut off partway through)
12:11:00.110 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.a.k.c.c.KafkaConsumer - [seek,1587] - [Consumer clientId=consumer-crane-1, groupId=crane] Seeking to offset 112 for partition crane-0

My approach and what I have tried

JSONObject jsonObject = JSONObject.parseObject(record.value().toString().replaceAll("null", "\"\""), Feature.OrderedField);

The exception is thrown when this statement runs.

Desired result

The program should not throw an exception.
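
A possible contributor, offered only as a hypothesis because the failing record itself is not shown: the exception names a field called `sql`, and `replaceAll("null", "\"\"")` rewrites the text null everywhere, including inside string values. If a string value contains that text, the replacement injects unescaped quotes and the payload stops being valid JSON. The sketch below uses only the standard library to show the effect. `NullLiteralDemo`, `replaceNullLiterals`, and the sample payload are made up for illustration and are not part of canal, Kafka, or fastjson.

```java
// Hypothetical demo class; the payload below is invented, not a real canal message.
final class NullLiteralDemo {

    /** The approach from the question: replaces the text null wherever it appears. */
    static String naiveReplace(String rawJson) {
        return rawJson.replaceAll("null", "\"\"");
    }

    /** Replaces only bare null literals and copies string values through unchanged. */
    static String replaceNullLiterals(String rawJson) {
        StringBuilder out = new StringBuilder(rawJson.length() + 16);
        boolean inString = false;
        int i = 0;
        while (i < rawJson.length()) {
            char c = rawJson.charAt(i);
            if (inString) {
                out.append(c);
                if (c == '\\' && i + 1 < rawJson.length()) {
                    // Copy the escaped character as well, so \" does not end the string.
                    out.append(rawJson.charAt(i + 1));
                    i += 2;
                    continue;
                }
                if (c == '"') {
                    inString = false;
                }
                i++;
            } else if (c == '"') {
                inString = true;
                out.append(c);
                i++;
            } else if (rawJson.startsWith("null", i)) {
                // Outside a string, valid JSON only contains this text as the null literal.
                out.append("\"\"");
                i += 4;
            } else {
                out.append(c);
                i++;
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String raw = "{\"old\":null,\"sql\":\"INSERT INTO t VALUES (9, null)\"}";
        // prints {"old":"","sql":"INSERT INTO t VALUES (9, "")"}  (no longer valid JSON)
        System.out.println(naiveReplace(raw));
        // prints {"old":"","sql":"INSERT INTO t VALUES (9, null)"}
        System.out.println(replaceNullLiterals(raw));
    }
}
```

Since fastjson parses the JSON null literal on its own, it may be simpler to drop the string replacement altogether and substitute empty strings after parsing, if they are needed at all.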

2 answers

  • qq_42584264 2022-05-07 09:54

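    Solution: define an error handler bean and put its bean name in the errorHandler attribute of the @KafkaListener annotation. When the listener throws, control passes to that error handler.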

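        @Bean
        public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
            return (message, exception, consumer) -> {
                // Log the payload that failed together with the exception
                log.error("Failed to consume message: " + message.getPayload(), exception);
                // Returning normally means the exception is not rethrown to the container
                return null;
            };
        }

        @KafkaListener(topics = "canal", groupId = "test", errorHandler = "consumerAwareErrorHandler")
        public void craneMessage(ConsumerRecord<?, ?> record) {
            // parse and process the record here
        }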
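
The log in the question shows SeekToCurrentErrorHandler seeking back after the listener throws, which is why the same stack trace appears again. The handler above stops that by returning normally. As a rough alternative sketch, the listener could catch the parse failure itself and skip the record. The example below uses only the standard library: `SafeConsumeDemo` and `tryParse` are hypothetical names, and `Integer::parseInt` stands in for the JSON parser, which likewise throws an unchecked exception (fastjson's `JSONException`) on bad input.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical helper; not part of Spring Kafka or fastjson.
final class SafeConsumeDemo {

    /** Applies the parser; on failure logs the payload and returns empty instead of rethrowing. */
    static <T> Optional<T> tryParse(String payload, Function<String, T> parser) {
        try {
            return Optional.ofNullable(parser.apply(payload));
        } catch (RuntimeException e) {
            System.err.println("Skipping unparseable record: " + payload + " (" + e.getMessage() + ")");
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Integer::parseInt is only a stand-in for the real JSON parser.
        System.out.println(tryParse("42", Integer::parseInt));           // prints Optional[42]
        System.out.println(tryParse("not a number", Integer::parseInt)); // prints Optional.empty
    }
}
```

Either way the failing record is skipped rather than repaired, so the reason the payload does not parse is still worth tracking down.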
    
    This answer was accepted by the asker as the best answer.

Question events

  • Closed by the system on May 15
  • Answer accepted on May 7
  • Question edited on May 6
  • Question created on May 6
