FlinkSql的前端化,平台化思路。
想搞一个平台。后端采用Java。实现类似于FlinkSQLClient的功能,即往后台Flink发送一连串的FlinkSQL命令,后台Flink可以直接执行并提交对应的任务。该平台也可以同步管理Flink已有任务。
FlinkSQL命令包含设置参数的命令,DQL,DML,DDL等等。如下所示。
-- 设置参数的语句
set execution.checkpointing.intervat = 10000;
set execution.checkpointingtimout = 10000;
-- DDL建表语句
CREATE TABLE doris_test_sink (
id INT,
name varchar(255),
sex varchar(255)
)
WITH (
'connector' = 'doris',
'fenodes' = 'srvbd59.net.cn:8030',
'table.identifier' = 'cdc_test.user_info',
'sink.batch.size' = '2',
'sink.batch.interval'='1',
'username' = 'root',
'password' = '123456'
)
-- DML语句
insert into ....
我的思路
1:上面的思路就是提前packet一个BaseJar,放到Linux上。
2:BaseJar中要执行的SQL语句是通过JDBC从数据库中拉取。通过for循环执行。
3:要执行的SQL语句是先通过编辑器编辑,然后存储在数据库中。
4:通过Java代码操作Linux,拼接flink run命令,并且在命令中拼接--SqlKey,通过ParameterTool.fromArgs获取作为键拉取要执行的SQL。
5:按照以上的思路,所有的任务都是基于一个BaseJar包运行的多个任务。
6:以上思路经初步验证是可行的。
该思路中的问题。
tableEnv.executeSql("sql语句")不能执行设置参数的语句,如下:
tableEnv.executeSql("set execution.checkpointing.intervat = 10000;")
但设置属性的参数不能通过tableEnv.executeSql()执行。会报下面的错误。
Exception in thread "main" org.apache.flink.table.api.TableException:
Unsupported SQL query! executeSql() only accepts a single SQL statement of type
CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, CREATE FUNCTION,
DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, DROP CATALOG, USE CATALOG, USE [CATALOG.]DATABASE,
SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW [USER] FUNCTIONS, SHOW PARTITIONSCREATE VIEW,
DROP VIEW, SHOW VIEWS, INSERT, DESCRIBE, LOAD MODULE, UNLOAD MODULE, USE MODULES, SHOW [FULL] MODULES.
关于上面的问题,初步考虑可以通过Java代码解析字符串,然后直接拼接成Configuration。然后再构建环境。
Configuration configuration = new Configuration();
// 设置底层 key-value 选项
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode().withConfiguration(configuration).build();
TableEnvironment tEnv = TableEnvironment.create(settings);
类比其他的开源框架。
可以参考下一个开源框架Dinky,如果可以看源码的话。个人想寻求代码思路。
总结
我就是思考了一下FlinkSQL平台化的初步思路,但我这个思路总觉得不太正规。想知道Flink有没有提供这样一个API。给它要执行的SQL,他可以自动启动一个任务。或者觉得有其他更好的思路可以探讨一下。
最后
有没有平台化开发的相关经验的人员,后续我的工作要涉及到平台开发。希望可以提供指导。当然,如果提供实质帮助。我也可以有偿。