m0_37994675 2022-04-24 14:53 采纳率: 25%
浏览 288
已结题

flink sql job 如何每天定时执行?

问题遇到的现象和发生背景

img

我尝试 把 table 环境写在 java TimerTask 中
本地可以运行
但是提交flink时识别不到 job

问题相关代码,请勿粘贴截图
 public static void main(String[] args)throws Exception {




        TimerTask timerTask = new TimerTask() {
            @Override
            public void run() {

                 env = StreamExecutionEnvironment.getExecutionEnvironment();

                env.setParallelism(1);

                 build = EnvironmentSettings.newInstance()
                        .inStreamingMode()
                        .useBlinkPlanner()
                        .build();
                 tenv = StreamTableEnvironment.create(env, build);


                new KerberosAuth().kerberosAuth(false);
                HiveCatalog hive = null;
                try {
                    hive = getHiveCatalog.getHiveCatalog();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                tenv.registerCatalog("hive_big",hive);
                tenv.useCatalog("hive_big");

                Table table2 = tenv.sqlQuery("select catering_id, CAST( count(*) as BIGINT )  staff_num\n" +
                        "from  ops_safeter" +
                        "                      )" +
                        "  group by catering_id");

                tenv.createTemporaryView("hive_staff",table2);


                tenv.executeSql("CREATE TABLE  if not exists  dim_staff (" +
                        "  catering_id  STRING  ," +
                        "  staff_nu  BIGINT   , " +
                        "  PRIMARY KEY (catering_id) NOT ENFORCED " +
                        ") " +
                        "WITH (" +
                        //"'connector' = 'print'" +
                        "    'connector' = 'upsert-kafka',\n" +
                        "    'topic' = 'realtime.dim.staff_num',\n" +
                        "    'properties.zookeeper.connect' = '"+ KafkaConfig.ZOOKEEPER +"',\n" +
                        "    'properties.bootstrap.servers' = '"+ KafkaConfig.KAFKA_BROKER_LIST +"'  ,\n" +
                        //"    'connector.startup-mode' = 'latest-offset',\n" +
                        " 'key.format' = 'csv',  \n" +
                        " 'value.format' = 'csv')");

                tenv.executeSql("INSERT INTO  dim_staff  SELECT catering_id, staff_num  FROM hive_staff");


            }
        };

        Calendar calendar = Calendar.getInstance();
        int year = calendar.get(Calendar.YEAR);
        int month = calendar.get(Calendar.MONTH);
        int day = calendar.get(Calendar.DAY_OF_MONTH);
        long daySpan = 24 * 60 * 60 * 1000;
        //
        calendar.set(year, month, day, 3, 00, 00);
        Date time = calendar.getTime();
        long time1 = time.getTime();

        System.out.println(time);

        Timer timer = new Timer();
        timer.schedule(timerTask,time,24*60*60*1000);
        //timer.schedule(timerTask,time,30000);



    }


运行结果及报错内容

本地运行 没问题
但是提交

img

我的解答思路和尝试过的方法
我想要达到的结果
  • 写回答

2条回答

      报告相同问题?

      相关推荐 更多相似问题

      问题事件

      • 系统已结题 5月2日
      • 已采纳回答 4月24日
      • 创建了问题 4月24日

      悬赏问题

      • ¥30 关于#网络安全#的问题:非对称加密验证
      • ¥20 关于线性代数里施密特正交化和QR分解的疑问
      • ¥15 matlab超类包含解析错误
      • ¥15 python拖拽文件问题
      • ¥15 执行import paddle代码出现错误如何解决?
      • ¥15 hisat2align exited with value 137
      • ¥15 寻找大学生合作开发软件(Delphi)
      • ¥30 AndroidBench&eMMC内存测试速度&Android
      • ¥15 W10 文件共享失败 怎么解决
      • ¥20 b站私信完整导出的方法