flink创建表报错:完整代码如下:
StreamExecutionEnvironment ssEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings ssSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamTableEnvironment ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings);
String sql = "create table test_table ( \n" +
"id int,\n" +
"fk_drug_info int,\n" +
"fk_professional_id string,\n" +
"urine_time TIMESTAMP,\n" +
"urine_place string,\n" +
"urine_result int,\n" +
"test_type int,\n" +
"test_time TIMESTAMP,\n" +
"seal_time TIMESTAMP,\n" +
"urine_image string,\n" +
"urine_video string,\n" +
"urine_cause string,\n" +
"help_professional_id int,\n" +
"latitude string,\n" +
"fk_auth_manager int,\n" +
"table_date int,\n" +
"urine_state_type int,\n" +
"annex string,\n" +
"update_time int,\n" +
"is_del int\n"+
") with ( \n" +
"'connector' = 'kafka',\n" +
"'topic' = 'LG831_test.dbo.urine_upload',\n" +
"'properties.bootstrap.servers' = 'hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092',\n" +
"'properties.group.id' = 'connect-cluster',\n" +
"'format' = 'debezium-json'"+
")";
ssTableEnv.executeSql(sql);
Table result = ssTableEnv.sqlQuery("select * from test_table");
result.execute().print();
报错信息如下:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/table/functions/UserDefinedAggregateFunction
at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:111)
at flink.java.FlinkTable.main(FlinkTable.java:19)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.functions.UserDefinedAggregateFunction
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 2 more
错误堆栈提示找不到类,但是不知道具体缺少哪个类,情各位大神帮忙
pom.xml信息如下:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.11.1</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.12.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.11.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.11.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.11.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.11.1</version>
<scope>provided</scope>
</dependency>
<!-- log4j的依赖 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.10</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.0</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.12.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<!-- table API & SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.11.0</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>1.12.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.11.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.11.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.11.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>1.11.1</version>
</dependency>
</dependencies>