至此y 2022-05-08 20:54 采纳率: 100%
浏览 21
已结题

想问一下这段代码有什么作用

public class FlinkSqlTemplateJob{

static {
    try {
        Class.forName("com.mysql.cj.jdbc.Driver");
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    }
}

public static void main(String[] args) throws Exception {
    ParameterTool parameterTool = ParameterTool.fromArgs(args);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.enableCheckpointing(60 * 1000L);
    //每一分钟触发一次checkpoint保存状态
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    //checkpoint 语义设置为EXACTLY_ONCE,默认语义
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    //同一时间内只能允许有一个checkpoint

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

    //数据源
    String sourceTableSql = parameterTool.get("sourceTableSql");
    //目标表
    String targetTableSql = parameterTool.get("targetTableSql");
    String insertSql = parameterTool.get("insertSql");

    if (StringUtils.isBlank(sourceTableSql)
            || StringUtils.isBlank(targetTableSql) || StringUtils.isBlank(insertSql)) {
        return;
    }

    for (String sql : sourceTableSql.split(";")) {
        tableEnv.executeSql(sql);
    }


    tableEnv.executeSql(targetTableSql);

    tableEnv.executeSql(insertSql);
}

}

  • 写回答

2条回答

      报告相同问题?

      相关推荐 更多相似问题

      问题事件

      • 系统已结题 5月16日
      • 已采纳回答 5月8日
      • 创建了问题 5月8日

      悬赏问题

      • ¥30 nginx代理第三方接口
      • ¥20 求求了有没有人帮我看看怎么判断稳定性
      • ¥50 求解R语言的数据可视化问题
      • ¥15 两块fpga数据传输时钟同步问题
      • ¥15 pycharm 导入paddle模块后找不到enable_static属性
      • ¥15 wear os 哪种模式是只显示时间的,接口在哪。
      • ¥15 ckeditor 使用问题
      • ¥15 链表使用中遇到的问题,有愿意帮忙的嘛
      • ¥50 关于flowable工作流引擎的应用。
      • ¥20 python recv函数完整接收数据 问题