项目中有个短信发送的功能,从客户提供的Kafka中消费数据,拿到这数据后按业务需求进行处理,把处理后的内容以短信的方式发送给客户
这个功能是同事写的,最近项目上线,客户做了个代码评审,发现从Kafka中获取数据(poll方法),使用while循环来实现的,觉得这是死循环,要求整改,查了很多资料都是这么写的,客户说什么也要整改,现在同事离职了,问不到他了
下面是代码
//下面两段代码在同一个类里
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
while (true) {
//启动
initDedz(khjl, glz);
//设置当前状态为启动
is_star = true;
}
}
void initDedz(BigDecimal khjl, BigDecimal glz){
// Kafka 获取的流水数据
List<String> listStr = KafkaApi.getDEYDJsonList();
// 处理报文
List< HashMap> listMap = getJyMap(listStr);
yyyy=DateUtil.format(new Date(),"yyyy年");
for(Map map :listMap){
// 具体的业务处理逻辑,这里就不放了,主要是getDEYDJsonList()这个方法
}
}
// 这段代码和上面两段代码不在一个Java类里,这段代码在一个叫KafkaApi的类里,kafkaConsumer在这个类里有定义,这里没有贴代码,上面是调用
// 以下就是消费Kafka的逻辑
public static List<String> getDEYDJsonList() {
List<String> reList = new ArrayList<>();
try {
if (kafkaConsumer == null) {
kafkaConsumer = getKafkaConsumer();
kafkaConsumer.subscribe(Arrays.asList(HX_KAFKA_TOPCI));
kafkaConsumer.poll(Duration.ofSeconds(2));
kafkaConsumer.seekToEnd(kafkaConsumer.assignment());
}
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(0));
for (ConsumerRecord<String, String> record : records) {
reList.add(record.value());
}
} catch (Exception e) {
e.printStackTrace();
}
return reList;
}