问题遇到的现象和发生背景
我尝试把 Flink Table 环境（StreamTableEnvironment）的创建和 SQL 执行写在 Java 的 TimerTask 中。
本地可以正常运行，
但是提交到 Flink 时识别不到 job。
问题相关代码,请勿粘贴截图
/**
 * Entry point: schedules a daily run of the job that aggregates staff counts
 * from Hive and writes them to the {@code dim_staff} upsert-kafka table.
 *
 * <p>The first run is set to 03:00:00 of the current day and repeats every
 * 24 hours. If 03:00 has already passed when the program starts,
 * {@link Timer} fires the first run right away.
 *
 * <p>NOTE(review): the Flink statements are issued from the Timer thread, after
 * {@code main()} has already returned. A client that submits this class with
 * {@code flink run} presumably only picks up jobs that are triggered while
 * {@code main()} itself is running, which would explain a "no job found"
 * result on submission even though a local run works. If that is confirmed,
 * call {@code runStaffDimJob()} directly from {@code main()} and leave the
 * daily trigger to an external scheduler.
 *
 * @param args command-line arguments (unused)
 * @throws Exception declared for compatibility with existing callers
 */
public static void main(String[] args) throws Exception {
    TimerTask timerTask = new TimerTask() {
        @Override
        public void run() {
            runStaffDimJob();
        }
    };

    // Anchor the first execution at 03:00:00 local time of the current day.
    Calendar calendar = Calendar.getInstance();
    int year = calendar.get(Calendar.YEAR);
    int month = calendar.get(Calendar.MONTH);
    int day = calendar.get(Calendar.DAY_OF_MONTH);
    calendar.set(year, month, day, 3, 0, 0);
    Date firstRun = calendar.getTime();
    System.out.println(firstRun);

    // Repeat once per day.
    long daySpan = 24L * 60 * 60 * 1000;
    Timer timer = new Timer();
    timer.schedule(timerTask, firstRun, daySpan);
}

/**
 * Builds the table environment, registers the Hive catalog and runs the
 * INSERT that copies per-{@code catering_id} row counts into {@code dim_staff}.
 *
 * @throws IllegalStateException if the Hive catalog cannot be obtained
 */
private static void runStaffDimJob() {
    env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    build = EnvironmentSettings.newInstance()
            .inStreamingMode()
            .useBlinkPlanner()
            .build();
    tenv = StreamTableEnvironment.create(env, build);

    new KerberosAuth().kerberosAuth(false);

    tenv.registerCatalog("hive_big", loadHiveCatalog());
    tenv.useCatalog("hive_big");

    // Row count per catering_id, exposed as the temporary view hive_staff.
    Table staffCounts = tenv.sqlQuery(
            "select catering_id, CAST( count(*) as BIGINT ) staff_num\n" +
            "from ops_safeter" +
            " group by catering_id");
    tenv.createTemporaryView("hive_staff", staffCounts);

    // Sink table; swap the connector for 'connector' = 'print' to inspect rows.
    tenv.executeSql("CREATE TABLE if not exists dim_staff (" +
            "  catering_id STRING ," +
            "  staff_nu BIGINT , " +
            "  PRIMARY KEY (catering_id) NOT ENFORCED " +
            ") " +
            "WITH (" +
            " 'connector' = 'upsert-kafka',\n" +
            " 'topic' = 'realtime.dim.staff_num',\n" +
            " 'properties.zookeeper.connect' = '" + KafkaConfig.ZOOKEEPER + "',\n" +
            " 'properties.bootstrap.servers' = '" + KafkaConfig.KAFKA_BROKER_LIST + "' ,\n" +
            " 'key.format' = 'csv', \n" +
            " 'value.format' = 'csv')");

    tenv.executeSql("INSERT INTO dim_staff SELECT catering_id, staff_num FROM hive_staff");
}

/**
 * Obtains the Hive catalog, failing fast instead of handing {@code null}
 * to {@code registerCatalog}.
 *
 * @return the catalog returned by {@code getHiveCatalog.getHiveCatalog()}
 * @throws IllegalStateException if the lookup throws or returns {@code null}
 */
private static HiveCatalog loadHiveCatalog() {
    HiveCatalog hive;
    try {
        hive = getHiveCatalog.getHiveCatalog();
    } catch (Exception e) {
        throw new IllegalStateException("Failed to obtain Hive catalog 'hive_big'", e);
    }
    if (hive == null) {
        throw new IllegalStateException("Hive catalog 'hive_big' is null");
    }
    return hive;
}
运行结果及报错内容
本地运行 没问题
但是提交