qq_45811827 2022-11-16 17:18 采纳率: 100%
浏览 27
已结题

flink写入StarRocks后,

问题遇到的现象和发生背景

你好, 我在使用flink connector starrocks实现flink写入StarRocks的时候遇到了一个问题, 代码的执行过程没有报错

用代码块功能插入代码,请勿粘贴截图
import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

public class Flink_to_SR {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        class RowData {
            public int score;
            public String name;
            public RowData(int score, String name) {
            }
        }
        env.fromElements(
                new RowData[]{
                        new RowData(99, "张三"),
                        new RowData(100, "李四")
                }
        ).addSink(
                StarRocksSink.sink(
                        // the table structure
                        TableSchema.builder()
                                .field("score", DataTypes.INT())
                                .field("name", DataTypes.VARCHAR(20))
                                .build(),
                        // the sink options
                        StarRocksSinkOptions.builder()
                                .withProperty("jdbc-url", "jdbc:mysql://192.168.190.102:9030?characterEncoding=utf-8&useSSL=false")
                                .withProperty("load-url", "192.168.190.102:8030")
                                .withProperty("username", "root")
                                .withProperty("password", "")
                                .withProperty("table-name", "test3")
                                .withProperty("database-name", "flink_to_SR")
                                // 自 2.4 版本,支持更新主键模型中的部分列。您可以通过以下两个属性指定需要更新的列。
                                // .withProperty("sink.properties.partial_update", "true")
                                // .withProperty("sink.properties.columns", "k1,k2,k3")
//                                 .withProperty("sink.properties.column_separator", "\\x01")
//                                 .withProperty("sink.properties.row_delimiter", "\\x02")
                                .build(),
                        // set the slots with streamRowData
                        (slots, streamRowData) -> {
                            slots[0] = streamRowData.score;
                            slots[1] = streamRowData.name;
                        }
                )
        );
        env.execute();
    }
}
运行结果及报错内容

查询插入结果时, SR中的数值异常, 请问这是什么原因导致的, 如何解决呢

img

  • 写回答

1条回答 默认 最新

  • 三千烦恼丝xzh 2022-11-16 17:32
    关注

    你没发现你RowData的构造函数里面没赋值吗😓

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

问题事件

  • 系统已结题 11月25日
  • 已采纳回答 11月17日
  • 创建了问题 11月16日

悬赏问题

  • ¥15 装 pytorch 的时候出了好多问题,遇到这种情况怎么处理?
  • ¥20 IOS游览器某宝手机网页版自动立即购买JavaScript脚本
  • ¥15 手机接入宽带网线,如何释放宽带全部速度
  • ¥30 关于#r语言#的问题:如何对R语言中mfgarch包中构建的garch-midas模型进行样本内长期波动率预测和样本外长期波动率预测
  • ¥15 ETLCloud 处理json多层级问题
  • ¥15 matlab中使用gurobi时报错
  • ¥15 这个主板怎么能扩出一两个sata口
  • ¥15 不是,这到底错哪儿了😭
  • ¥15 2020长安杯与连接网探
  • ¥15 关于#matlab#的问题:在模糊控制器中选出线路信息,在simulink中根据线路信息生成速度时间目标曲线(初速度为20m/s,15秒后减为0的速度时间图像)我想问线路信息是什么