问题遇到的现象和发生背景
你好, 我在使用flink connector starrocks实现flink写入StarRocks的时候遇到了一个问题, 代码的执行过程没有报错
用代码块功能插入代码,请勿粘贴截图
import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
public class Flink_to_SR {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
class RowData {
public int score;
public String name;
public RowData(int score, String name) {
}
}
env.fromElements(
new RowData[]{
new RowData(99, "张三"),
new RowData(100, "李四")
}
).addSink(
StarRocksSink.sink(
// the table structure
TableSchema.builder()
.field("score", DataTypes.INT())
.field("name", DataTypes.VARCHAR(20))
.build(),
// the sink options
StarRocksSinkOptions.builder()
.withProperty("jdbc-url", "jdbc:mysql://192.168.190.102:9030?characterEncoding=utf-8&useSSL=false")
.withProperty("load-url", "192.168.190.102:8030")
.withProperty("username", "root")
.withProperty("password", "")
.withProperty("table-name", "test3")
.withProperty("database-name", "flink_to_SR")
// 自 2.4 版本,支持更新主键模型中的部分列。您可以通过以下两个属性指定需要更新的列。
// .withProperty("sink.properties.partial_update", "true")
// .withProperty("sink.properties.columns", "k1,k2,k3")
// .withProperty("sink.properties.column_separator", "\\x01")
// .withProperty("sink.properties.row_delimiter", "\\x02")
.build(),
// set the slots with streamRowData
(slots, streamRowData) -> {
slots[0] = streamRowData.score;
slots[1] = streamRowData.name;
}
)
);
env.execute();
}
}
运行结果及报错内容
查询插入结果时, SR中的数值异常, 请问这是什么原因导致的, 如何解决呢