我有一个Kafka的消费程序,在测试环境下可以正常运行,然后打包成Jar包准备部署到Linux正式环境进行使用
这里发现会出现一个问题,如果使用java -jar xx.jar命令,程序可以正常运行并接收Kafka的数据
而当我使用nohup java -jar xx.jar命令,或者在spring boot的服务里,通过定时任务和ProcessBuilder来运行java-jar xx.jar的指令时,在5-10分钟内可以正常接收到Kafka的数据,之后就不接收数据了,如果是在windows上却是正常的,请各位帮忙看下如何解决?不胜感激!
public static void main(String[] args) {
HashMap<String,String> params= ParamUtils.getParamMap(args);
Integer rouletteIndex=Integer.parseInt(params.get("roulette_index"));
log.info("正在启动计算器,任务id:"+rouletteIndex);
List<String> topicList=new ArrayList<>();
topicList.add(rouletteIndex+ KafkaConstant.TOPIC_JOIN_STR+KafkaConstant.LK_KAFKA1);
topicList.add(rouletteIndex+KafkaConstant.TOPIC_JOIN_STR+KafkaConstant.LK_KAFKA2);
topicList.add(rouletteIndex+KafkaConstant.TOPIC_JOIN_STR+KafkaConstant.LK_KAFKA3);
Properties props = new Properties();
props.put("bootstrap.servers", ConfigConstant.KAFKA_BOOTSTRAP_SERVERS); // Kafka broker地址
props.put("group.id",ConfigConstant.KAFKA_CONSUMERID); // 消费者组ID
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 键的反序列化器
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 值的反序列化器
props.put("auto.offset.reset", ConfigConstant.KAFKA_AUTO_OFFSET_RESET);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(topicList);
while(true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// System.out.println(record);
LkCommonHandler.handler(record.topic(), record.value());
}
}