swx7410852963 2022-08-11 17:52 采纳率: 100%
浏览 34
已结题

Flink使用时间语义+子查询动态传参报错

问题遇到的现象和发生背景

这几天使用Flink SQL客户端遇到一个窒息的问题,没查到解决方案,有没有懂Flink的朋友帮忙看一下
只要加 FOR SYSTEM_TIME AS OF 就给我报错,去掉就可以正常输出内容
下面建表只是模拟一下场景,如果就用子查询的方式来做的话,怎么才能不报错

Flink版本是1.15.0

问题相关代码,请勿粘贴截图

建表语句

CREATE TABLE table_a_cdc(
    sf_id String,
    name String,
    proc_time AS PROCTIME()
) WITH (
     'connector' = 'oracle-cdc',
    'hostname' = '127.0.0.1',
    'port' = '4122',
    'username' = 'username',
    'password' = 'password',
    'database-name' = 'xxx',
    'schema-name' = 'xxx',
    'table-name' = 'table_a',
    'scan.startup.mode' = 'initial',
    'debezium.log.mining.continuous.mine'='true',
    'debezium.log.mining.strategy'='online_catalog',
    'debezium.database.tablename.case.insensitive'='false'
);

CREATE TABLE table_b(
    sf_id String,
    age String
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:oracle:thin:@xxxx:1521:xxx',
    'driver' = 'oracle.jdbc.driver.OracleDriver',
    'table-name' = 'xxx.table_b',
    'username' = 'xxx',
    'password' = 'xxx'
);

CREATE TABLE table_c(
    sf_id String,
    sex String
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:oracle:thin:@xxxx:1521:xxx',
    'driver' = 'oracle.jdbc.driver.OracleDriver',
    'table-name' = 'xxx.table_c',
    'username' = 'xxx',
    'password' = 'xxx'
);

查询语句

SELECT
    a.name,
    b.age,
    (SELECT sex FROM table_c WHERE table_c.sf_id=b.sf_id) as sex
FROM table_a_cdc a
JOIN table_b FOR SYSTEM_TIME AS OF a.proc_time as b
ON b.sf_id=a.sf_id
运行结果及报错内容

img

报错输出:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: unexpected correlate variable $cor54 in the plan

$cor54 这个没错报错会+2 下次报错就是$cor56

  • 写回答

1条回答 默认 最新

  • 三千烦恼丝xzh 2022-08-12 09:51
    关注

    两个JDBC的维表加上主键定义PRIMARY KEY (sf_id) NOT ENFORCED

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论 编辑记录

报告相同问题?

问题事件

  • 系统已结题 8月20日
  • 已采纳回答 8月12日
  • 创建了问题 8月11日

悬赏问题

  • ¥60 许可证msc licensing软件报错显示已有相同版本软件,但是下一步显示无法读取日志目录。
  • ¥15 Attention is all you need 的代码运行
  • ¥15 一个服务器已经有一个系统了如果用usb再装一个系统,原来的系统会被覆盖掉吗
  • ¥15 使用esm_msa1_t12_100M_UR50S蛋白质语言模型进行零样本预测时,终端显示出了sequence handled的进度条,但是并不出结果就自动终止回到命令提示行了是怎么回事:
  • ¥15 前置放大电路与功率放大电路相连放大倍数出现问题
  • ¥30 关于<main>标签页面跳转的问题
  • ¥80 部署运行web自动化项目
  • ¥15 腾讯云如何建立同一个项目中物模型之间的联系
  • ¥30 VMware 云桌面水印如何添加
  • ¥15 用ns3仿真出5G核心网网元