weixin_43716055 2022-11-15 10:36 采纳率: 50%
浏览 5
已结题

如何在clickhouse中获取kafka的CreateTime,并新建一列当前时间戳

问题遇到的现象和发生背景
我用FlinkKafkaclickhouse组建了一个系统。我想测试数据流在这三个组件之间的时延。我在消息输出flink的时候打了个时间戳,输入kafka会有自带的CreateTime、clickhouse有now()方法。这样我就可以通过这个时间戳来计算消息从上一个组件到下一个组件的延时了。

代码

CREATE TABLE price_queue(
  CreateTime Int64,
  nowTime Int64,
  timestamp Int64,
  ticker String,
  price Float32
) ENGINE = kafka(localhost, input-topic, 'default', 'csv')

CREATE TABLE price(
   CreateTime Int64,
    nowTime Int64,
    timestamp DateTime(3, 'Asia/Shanghai'),
    ticker String,
    price Float32
) ENGINE = MergeTree()
ORDER BY timestamp;

CREATE MATERIALIZED VIEW price_consumer TO price(
    CreateTime Int64,
    nowTime Int64,
    timestamp DateTime(3, 'Asia/Shanghai'),
    ticker String,
    price Float32
) AS
SELECT
 CreateTime,
 toInt64(now()) AS nowTime,
 fromUnixTimeStamp64Milli(timestamp, 'Asia/Shanghai') AS timestamp,
 ticker,
 price
FROM price_queue;

运行结果及报错内容
表内没有数据,select count(*) from price为0

我的解答思路和尝试过的方法
我建表的时候尝试直接增加CreateTime和nowTime,但是似乎并不奏效。

我想要达到的结果
请问怎么样将Kafka的CreateTime在clickhouse中获取到?然后clickhou能增加一列当前时间戳呢?

  • 写回答

0条回答 默认 最新

    报告相同问题?

    问题事件

    • 系统已结题 11月23日
    • 创建了问题 11月15日

    悬赏问题

    • ¥100 如何寻找到黑客帮助,愿意付丰厚的酬劳
    • ¥15 java代码写在记事本上后在cmd上运行时无报错但又没生成文件
    • ¥15 关于#python#的问题:在跑ldsc数据整理的时候一直抱这种错误,要么--out识别不了参数,要么--merge-alleles识别不了参数(操作系统-linux)
    • ¥15 PPOCRLabel
    • ¥15 混合键合键合机对准标识
    • ¥100 现在不懂的是如何将当前的相机中的照片,作为纹理贴图,映射到扫描出的模型上
    • ¥15 魔霸ROG7 pro,win11.息屏后会显示黑屏,如图,如何解决?(关键词-重新启动)
    • ¥15 有没有人知道这是哪里出了问题啊?要怎么改呀?
    • ¥200 C++表格文件处理-悬赏
    • ¥15 Windows Server2016本地登录失败