我的电脑是8线程,当我运行下面这段代码时,可以输出结果,但是当把并行度注释掉时,就不输出结果了,这是为什么呢?输入数据一样,watermark应该都是一样的啊
输入数据为:
1585721697000,xiao,8
1585721700000,xiao,10
1585721705000,xiao,4
1585721715000,xiao,9
case class Student(id:Long,name:String,age:Int)
object test3 {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)
//指定时间类型
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val value: DataStream[String] = env.socketTextStream("localhost", 9999)
val map: DataStream[Student] = value.map(line => {
val arr: Array[String] = line.split(",")
Student(arr(0).toLong, arr(1), arr(2).toInt)
})
val value1: DataStream[Student] = map.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Student](Time.seconds(3)) {
override def extractTimestamp(t: Student): Long = {
t.id
}
})
val value2: KeyedStream[Student, String] = value1.keyBy(_.name)
val value3: WindowedStream[Student, String, TimeWindow] = value2.timeWindow(Time.seconds(10))
val value4: DataStream[Student] = value3.sum(2)
value4.print()
env.execute()
}
}