使用 flink1.17.0创建JdbcCatalog后执行mysql数据库sql查询, 遇到执行information_schema.TABLES查询异常,通过对源代码的跟踪与阅读,怀疑是官方bug;
具体如如下:
String sql = "select ....";
JdbcCatalog catalog = new JdbcCatalog(ClassLoader.getSystemClassLoader(), "myCateLog", "flink?useUnicode=true&serverTimezone=Asia/Shanghai&useTimezone=true", "root", "123456", "jdbc:mysql://127.0.0.1:3306");
tEnv.registerCatalog("myCateLog", catalog);
tEnv.useCatalog("myCateLog");
tEnv.useDatabase("flink");
tEnv.sqlQuery(sql);
运行后抛异常
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. The following SQL query could not be executed (jdbc:mysql://127.0.0.1:3306/): SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA=? and TABLE_NAME=?
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:187)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:281)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:738)
at com.flink.examples.mysql.DataStreamSource.main(DataStreamSource.java:40)
通过异常源码处逻辑分析,是会通过SELECT TABLE_NAME FROM information_schema.TABLES
WHERE TABLE_SCHEMA=? and TABLE_NAME=?语句,去mysql数据库中查询information_schema库TABLES表里有没有当前TABLE_SCHEMA和TABLE_NAME,但可以肯定的是对应的databaseName和tableName是正确;
实际查询语句没有问题,问题出现在baseUrl链接上,此链接表示dataSource数据库源url的前缀字符串,该url缺少databaseName库名,导致无法执行查询information_schema.TABLES
表SQL脚本。断点截图如下:
此段源代码来源于flink-connector-jdbc.jar包中的MySqlCatalog类tableExists()方法,3.1.0-1.17和3.1.1-1.17均测试过,都存在该问题;
通过代码阅读,以及查找网络资源,暂时未发现该问题的解决方案,如有其它人有遇到或解决,敬请指点!