你好按钮啊 2023-02-01 10:55 采纳率: 40%
浏览 101
已结题

FlinkSQL平台化,前端化管理

FlinkSql的前端化,平台化思路。

想搞一个平台。后端采用Java。实现类似于FlinkSQLClient的功能,即往后台Flink发送一连串的FlinkSQL命令,后台Flink可以直接执行并提交对应的任务。该平台也可以同步管理Flink已有任务。

FlinkSQL命令包含设置参数的命令,DQL,DML,DDL等等。如下所示。

-- 设置参数的语句
set execution.checkpointing.intervat = 10000;
set execution.checkpointingtimout = 10000;

--  DDL建表语句
CREATE TABLE doris_test_sink (
   id INT,
   name varchar(255),
   sex varchar(255)
) 
WITH (
  'connector' = 'doris',
  'fenodes' = 'srvbd59.net.cn:8030',
  'table.identifier' = 'cdc_test.user_info',
  'sink.batch.size' = '2',
  'sink.batch.interval'='1',
  'username' = 'root',
  'password' = '123456'
)

-- DML语句
insert into  ....

我的思路

img

1:上面的思路就是提前packet一个BaseJar,放到Linux上。
2:BaseJar中要执行的SQL语句是通过JDBC从数据库中拉取。通过for循环执行。
3:要执行的SQL语句是先通过编辑器编辑,然后存储在数据库中。
4:通过Java代码操作Linux,拼接flink run命令,并且在命令中拼接--SqlKey,通过ParameterTool.fromArgs获取作为键拉取要执行的SQL。
5:按照以上的思路,所有的任务都是基于一个BaseJar包运行的多个任务。
6:以上思路经初步验证是可行的。

该思路中的问题。

tableEnv.executeSql("sql语句")不能执行设置参数的语句,如下:

tableEnv.executeSql("set execution.checkpointing.intervat = 10000;")

但设置属性的参数不能通过tableEnv.executeSql()执行。会报下面的错误。

 Exception in thread "main" org.apache.flink.table.api.TableException: 
Unsupported SQL query! executeSql() only accepts a single SQL statement of type
 CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, CREATE FUNCTION, 
DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, DROP CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, 
SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW [USER] FUNCTIONS, SHOW PARTITIONSCREATE VIEW, 
DROP VIEW, SHOW VIEWS, INSERT, DESCRIBE, LOAD MODULE, UNLOAD MODULE, USE MODULES, SHOW [FULL] MODULES.

关于上面的问题,初步考虑可以通过Java代码解析字符串,然后直接拼接成Configuration。然后再构建环境。

Configuration configuration = new Configuration();
// 设置底层 key-value 选项
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inStreamingMode().withConfiguration(configuration).build();
TableEnvironment tEnv = TableEnvironment.create(settings);

类比其他的开源框架。

img

可以参考下一个开源框架Dinky,如果可以看源码的话。个人想寻求代码思路。

总结

我就是思考了一下FlinkSQL平台化的初步思路,但我这个思路总觉得不太正规。想知道Flink有没有提供这样一个API。给它要执行的SQL,他可以自动启动一个任务。或者觉得有其他更好的思路可以探讨一下。

最后

有没有平台化开发的相关经验的人员,后续我的工作要涉及到平台开发。希望可以提供指导。当然,如果提供实质帮助。我也可以有偿。

  • 写回答

4条回答 默认 最新

  • 白夜鸦羽 2023-02-02 19:13
    关注

    题主的思路完全正确可行,因为我就是这么做的。也是一开始觉得不是很正规,但确实能解决实际的业务问题,要相信自己。


    梳理一下
    BaseJar:丢到linux服务器上,用来解析Flink SQL语句
    MySQL:存储要执行的Flink SQL语句
    Platform:调用BaseJar,传入参数task_id,告诉BaseJar要读取MySQL里的哪一条Flink SQL进行解析,并生成一个Flink SQL任务。


    这里有3个细节
    1.BaseJar的入口类的Main函数,支持传入参数,这个参数就可以定为task_id
    2.建议重新设计MySQL表,既然一行MySQL数据存储一个Flink SQL任务,那么至少要有三个字段
    data_source:Flink SQL源表,指定从哪里接入数据,一般是Kafka
    data_sink:Flink SQL落地表,指定任务结果写到哪里,一般是Kafka
    task_sql:Flink SQL逻辑代码
    这样一行MySQL数据就能生成一个完整的Flink SQL任务,且可以根据task_id来获取到这个任务配置解析
    3.拼接flink run命令调用BaseJar,但属于本地命令执行,要依赖本地环境,更好的办法是远程提交Flink任务。但远程提交Flink任务现在没有现成的代码,需要自己去撸Flink源码。


    回到问题本身:tableEnv.executeSql("sql语句")不能执行设置参数的语句。
    当然不能,因为Flink流任务在一开始必须是确定的,但我们可以通过向Main方法传参task_id,来实现一个BaseJar读取不同的Flink SQL语句,生成不同的Flink SQL任务。本身已经想到了用MySQL存Flink SQL,就不要再用Configuration了。


    补充
    现在已经有很多这样的开源平台了,如果觉得自己造轮子麻烦,可以直接用开源的。
    StreamPark:刚刚被Apache收纳为大数据平台孵化项目,https://github.com/apache/incubator-streampark
    Ververica Platform:Flink官方提供的Flink SQL执行平台,还行吧勉强用英文不友好,https://www.ververica.com/getting-started

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论 编辑记录
查看更多回答(3条)

报告相同问题?

问题事件

  • 系统已结题 2月15日
  • 已采纳回答 2月7日
  • 创建了问题 2月1日

悬赏问题

  • ¥15 R语言爬虫的时候元素和园代码不一样怎么解决呀
  • ¥15 VS2022多项目启动有问题
  • ¥15 SQL删除添加数据后序号不连续问题。
  • ¥15 首次运行OmniEvent运行报错
  • ¥15 有没有人知道这个问题怎么解决
  • ¥15 comsol电力电缆载流量仿真
  • ¥15 webSocket可以接TCP socket接口吗
  • ¥60 mpi并行出错,CFD++计算
  • ¥15 c#:vsto,powerpoint的外接程序中换主题颜色
  • ¥15 状态机/汽车转向灯/Sateflow