LeiKe_ 2021-11-21 21:32 采纳率: 0%
浏览 20
已结题

Flink1.11.3 设置水位线报异常

package com.xu.studyApp.watermarkTest

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala._

import java.time.Duration

/**
 * @author MrXu
 * @create 2021-11-21 19:05
 * @desc
 */
object WatermarkTest2 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.enableCheckpointing(60000)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(5000)  // 默认200ms一次水印;
    env.setParallelism(1)

    val socketInput: DataStream[String] = env.socketTextStream("127.0.0.1", 9999)
    val mapInput: DataStream[(String, Long)] = socketInput.map((line: String) => {
      val words: Array[String] = line.split(",")
      (words(0), words(1).toLong)
    })

    // 数据有可能是乱序的,设置数据的水位线提取.
    val dataWithWaterMarkInput: DataStream[(String, Long)] = mapInput.assignTimestampsAndWatermarks(
      WatermarkStrategy
        .forBoundedOutOfOrderness[(String, Long)](Duration.ofSeconds(5))
        .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
          override def extractTimestamp(element: (String, Long), recordTimestamp: Long) = {
            element._2
          }
        })
    )

    dataWithWaterMarkInput
      .keyBy((_: (String, Long))._1)
      .minBy(1)
      .print()

    env.execute("WatermarkTest2")
  }
}


异常:

Static methods in interface require -target:jvm-1.8
        .forBoundedOutOfOrderness[(String, String)](Duration.ofSeconds(5))

灰常不解, 不知道是Idea设置不对还是代码本身有问题, 但是Flink官网文档也是这样写的. JDK都升成了8版本最高版本也是不行, 求帮助!

  • 写回答

1条回答 默认 最新

  • 有问必答小助手 2021-11-23 09:52
    关注

    你好,我是有问必答小助手,非常抱歉,本次您提出的有问必答问题,技术专家团超时未为您做出解答


    本次提问扣除的有问必答次数,将会以问答VIP体验卡(1次有问必答机会、商城购买实体图书享受95折优惠)的形式为您补发到账户。


    因为有问必答VIP体验卡有效期仅有1天,您在需要使用的时候【私信】联系我,我会为您补发。

    评论

报告相同问题?

问题事件

  • 系统已结题 11月29日
  • 修改了问题 11月21日
  • 创建了问题 11月21日

悬赏问题

  • ¥20 delta降尺度方法,未来数据怎么降尺度
  • ¥15 c# 使用NPOI快速将datatable数据导入excel中指定sheet,要求快速高效
  • ¥15 高德地图点聚合中Marker的位置无法实时更新
  • ¥15 DIFY API Endpoint 问题。
  • ¥20 sub地址DHCP问题
  • ¥15 delta降尺度计算的一些细节,有偿
  • ¥15 Arduino红外遥控代码有问题
  • ¥15 数值计算离散正交多项式
  • ¥30 数值计算均差系数编程
  • ¥15 redis-full-check比较 两个集群的数据出错