package com.xu.studyApp.watermarkTest
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala._
import java.time.Duration
/**
 * Reads `key,timestamp` lines from a local socket, assigns event-time timestamps using a
 * bounded-out-of-orderness watermark strategy, and prints the rolling per-key minimum.
 *
 * Build note: `WatermarkStrategy.forBoundedOutOfOrderness` is a static method declared on a
 * Java interface. scalac refuses such calls with
 * "Static methods in interface require -target:jvm-1.8" when its bytecode target is older
 * than Java 8 (presumably the default on the Scala version in use, e.g. 2.11 -- confirm).
 * The fix is a compiler option, not a code change: pass `-target:jvm-1.8` to scalac
 * (IDE Scala compiler settings, or the build tool's scalac arguments). Upgrading the JDK
 * by itself does not change the target scalac emits.
 *
 * @author MrXu
 * @create 2021-11-21 19:05
 */
object WatermarkTest2 {

  import scala.util.Try

  /**
   * Parses one `key,timestamp[,...]` line.
   *
   * @param line raw text line received from the socket
   * @return `Some((key, timestamp))` when the line has at least two comma-separated fields
   *         and the second one is a valid `Long` (surrounding whitespace is ignored);
   *         `None` otherwise. Fields after the second are ignored, as before.
   */
  private def parseLine(line: String): Option[(String, Long)] =
    line.split(",") match {
      case Array(key, rawTimestamp, _*) =>
        Try(rawTimestamp.trim.toLong).toOption.map(timestamp => (key, timestamp))
      case _ =>
        None
    }

  /**
   * Parses a line for use in `flatMap`: yields the parsed record, or reports the malformed
   * line on stderr and yields nothing so that one bad line cannot fail the whole job.
   */
  private def parseOrSkip(line: String): List[(String, Long)] = {
    val parsed = parseLine(line)
    if (parsed.isEmpty) Console.err.println(s"Skipping malformed line: $line")
    parsed.toList
  }

  /**
   * Builds and runs the streaming job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.enableCheckpointing(60000)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Emit watermarks every 5 s (the original note says the default is every 200 ms).
    env.getConfig.setAutoWatermarkInterval(5000)
    env.setParallelism(1)

    val socketInput: DataStream[String] = env.socketTextStream("127.0.0.1", 9999)

    // flatMap instead of map: a line that is not `key,<long>` used to throw
    // (ArrayIndexOutOfBounds / NumberFormatException) inside the operator; now it is skipped.
    val mapInput: DataStream[(String, Long)] =
      socketInput.flatMap((line: String) => parseOrSkip(line))

    // Records may arrive out of order: take the event timestamp from the tuple's second
    // field and tolerate up to 5 seconds of out-of-orderness.
    val dataWithWaterMarkInput: DataStream[(String, Long)] = mapInput.assignTimestampsAndWatermarks(
      WatermarkStrategy
        .forBoundedOutOfOrderness[(String, Long)](Duration.ofSeconds(5))
        .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
          override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long =
            element._2
        })
    )

    dataWithWaterMarkInput
      .keyBy((record: (String, Long)) => record._1)
      .minBy(1)
      .print()

    env.execute("WatermarkTest2")
  }
}
异常:
Static methods in interface require -target:jvm-1.8
.forBoundedOutOfOrderness[(String, Long)](Duration.ofSeconds(5))
非常不解：不知道是 IDEA 设置不对，还是代码本身有问题，但 Flink 官网文档也是这样写的。JDK 已经升级到 8 的最新版本，仍然不行，求帮助！