问题遇到的现象和发生背景
我用Flink
、Kafka
、clickhouse
组建了一个系统。我想测试数据流在这三个组件之间的时延。我在消息输出flink的时候打了个时间戳,输入kafka会有自带的CreateTime、clickhouse有now()方法。这样我就可以通过这个时间戳来计算消息从上一个组件到下一个组件的延时了。
代码
CREATE TABLE price_queue(
CreateTime Int64,
nowTime Int64,
timestamp Int64,
ticker String,
price Float32
) ENGINE = kafka(localhost, input-topic, 'default', 'csv')
CREATE TABLE price(
CreateTime Int64,
nowTime Int64,
timestamp DateTime(3, 'Asia/Shanghai'),
ticker String,
price Float32
) ENGINE = MergeTree()
ORDER BY timestamp;
CREATE MATERIALIZED VIEW price_consumer TO price(
CreateTime Int64,
nowTime Int64,
timestamp DateTime(3, 'Asia/Shanghai'),
ticker String,
price Float32
) AS
SELECT
CreateTime,
toInt64(now()) AS nowTime,
fromUnixTimeStamp64Milli(timestamp, 'Asia/Shanghai') AS timestamp,
ticker,
price
FROM price_queue;
运行结果及报错内容
表内没有数据,select count(*) from price为0
我的解答思路和尝试过的方法
我建表的时候尝试直接增加CreateTime和nowTime,但是似乎并不奏效。
我想要达到的结果
请问怎么样将Kafka的CreateTime
在clickhouse中获取到?然后clickhou能增加一列当前时间戳呢?