laijunlin_data 2023-07-28 10:59 采纳率: 80%
浏览 42
已结题

flink的starrockssink,怎么从数据流中获取tableName,根据数据流不同的tableName写入不同starrocks表中

【详述】flink的starrockssink,怎么从数据流中获取tableName,根据数据流不同的tableName写入不同starrocks表中
【flink的starrockssink】

StarRocksSink.sink(
StarRocksSinkOptions.builder()
.withProperty(“jdbc-url”, Constant.STARROCKS_JDBC_URL)
.withProperty(“load-url”, Constant.STARROCKS_LOAD_URL)
.withProperty(“username”, username)
.withProperty(“password”, password)
.withProperty(“database-name”, databaseName)
.withProperty(“table-name”, tableName)
.withProperty(“sink.buffer-flush.interval-ms”, “10000”)
.withProperty(“sink.buffer-flush.max-bytes”, 1024102464 + “”)
.withProperty(“sink.max-retries”, “5”)
.withProperty(“sink.properties.format”, “json”)
.withProperty(“sink.properties.strip_outer_array”, “true”)
.build()
);

这里的tableName只能传入常量,不能通过数据流不同的数据获取不同的tableName
【flink的kafkaSink】

Properties props = new Properties();
props.setProperty(“bootstrap.servers”, Constant.KAFKA_BOOTSTRAP_SERVERS);
props.setProperty(“security.protocol”,Constant.KAFKA_SECURITY_PROTOCOL);
props.setProperty(“sasl.mechanism”, Constant.KAFKA_SASL_MECHANISM);
props.setProperty(“sasl.jaas.config”, Constant.KAFKA_SASL_JAAS_CONFIG);
props.setProperty(“transaction.timeout.ms”, Constant.KAFKA_TRANSACTION_TIMEOUT_MS);
return new FlinkKafkaProducer(
“default”,
new KafkaSerializationSchema() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String element,
@Nullable Long timestamp) {
JSONObject obj = JSON.parseObject(element);
JSONObject source = obj.getJSONObject(“source”);
String topic = source.getString(“db”) + “.” + source.getString(“table”);
return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
}
},
props,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);

而kafka的topic却可以从数据流不同的数据获取不同的topic,写入不同的topic
【想要实现的效果】和kafkasink类似,能从数据流不同的数据获取不同的tableName,写入starrocks不同的表中,并且从攒批数量或者时间触发写出sr

  • 写回答

3条回答 默认 最新

  • Mr.Guoguo 2023-07-31 09:51
    关注

    两个思路,一个是分流,根据不同的tablename用不同的sink;另一个是重写starrocksink,不使用默认的,在里面允许指定tablename

    评论

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 8月9日
  • 创建了问题 7月28日