土肥宅娘口三三 2024-09-13 10:30 采纳率: 0%
浏览 8

flink并行度问题

请问下面的这段代码在.keyBy(1).keyBy(0)时为啥在输出结果的并行度上很大的差异(这段代码没有太多的实际意义,只是对输出有疑惑)

private static void countWindowTest1() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        lines
                .flatMap(new FlatMapFunction<String, Tuple3<Integer, String, Integer>>() {
                    public void flatMap(String line, Collector<Tuple3<Integer, String, Integer>> collector) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            Tuple3<Integer, String, Integer> tp = Tuple3.of(Math.abs(word.hashCode() % parallelism), word, 1);
                            collector.collect(tp);
                        }
                    }
                })
                .keyBy(1)
                .countWindow(3)
                .sum(2)
                .print();

        env.execute();
    }

输入同样都是下面的两段话

Fight and you may die, run and you'll live. At least a while. And dying in your beds many years from now, would you be willing to trade all the days from this day to that for one chance, just one chance to come back here and tell our enemies that they may take our lives, but they'll never take our freedom!
Fight and you may die, run and you'll live. At least a while. And dying in your beds many years from now, would you be willing to trade all the days from this day to that for one chance, just one chance to come back here and tell our enemies that they may take our lives, but they'll never take our freedom!

  • .keyBy(1)

img

  • .keyBy(0)

img

为什么第一个输出看起来更符合并行度是3的设置,而第二个输出结果看起来都使用了同一个work呢?

  • 写回答

1条回答 默认 最新

  • 一轮明月照丘壑 2024-09-15 16:46
    关注

    以下回复参考:皆我百晓生券券喵儿等免费微信小程序作答:

    这段代码的核心是使用Apache Flink流处理框架来对数据进行处理。代码的关键在于通过.keyBy(i)进行状态化管理。这里是几个重要的概念和可能的解释:

    并行度(Parallelism)

    在Flink中,并行度决定了处理数据的任务数量。当你设置env.setParallelism(3)时,意味着整个作业将使用三个并行任务来处理数据。然而,并行度并不是绝对的,它可能根据数据和任务的特性进行动态调整。在某些情况下,如果数据分布不均或者某些操作更适合并行处理,可能会存在更小的子任务或多个并发操作,但在您设定并行度的地方应该有尽可能多的资源来分配给这个数目的大致分配或完全平衡的处理能力。实际的情况还会根据数据倾斜和其他因素有所不同。因此,您看到的输出可能与预期有所不同。

    .keyBy(i)的影响

    .keyBy(i)决定了如何对数据进行分区和状态管理。在这里,如果参数是01,则数据将根据单词的哈希值(取模并行度)进行分区。这意味着相同的单词将被发送到同一个并行任务中处理。因此,如果所有单词都是相同的或者存在大量重复单词,可能会导致某些任务比其他任务更忙,这可能会导致数据分布不均的情况出现。因此,在输出中可能会看到不同数量的条目和不同的处理结果。这是因为每个并行任务可能处理不同的单词集合,并且由于数据倾斜,某些任务可能处理更多的数据。因此,尽管设置了并行度,但由于数据分布不均或其他因素,您看到的输出可能看起来不像预期的那样均匀分布。另外,您提到的图中的一些内容似乎不是典型的Flink输出格式,可能是其他因素导致的输出格式问题或数据混淆的问题。如果您能提供更多的上下文或详细说明这些输出的来源和含义,我可能能提供更具体的解释。总之,在处理大数据流时,数据的分布和倾斜对结果有很大影响。确保合理地使用.keyBy()和其他操作以优化数据处理和数据分布是关键。

    评论

报告相同问题?

问题事件

  • 创建了问题 9月13日

悬赏问题

  • ¥15 jetson nano
  • ¥15 :app:debugCompileClasspath'.
  • ¥15 windows c++内嵌qt出现数据转换问题。
  • ¥20 公众号如何实现点击超链接后自动发送文字
  • ¥15 用php隐藏类名和增加类名
  • ¥15 算法设计与分析课程的提问
  • ¥15 用MATLAB汇总拟合图
  • ¥15 智能除草机器人方案设计
  • ¥15 对接wps协作接口实现消息发送
  • ¥15 SQLite 出现“Database is locked” 如何解决?