Jaming R 2020-04-01 17:00 采纳率: 0%
浏览 450

flink 并行度导致输出控制台结果问题

我的电脑是8线程,当我运行下面这段代码时,可以输出结果,但是当把并行度注释掉时,就不输出结果了,这是为什么呢?输入数据一样,watermark应该都是一样的啊

输入数据为:
1585721697000,xiao,8
1585721700000,xiao,10
1585721705000,xiao,4
1585721715000,xiao,9

case class Student(id:Long,name:String,age:Int)
object test3 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)
    //指定时间类型
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val value: DataStream[String] = env.socketTextStream("localhost", 9999)

    val map: DataStream[Student] = value.map(line => {
      val arr: Array[String] = line.split(",")
      Student(arr(0).toLong, arr(1), arr(2).toInt)
    })
    val value1: DataStream[Student] = map.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Student](Time.seconds(3)) {
      override def extractTimestamp(t: Student): Long = {
        t.id
      }
    })
    val value2: KeyedStream[Student, String] = value1.keyBy(_.name)
    val value3: WindowedStream[Student, String, TimeWindow] = value2.timeWindow(Time.seconds(10))
    val value4: DataStream[Student] = value3.sum(2)
    value4.print()
    env.execute()
  }
}

  • 写回答

2条回答 默认 最新

  • 你知我知皆知 2024-07-25 16:32
    关注

    以下回答参考 皆我百晓生券券喵儿 等免费微信小程序相关内容作答,并由本人整理回复。

    你提供的代码是一个Flink应用程序,它使用Apache Flink的流处理功能来读取来自本地端口9999的数据,并将其转换成一个学生对象。然后,它将这些学生对象按姓名进行分组,并对每个组应用窗口化操作,最后计算出每个学生的总分数。

    在你的代码中,assignTimestampsAndWatermarks方法被用来设置事件时间特征。这个方法会为每个数据点分配一个唯一的事件时间戳,并根据给定的时间间隔创建一个新的时间窗。

    当你运行此程序时,assignTimestampsAndWatermarks方法会自动为每个数据点分配一个事件时间戳。然而,在你的测试代码中,你没有调用这个方法,因此你的应用程序无法正确地获取和使用这些事件时间戳。这可能导致你无法正确地执行时间和窗口操作。

    要解决这个问题,你需要在你的测试代码中调用assignTimestampsAndWatermarks方法。你可以这样做:

    value1.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Student](Time.seconds(3)) {
      override def extractTimestamp(t: Student): Long = {
        t.id
      }
    })
    

    这样,你的程序就会正确地使用事件时间戳,并且应该能够打印出正确的结果。

    评论

报告相同问题?