dk_hero 2025-11-01 17:55 采纳率: 33.3%
浏览 6

Flink数据处理速度过慢

我有一个由3台服务器组成的Flink集群,用的standalone模式,每个服务器开启了5个slot,一共15个slot
Flink集群用来处理试验大数据任务,整个任务时先接收来自Kafka的数据,然后进行数据处理。数据处理首先会先对数据进行识别,然后通过mysql的数据连接池到数据库查询计算参数,如果计算参数不变则根据已有的计算类中的静态变量进行数据计算,如果计算参数改变了,则将计算参数重新存入计算类的静态变量,然后继续计算。整个集群就处理这一个任务。

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        String topicName="lkcan1";
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ConfigConstant.KAFKA_BOOTSTRAP_SERVERS);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group1");
        properties.setProperty("flink.partition-discovery.interval-millis","30000");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,ConfigConstant.KAFKA_AUTO_OFFSET_RESET);
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource( consumer);

        stream.flatMap(new FlatMapFunction<String, ArrayList<Put>>(){
            @Override
            public void flatMap(String value, Collector<ArrayList<Put>> out) throws Exception {
                log.info("已接收到数据;{}","lkcan1");
                LkCommonHandler.handler(topicName,value);
            }
        }).setParallelism(ConfigConstant.parallelism);

当我把任务的算子平行度设置为1时(只使用到服务器1的slot),计算的速度很快,每一组流数据处理差不多10S左右
2025-11-01 16:45:24,551 INFO flink.handler.lk.LkSensorHandler [] - 数据写入数据库完成,时长:1
2025-11-01 16:45:24,552 INFO flink.handler.lk.LkCommonHandler [] - 数据处理完毕,时长:11
当我把任务的算子平行度设置为5时(只使用到服务器1的slot),计算的速度就很慢了,每一组流数据处理差不多90S左右
25-11-01 16:56:57,449 INFO flink.handler.lk.LkSensorHandler [] - 数据写入数据库完成,时长:1
2025-11-01 16:56:57,449 INFO flink.handler.lk.LkCommonHandler [] - 数据处理完毕,时长:90
当我把任务的算子平行度设置为15时(占用所有slot),计算的速度更慢,每一组流数据处理差不多110S左右
025-11-01 17:17:43,955 INFO flink.handler.lk.LkSensorHandler [] - 数据写入数据库完成,时长:2
2025-11-01 17:17:43,956 INFO flink.handler.lk.LkCommonHandler [] - 数据处理完毕,时长:116

img

img

如果我把整个任务拆分为15个任务,对应分别接收15个kafka的topic进行数据处理,则单个数据的处理时间大约为30S左右

请教下应该如何解决整个速度慢的问题?谢谢!

  • 写回答

4条回答 默认 最新

  • 阿里嘎多学长 2025-11-01 18:29
    关注

    阿里嘎多学长整理AIGC生成,因移动端显示问题导致当前答案未能完全显示,请使用PC端查看更加详细的解答过程

    解决Flink数据处理速度过慢的方法

    根据你的描述,Flink集群使用standalone模式,每个服务器开启了5个slot,一共15个slot。这里有几点需要注意:

    1. slot数量不足: 如果任务的并行度设置得太高,而slot数量不足,可能会导致任务的执行速度过慢。请检查任务的并行度设置是否合理。
    2. Kafka数据吞吐量: 如果Kafka数据的吞吐量很大,可能会导致Flink集群处理速度过慢。请检查Kafka的配置是否合理。
    3. Flink集群资源配置: 请检查Flink集群的资源配置是否足够,包括CPU、内存、磁盘等。

    解决方案

    1. 检查任务的并行度设置: 确保任务的并行度设置得合理,不超过slot数量。
    2. 优化Kafka配置: 优化Kafka的配置,包括增加分区数、调整分区的大小等。
    3. 检查Flink集群资源配置: 确保Flink集群的资源配置足够,包括CPU、内存、磁盘等。
    4. 使用Flink的高级特性: 使用Flink的高级特性,如 checkpoint、savepoint 等,可以帮助提高任务的执行速度。

    核心代码

    // 检查任务的并行度设置
    int parallelism = env.getParallelism();
    if (parallelism > slotNum) {
        env.setParallelism(slotNum);
    }
    
    // 优化Kafka配置
    Properties props = new Properties();
    props.put("num.partitions", 10); // 增加分区数
    props.put("partition.size", 1024 * 1024 * 1024); // 调整分区大小
    
    // 检查Flink集群资源配置
    int cpuNum = Runtime.getRuntime().availableProcessors();
    if (cpuNum < 4) {
        // 增加CPU核心数
    }
    
    // 使用Flink的高级特性
    env.enableCheckpointing(1000); // 开启checkpoint
    env.enableSavepoint(1000); // 开启savepoint
    
    评论

报告相同问题?

问题事件

  • 创建了问题 11月1日