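Symptoms and background
I use canal to watch a database and forward the changes to Kafka. When a row is inserted, the consumer throws an exception, although the data is still received. Updating a row does not throw.
Relevant code (please do not paste screenshots)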
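@KafkaListener(topics = "canal", groupId = "test")
public void craneMessage(ConsumerRecord<?, ?> record) {
    // Replace every occurrence of the text null with "" in the raw message, then parse it while keeping field order
    JSONObject jsonObject = JSONObject.parseObject(record.value().toString().replaceAll("null", "\"\""), Feature.OrderedField);
    System.out.println(jsonObject);
}
} // closes the enclosing KafkaConsumer class (declaration not shown)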
Run results and error output
The inserted row is received:
{"data":[{"id":"9","crane_id":"9","plan_id":"9","start_station":"9","end_station":"9","tank_type":"9","tank_number":"9","position_x":"","position_y":"","position_z":"","weight":"","left_x":"","right_x":"","remain_x":"","remain_y":"","create_by":"","create_time":"","update_by":"","update_time":""}]}
Exception output
12:10:54.412 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - [error,149] - Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.ruoyi.common.config.KafkaConsumer.craneMessage(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>)' threw exception; nested exception is com.alibaba.fastjson.JSONException: syntax error, position at 65, name sql; nested exception is com.alibaba.fastjson.JSONException: syntax error, position at 65, name sql
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:206)
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2371)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2248)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2150)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2032)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1705)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1276)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1268)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1163)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.ruoyi.common.config.KafkaConsumer.craneMessage(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>)' threw exception; nested exception is com.alibaba.fastjson.JSONException: syntax error, position at 65, name sql; nested exception is com.alibaba.fastjson.JSONException: syntax error, position at 65, name sql
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2383)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2354)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2315)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2237)
... 9 common frames omitted
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2334)
Caused by: com.alibaba.fastjson.JSONException: syntax error, position at 65, name sql
at com.alibaba.fastjson.parser.DefaultJSONParser.parseObject(DefaultJSONParser.java:645)
at com.alibaba.fastjson.parser.DefaultJSONParser.parse(DefaultJSONParser.java:1430)
at com.alibaba.fastjson.parser.DefaultJSONParser.parse(DefaultJSONParser.java:1390)
at com.alibaba.fastjson.JSON.parse(JSON.java:181)
at com.alibaba.fastjson.JSON.parse(JSON.java:191)
at com.alibaba.fastjson.JSON.parse(JSON.java:244)
at com.alibaba.fastjson.JSON.parseObject(JSON.java:248)
at com.ruoyi.common.config.KafkaConsumer.craneMessage(KafkaConsumer.java:43)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119)
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2334)
... 11 common frames omitted
12:10:55.048 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.c.c.KafkaConsumer - [seek,1587] - [Consumer clientId=consumer-crane-1, groupId=crane] Seeking to offset 111 for partition crane-0
12:10:55.049 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - [error,149] - Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.ruoyi.common.config.KafkaConsumer.craneMessage(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>)' threw exception; nested exception is com.alibaba.fastjson.JSONException: syntax error, position at 65, name sql; nested exception is com.alibaba.fastjson.JSONException: syntax error, position at 65, name sql
... (the remaining frames repeat the first stack trace above, ending in the same com.alibaba.fastjson.JSONException: syntax error, position at 65, name sql)
12:11:00.110 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.c.c.KafkaConsumer - [seek,1587] - [Consumer clientId=consumer-crane-1, groupId=crane] Seeking to offset 112 for partition crane-0
My analysis and what I have tried
JSONObject jsonObject = JSONObject.parseObject(record.value().toString().replaceAll("null", "\"\""), Feature.OrderedField);
The exception is thrown when this statement runs.
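One possible cause, which the logs above do not confirm: replaceAll("null", "\"\"") rewrites the text null everywhere in the raw message, including inside string values. If a message carries a sql field whose statement contains null, the replacement injects unescaped quotes into that string, and the parser could then report a syntax error near the sql field. The sketch below is a hypothetical helper (NullLiteralReplacer is not part of canal, Kafka, or fastjson) that replaces only bare null literals and leaves string contents alone. It assumes the input is otherwise valid JSON.

```java
/**
 * Hypothetical helper: replaces bare JSON null literals with ""
 * while leaving the text inside JSON string values untouched.
 * Assumes the input is otherwise valid JSON.
 */
final class NullLiteralReplacer {

    private NullLiteralReplacer() {
    }

    static String replaceNullLiterals(String json) {
        StringBuilder out = new StringBuilder(json.length());
        boolean inString = false; // true while the scanner is inside "..."
        boolean escaped = false;  // true right after a backslash inside a string
        int i = 0;
        while (i < json.length()) {
            char c = json.charAt(i);
            if (inString) {
                out.append(c);
                if (escaped) {
                    escaped = false;   // this character was escaped, e.g. \" or \\
                } else if (c == '\\') {
                    escaped = true;    // the next character is escaped
                } else if (c == '"') {
                    inString = false;  // closing quote of the string value
                }
                i++;
            } else if (c == '"') {
                inString = true;       // opening quote of a string value
                out.append(c);
                i++;
            } else if (json.startsWith("null", i)) {
                out.append("\"\"");    // a null literal outside any string
                i += 4;
            } else {
                out.append(c);
                i++;
            }
        }
        return out.toString();
    }
}
```

With this helper, the listener line could become JSONObject.parseObject(NullLiteralReplacer.replaceNullLiterals(record.value().toString()), Feature.OrderedField). Another option would be to parse the raw message first and substitute empty strings on the parsed object afterwards, which avoids editing the JSON text at all.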
Desired result
The program should process these messages without throwing an exception.