m0_37994675 2022-04-24 14:53 采纳率: 25%
浏览 628
已结题

flink sql job 如何每天定时执行?

问题遇到的现象和发生背景

img

我尝试 把 table 环境写在 java TimerTask 中
本地可以运行
但是提交flink时识别不到 job

问题相关代码,请勿粘贴截图
 public static void main(String[] args)throws Exception {




        TimerTask timerTask = new TimerTask() {
            @Override
            public void run() {

                 env = StreamExecutionEnvironment.getExecutionEnvironment();

                env.setParallelism(1);

                 build = EnvironmentSettings.newInstance()
                        .inStreamingMode()
                        .useBlinkPlanner()
                        .build();
                 tenv = StreamTableEnvironment.create(env, build);


                new KerberosAuth().kerberosAuth(false);
                HiveCatalog hive = null;
                try {
                    hive = getHiveCatalog.getHiveCatalog();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                tenv.registerCatalog("hive_big",hive);
                tenv.useCatalog("hive_big");

                Table table2 = tenv.sqlQuery("select catering_id, CAST( count(*) as BIGINT )  staff_num\n" +
                        "from  ops_safeter" +
                        "                      )" +
                        "  group by catering_id");

                tenv.createTemporaryView("hive_staff",table2);


                tenv.executeSql("CREATE TABLE  if not exists  dim_staff (" +
                        "  catering_id  STRING  ," +
                        "  staff_nu  BIGINT   , " +
                        "  PRIMARY KEY (catering_id) NOT ENFORCED " +
                        ") " +
                        "WITH (" +
                        //"'connector' = 'print'" +
                        "    'connector' = 'upsert-kafka',\n" +
                        "    'topic' = 'realtime.dim.staff_num',\n" +
                        "    'properties.zookeeper.connect' = '"+ KafkaConfig.ZOOKEEPER +"',\n" +
                        "    'properties.bootstrap.servers' = '"+ KafkaConfig.KAFKA_BROKER_LIST +"'  ,\n" +
                        //"    'connector.startup-mode' = 'latest-offset',\n" +
                        " 'key.format' = 'csv',  \n" +
                        " 'value.format' = 'csv')");

                tenv.executeSql("INSERT INTO  dim_staff  SELECT catering_id, staff_num  FROM hive_staff");


            }
        };

        Calendar calendar = Calendar.getInstance();
        int year = calendar.get(Calendar.YEAR);
        int month = calendar.get(Calendar.MONTH);
        int day = calendar.get(Calendar.DAY_OF_MONTH);
        long daySpan = 24 * 60 * 60 * 1000;
        //
        calendar.set(year, month, day, 3, 00, 00);
        Date time = calendar.getTime();
        long time1 = time.getTime();

        System.out.println(time);

        Timer timer = new Timer();
        timer.schedule(timerTask,time,24*60*60*1000);
        //timer.schedule(timerTask,time,30000);



    }


运行结果及报错内容

本地运行 没问题
但是提交

img

我的解答思路和尝试过的方法
我想要达到的结果
  • 写回答

2条回答 默认 最新

  • 吕布辕门 后端领域新星创作者 2022-04-24 15:04
    关注

    这样好像不行,你可以换一种思路。

    写linux脚本,定时提交作业

    Linux Crontab 定时任务

    麻烦采纳一下,你的采纳,是对我最好的鼓励,谢谢!

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

问题事件

  • 系统已结题 5月2日
  • 已采纳回答 4月24日
  • 创建了问题 4月24日

悬赏问题

  • ¥20 腾讯企业邮箱邮件可以恢复么
  • ¥15 有人知道怎么将自己的迁移策略布到edgecloudsim上使用吗?
  • ¥15 错误 LNK2001 无法解析的外部符号
  • ¥50 安装pyaudiokits失败
  • ¥15 计组这些题应该咋做呀
  • ¥60 更换迈创SOL6M4AE卡的时候,驱动要重新装才能使用,怎么解决?
  • ¥15 让node服务器有自动加载文件的功能
  • ¥15 jmeter脚本回放有的是对的有的是错的
  • ¥15 r语言蛋白组学相关问题
  • ¥15 Python时间序列如何拟合疏系数模型