dk_hero 2025-11-27 09:41 采纳率: 33.3%
浏览 8
已结题

jar运行一段时间后接收不到Kafka数据

我有一个Kafka的消费程序,在测试环境下可以正常运行,然后打包成Jar包准备部署到Linux正式环境进行使用
这里发现会出现一个问题,如果使用java -jar xx.jar命令,程序可以正常运行并接收Kafka的数据
而当我使用nohup java -jar xx.jar命令,或者在spring boot的服务里,通过定时任务和ProcessBuilder来运行java-jar xx.jar的指令时,在5-10分钟内可以正常接收到Kafka的数据,之后就不接收数据了,如果是在windows上却是正常的,请各位帮忙看下如何解决?不胜感激!

    public static void main(String[] args) {
        HashMap<String,String> params= ParamUtils.getParamMap(args);
        Integer rouletteIndex=Integer.parseInt(params.get("roulette_index"));
        log.info("正在启动计算器,任务id:"+rouletteIndex);
        List<String> topicList=new ArrayList<>();
        topicList.add(rouletteIndex+ KafkaConstant.TOPIC_JOIN_STR+KafkaConstant.LK_KAFKA1);
        topicList.add(rouletteIndex+KafkaConstant.TOPIC_JOIN_STR+KafkaConstant.LK_KAFKA2);
        topicList.add(rouletteIndex+KafkaConstant.TOPIC_JOIN_STR+KafkaConstant.LK_KAFKA3);
        Properties props = new Properties();
        props.put("bootstrap.servers", ConfigConstant.KAFKA_BOOTSTRAP_SERVERS); // Kafka broker地址
        props.put("group.id",ConfigConstant.KAFKA_CONSUMERID); // 消费者组ID
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 键的反序列化器
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 值的反序列化器
        props.put("auto.offset.reset", ConfigConstant.KAFKA_AUTO_OFFSET_RESET); 
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(topicList);
        while(true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
//                System.out.println(record);
                LkCommonHandler.handler(record.topic(), record.value());
            }
        }

  • 写回答

3条回答 默认 最新

  • 关注

    大概率是这几个原因,挨个排查就行:

    1. Kafka消费者超时被踢:默认max.poll.interval.ms是5分钟,要是你的程序处理消息慢,或者nohup导致poll卡住,消费者会被踢出组。直接在Kafka配置里加props.put("max.poll.interval.ms", "600000");(设10分钟),再补个props.put("session.timeout.ms", "30000");,保证心跳不断。

    2. nohup没重定向stdin:Linux下nohup运行时stdin会被挂起,Kafka客户端底层可能卡这。启动命令改成nohup java -jar xx.jar > nohup.out 2>&1 < /dev/null &,把stdin定向到空设备就好。

    3. ProcessBuilder没处理输出:用ProcessBuilder调jar时,子进程的stdout/stderr缓冲区满了会阻塞,导致没法poll数据。得在代码里加线程读取子进程的输入流和错误流,别让缓冲区堵了。

    4. JVM内存不够假死:Linux下jar跑久了内存溢出也会这样,启动时加JVM参数-Xmx512m -Xms256m,限制下内存。

    先试前两个,基本能解决nohup的问题;ProcessBuilder的话记得读输出流,这是最容易踩的坑~

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(2条)

报告相同问题?

问题事件

  • 系统已结题 12月6日
  • 已采纳回答 11月28日
  • 创建了问题 11月27日