hyxxyjh 2020-02-14 18:26 Acceptance rate: 0%

How to resolve the following error when integrating Kafka with Flink

Running a Kafka and Flink integration project from IDEA fails with the error below.

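// Imports are assumed from the Flink 1.8-era package layout; adjust them to the Flink version actually in use.
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class KafkaFlinkDemo1 {

    public static void main(String[] args) throws Exception {
        // Get the stream execution environment
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Create a Table Environment on top of it
        StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(sEnv);

        // Register a Kafka-backed table source named "userlog" that reads JSON records
        sTableEnv.connect(new Kafka()
                .version("0.10")
                .topic("topic1")
                .startFromLatest()
                .property("group.id", "group1")
                .property("bootstrap.servers", "172.168.30.105:21005")
        ).withFormat(
                new Json().failOnMissingField(false).deriveSchema()
        ).withSchema(
                new Schema().field("userId", Types.LONG())
                        .field("day", Types.STRING())
                        .field("begintime", Types.LONG())
                        .field("endtime", Types.LONG())
                        // "data" is an array of rows, each with a package name and an active time
                        .field("data", ObjectArrayTypeInfo.getInfoFor(
                                Row[].class,
                                Types.ROW(new String[]{"package", "activetime"},
                                        new TypeInformation[]{Types.STRING(), Types.LONG()}
                                )
                        ))
        ).inAppendMode().registerTableSource("userlog");

        // Query the registered table and convert the result to an append-only stream
        Table result = sTableEnv.sqlQuery("select userId from userlog");

        DataStream<Row> rowDataStream = sTableEnv.toAppendStream(result, Row.class);

        rowDataStream.print();

        sEnv.execute("KafkaFlinkDemo1");
    }

}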

The error output is as follows:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/E:/develop/apache-maven-3.6.0-bin/repository/ch/qos/logback/logback-classic/1.1.3/logback-classic-1.1.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/E:/develop/apache-maven-3.6.0-bin/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
Exception in thread "main" java.lang.AbstractMethodError: org.apache.flink.table.descriptors.ConnectorDescriptor.toConnectorProperties()Ljava/util/Map;
    at org.apache.flink.table.descriptors.ConnectorDescriptor.toProperties(ConnectorDescriptor.java:58)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.toProperties(ConnectTableDescriptor.scala:107)
    at org.apache.flink.table.descriptors.StreamTableDescriptor.toProperties(StreamTableDescriptor.scala:95)
    at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:39)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68)
    at com.huawei.bigdata.KafkaFlinkDemo1.main(KafkaFlinkDemo1.java:41)

Process finished with exit code 1
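
A java.lang.AbstractMethodError like the one above generally means that a concrete class was compiled against a different version of its abstract parent than the one loaded at runtime. Here that would suggest (as an assumption, not something the log confirms) that the jar providing the Kafka descriptor and the jar providing ConnectorDescriptor come from different Flink versions. The SLF4J lines are only warnings about duplicate logging bindings and are not the failure itself. The sketch below is one way to check where each class is loaded from; JarOriginCheck and originOf are hypothetical names introduced for illustration, and only the two Flink class names are taken from the question.

```java
import java.net.URL;
import java.security.CodeSource;

public class JarOriginCheck {

    /** Returns the jar or directory a class is loaded from, or a placeholder if unavailable. */
    public static String originOf(String className) {
        try {
            // Load without initializing, so no static initializers run during the check
            Class<?> cls = Class.forName(className, false, JarOriginCheck.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            URL location = (src == null) ? null : src.getLocation();
            // JDK core classes typically have no code source
            return location == null ? "<bootstrap or unknown>" : location.toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "<not on classpath>";
        }
    }

    public static void main(String[] args) {
        // Class names taken from the code and stack trace in the question
        String[] names = {
            "org.apache.flink.table.descriptors.ConnectorDescriptor",
            "org.apache.flink.table.descriptors.Kafka"
        };
        for (String name : names) {
            System.out.println(name + " -> " + originOf(name));
        }
    }
}
```

Run with the same classpath as the failing project. If the two printed jar paths carry different Flink version numbers, aligning the Flink dependency versions in the Maven pom is a reasonable first thing to try.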


1 answer

  • zqbnqsdsmd 2020-02-14 23:53