问题遇到的现象和发生背景
这几天使用Flink SQL客户端遇到一个窒息的问题,没查到解决方案,有没有懂Flink的朋友帮忙看一下
只要加 FOR SYSTEM_TIME AS OF 就给我报错,去掉就可以正常输出内容
下面建表只是模拟一下场景,如果就用子查询的方式来做的话,怎么才能不报错
Flink版本是1.15.0
问题相关代码,请勿粘贴截图
建表语句
CREATE TABLE table_a_cdc(
sf_id String,
name String,
proc_time AS PROCTIME()
) WITH (
'connector' = 'oracle-cdc',
'hostname' = '127.0.0.1',
'port' = '4122',
'username' = 'username',
'password' = 'password',
'database-name' = 'xxx',
'schema-name' = 'xxx',
'table-name' = 'table_a',
'scan.startup.mode' = 'initial',
'debezium.log.mining.continuous.mine'='true',
'debezium.log.mining.strategy'='online_catalog',
'debezium.database.tablename.case.insensitive'='false'
);
CREATE TABLE table_b(
sf_id String,
age String
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:oracle:thin:@xxxx:1521:xxx',
'driver' = 'oracle.jdbc.driver.OracleDriver',
'table-name' = 'xxx.table_b',
'username' = 'xxx',
'password' = 'xxx'
);
CREATE TABLE table_c(
sf_id String,
sex String
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:oracle:thin:@xxxx:1521:xxx',
'driver' = 'oracle.jdbc.driver.OracleDriver',
'table-name' = 'xxx.table_c',
'username' = 'xxx',
'password' = 'xxx'
);
查询语句
SELECT
a.name,
b.age,
(SELECT sex FROM table_c WHERE table_c.sf_id=b.sf_id) as sex
FROM table_a_cdc a
JOIN table_b FOR SYSTEM_TIME AS OF a.proc_time as b
ON b.sf_id=a.sf_id
运行结果及报错内容
报错输出:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: unexpected correlate variable $cor54 in the plan
$cor54 这个没错报错会+2 下次报错就是$cor56