public class FlinkSqlTemplateJob{
static {
try {
Class.forName("com.mysql.cj.jdbc.Driver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60 * 1000L);
//每一分钟触发一次checkpoint保存状态
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//checkpoint 语义设置为EXACTLY_ONCE,默认语义
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//同一时间内只能允许有一个checkpoint
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
//数据源
String sourceTableSql = parameterTool.get("sourceTableSql");
//目标表
String targetTableSql = parameterTool.get("targetTableSql");
String insertSql = parameterTool.get("insertSql");
if (StringUtils.isBlank(sourceTableSql)
|| StringUtils.isBlank(targetTableSql) || StringUtils.isBlank(insertSql)) {
return;
}
for (String sql : sourceTableSql.split(";")) {
tableEnv.executeSql(sql);
}
tableEnv.executeSql(targetTableSql);
tableEnv.executeSql(insertSql);
}
}