hyxxyjh
2020-02-14 18:26, 1.1k views

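How do I resolve the following error when integrating Kafka with Flink?

Running a project that integrates Kafka with Flink from IDEA fails with an error.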

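package com.huawei.bigdata;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class KafkaFlinkDemo1 {

    public static void main(String[] args) throws Exception {
        // Get the stream execution environment
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Create a Table Environment on top of it
        StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(sEnv);

        // Describe the Kafka source: connector, JSON format, and table schema
        sTableEnv.connect(new Kafka()
                .version("0.10")
                .topic("topic1")
                .startFromLatest()
                .property("group.id", "group1")
                .property("bootstrap.servers", "172.168.30.105:21005")
        ).withFormat(
                new Json().failOnMissingField(false).deriveSchema()
        ).withSchema(
                new Schema().field("userId", Types.LONG())
                        .field("day", Types.STRING())
                        .field("begintime", Types.LONG())
                        .field("endtime", Types.LONG())
                        // "data" is an array of rows, each with (package, activetime)
                        .field("data", ObjectArrayTypeInfo.getInfoFor(
                                Row[].class,
                                Types.ROW(new String[]{"package", "activetime"},
                                        new TypeInformation[]{Types.STRING(), Types.LONG()}
                                )
                        ))
        ).inAppendMode().registerTableSource("userlog");

        // Query the registered table
        Table result = sTableEnv.sqlQuery("select userId from userlog");

        // Convert the result back to an append-only DataStream and print it
        DataStream<Row> rowDataStream = sTableEnv.toAppendStream(result, Row.class);

        rowDataStream.print();

        sEnv.execute("KafkaFlinkDemo1");
    }

}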

The error output is as follows:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/E:/develop/apache-maven-3.6.0-bin/repository/ch/qos/logback/logback-classic/1.1.3/logback-classic-1.1.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/E:/develop/apache-maven-3.6.0-bin/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
Exception in thread "main" java.lang.AbstractMethodError: org.apache.flink.table.descriptors.ConnectorDescriptor.toConnectorProperties()Ljava/util/Map;
    at org.apache.flink.table.descriptors.ConnectorDescriptor.toProperties(ConnectorDescriptor.java:58)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.toProperties(ConnectTableDescriptor.scala:107)
    at org.apache.flink.table.descriptors.StreamTableDescriptor.toProperties(StreamTableDescriptor.scala:95)
    at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:39)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68)
    at com.huawei.bigdata.KafkaFlinkDemo1.main(KafkaFlinkDemo1.java:41)

Process finished with exit code 1
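
In general, a java.lang.AbstractMethodError means a class was compiled against one version of a base class or interface but is running against a different, binary-incompatible one. The post does not confirm that this is the cause here, but one way to check is to print which jar each class is actually loaded from. Below is a minimal sketch for doing that. The class name JarLocator and the method locate are made up for illustration, and the fully qualified name of the Kafka descriptor is assumed from the usual Flink package layout rather than taken from the post.

```java
import java.security.CodeSource;

public class JarLocator {

    /** Returns the location a class was loaded from, or a short marker if it cannot be determined. */
    static String locate(String className) {
        try {
            // Load without initializing, so static initializers do not run
            Class<?> cls = Class.forName(className, false, JarLocator.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return "<bootstrap or unknown>";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "<not on classpath>";
        }
    }

    public static void main(String[] args) {
        String[] names = {
            // Appears in the stack trace above
            "org.apache.flink.table.descriptors.ConnectorDescriptor",
            // Assumed fully qualified name of the Kafka descriptor used in the code
            "org.apache.flink.table.descriptors.Kafka"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

Run this with the same classpath as the failing project. If the two classes resolve to jars with different Flink versions, aligning the versions of the Flink table and Kafka connector dependencies would be the first thing to try.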
