laijunlin_data 2023-07-28 10:59 采纳率: 58.3%
浏览 27
已结题

flink的starrockssink,怎么从数据流中获取tableName,根据数据流不同的tableName写入不同starrocks表中

【详述】flink的starrockssink,怎么从数据流中获取tableName,根据数据流不同的tableName写入不同starrocks表中
【flink的starrockssink】

StarRocksSink.sink(
StarRocksSinkOptions.builder()
.withProperty(“jdbc-url”, Constant.STARROCKS_JDBC_URL)
.withProperty(“load-url”, Constant.STARROCKS_LOAD_URL)
.withProperty(“username”, username)
.withProperty(“password”, password)
.withProperty(“database-name”, databaseName)
.withProperty(“table-name”, tableName)
.withProperty(“sink.buffer-flush.interval-ms”, “10000”)
.withProperty(“sink.buffer-flush.max-bytes”, 1024102464 + “”)
.withProperty(“sink.max-retries”, “5”)
.withProperty(“sink.properties.format”, “json”)
.withProperty(“sink.properties.strip_outer_array”, “true”)
.build()
);

这里的tableName只能传入常量,不能通过数据流不同的数据获取不同的tableName
【flink的kafkaSink】

Properties props = new Properties();
props.setProperty(“bootstrap.servers”, Constant.KAFKA_BOOTSTRAP_SERVERS);
props.setProperty(“security.protocol”,Constant.KAFKA_SECURITY_PROTOCOL);
props.setProperty(“sasl.mechanism”, Constant.KAFKA_SASL_MECHANISM);
props.setProperty(“sasl.jaas.config”, Constant.KAFKA_SASL_JAAS_CONFIG);
props.setProperty(“transaction.timeout.ms”, Constant.KAFKA_TRANSACTION_TIMEOUT_MS);
return new FlinkKafkaProducer(
“default”,
new KafkaSerializationSchema() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String element,
@Nullable Long timestamp) {
JSONObject obj = JSON.parseObject(element);
JSONObject source = obj.getJSONObject(“source”);
String topic = source.getString(“db”) + “.” + source.getString(“table”);
return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
}
},
props,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);

而kafka的topic却可以从数据流不同的数据获取不同的topic,写入不同的topic
【想要实现的效果】和kafkasink类似,能从数据流不同的数据获取不同的tableName,写入starrocks不同的表中,并且从攒批数量或者时间触发写出sr

  • 写回答

3条回答 默认 最新

  • Mr.Guoguo 2023-07-31 09:51
    关注

    两个思路,一个是分流,根据不同的tablename用不同的sink;另一个是重写starrocksink,不使用默认的,在里面允许指定tablename

    评论

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 8月9日
  • 创建了问题 7月28日

悬赏问题

  • ¥15 下载ctorch报错,求解
  • ¥15 如何入门学习c语言,单片机
  • ¥15 idea 编辑语言的选择
  • ¥15 Windows下部署Asmjit
  • ¥15 请问双层规划模型的上下层目标函数不一致,是如何保证迭代收敛性的
  • ¥15 微信小程序 前端页面内容搜索
  • ¥15 cpu是如何判断当前指令已经执行完毕,然后去执行下条指令的
  • ¥15 安装visual studio2022时visualstudiosetup启动不了,闪退。问题代号0x0和0x1389
  • ¥30 java spring boot2.5.3版本websocket连不上
  • ¥15 angular js调外部链接查看pdf