AloneZaa 2024-05-16 15:57 采纳率: 0%
浏览 3

flink批处理shuffle后的并行度

图1:

img

图2:

img

为什么flink在idea测试默认并行度为1的情况shuffle,sum之后的并行度为max(默认并行度,job中算子最大并行度),如图1打印出3行数据,但是在linux的环境中测试并行度就变成了环境默认并行度了呢?如图2,只打印出一行数据

  • 写回答

2条回答 默认 最新

  • 阿里嘎多学长 2024-05-16 15:57
    关注

    以下内容由CHATGPT及阿里嘎多学长共同生成、有用望采纳:


    您想要解决的问题

    您想要解决的问题是关于Flink批处理作业中shuffle操作后并行度的行为差异。具体来说,您在IDEA环境中测试时发现,即使默认并行度设置为1,经过shuffle和sum操作后,实际并行度会变为默认并行度与作业中算子最大并行度中的最大值,导致打印出多行数据。然而,在Linux环境中测试时,发现并行度遵循环境默认值,只打印出一行数据。

    图片内容

    • 图1:展示了在IDEA环境中运行Flink批处理作业的代码和输出结果。代码中创建了一个Flink批处理执行环境,并进行了数据的映射和聚合操作。输出结果显示了多个聚合结果,这表明并行度被设置为了一个大于1的值。
    • 图2:展示了在REPL(可能是Scala的交互式解释器)环境中运行相似操作的代码和输出结果。这里的输出只显示了一个聚合结果,表明并行度为1。

    问题出现原因

    问题可能出现的原因是在不同环境中Flink作业的默认并行度设置不同,或者是作业的并行度在不同环境中被以不同的方式处理。在IDEA中,可能存在一些配置或者缓存导致并行度被重置或覆盖。而在Linux环境中,可能是使用了不同的Flink配置或者系统属性,导致并行度保持为默认值。

    问题解决方案

    为了确保在不同环境中并行度的行为一致,您需要检查和统一Flink作业的并行度设置。这可以通过以下几种方式实现:

    1. 明确设置并行度:在作业的每个关键操作后都明确设置并行度,确保它不会被默认值覆盖。
    2. 检查环境配置:确保IDEA和Linux环境的Flink配置一致,包括并行度和其他可能影响并行执行的设置。
    3. 使用环境变量:通过设置环境变量来控制Flink作业的默认并行度,这样可以在不同环境中保持一致。

    提供代码

    以下是修改后的代码示例,它在每个关键操作后都明确设置了并行度:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.util.Collector
    
    object Demo01 {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1) // 设置初始并行度
    
        val data = env.fromCollection(List(1, 2, 3, 4, 5))
          .map(_ + 1).setParallelism(3) // 在map操作后设置并行度
          .groupBy(0) // 假设数据是元组,按第一个字段分组
          .sum(1) // 对第二个字段求和
          .setParallelism(2) // 在聚合操作后设置并行度
    
        data.mapPartition(x => println(x.mkString(","))) // 打印每个分区的数据
          .print() // 打印操作
    
        env.execute("Flink Batch Job")
      }
    }
    

    代码运行方式

    • 确保您的环境中安装了Flink,并且flink-submit命令可用。
    • 将上述代码保存为.scala文件,例如Demo01.scala
    • 使用Flink的命令行工具提交作业:
    flink run -c com.example.Demo01 Demo01.scala
    

    代码预期运行结果

    如果并行度设置正确,您应该看到类似以下的输出,每个聚合结果只打印一次:

    (2,1)
    (5,3)
    

    推荐相关链接

    这些链接可以帮助您更深入地了解Flink的并行度设置和作业配置。

    评论 编辑记录

报告相同问题?

问题事件

  • 创建了问题 5月16日

悬赏问题

  • ¥15 内网办公电脑进行向日葵
  • ¥15 如何输入双曲线的参数a然后画出双曲线?我输入处理函数加上后就没有用了,不知道怎么回事去掉后双曲线可以画出来
  • ¥50 WPF Lidgren.Network.Core2连接问题
  • ¥15 soildworks装配体的尺寸问题
  • ¥100 有偿寻云闪付SDK转URL技术
  • ¥30 基于信创PC发布的QT应用如何跨用户启动后输入中文
  • ¥20 非root手机,如何精准控制手机流量消耗的大小,如20M
  • ¥15 远程安装一下vasp
  • ¥15 自己做的代码上传图片时,报错
  • ¥15 Lingo线性规划模型怎么搭建