laijunlin_data 2024-08-06 15:45 采纳率: 58.3%
浏览 10

flinkcdc监控sqlserver,数据库的表中该字段是空值,而cdc中该字段的值是'N,不一致

版本

1.flinkcdc

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-sqlserver-cdc</artifactId>
            <version>3.0.0</version>
        </dependency>

  1. sqlserver的版本: 2019

    遇到的问题

    sqlserver查询BOSS_UNDER_PROCESSING的值是空值,
    但是flinkcdc获得的数据变成 "BOSS_UNDER_PROCESSING": "'N",
    该字段在sqlserver的设置是:BOSS_UNDER_PROCESSING nchar(1) DEFAULT N'N' NULL,

    img

{
  "before": null,
  "after": {
    "ORIGREC": 338,
    "APPRDATE": null,
    "APPRDISP": null,
    "APPROVEDBY": null,
    "APPRSTS": null,
    "AUTOAPPRVTEST": null,
    "AUTORELEASE": null,
    "BATCHNO": null,
    "CASENUMBER": null,
    "CHARGENO": null,
    "COCNO": null,
    "COLLDATE": null,
    "COMMENTS": null,
    "CREATEDBY": null,
    "DATEPRODAV": null,
    "DATERECV": null,
    "DEPT": "Changzhou",
    "DESIREDTAT": null,
    "DISPSTS": null,
    "DRAWDATE": null,
    "DRAWNBY": null,
    "DUEDATE": null,
    "EXTERNAL_ID": null,
    "FLDSTS": "Draft",
    "FLOWNAME": null,
    "FOLDERFLAG": null,
    "FOLDERNO": "37B10AAE-F",
    "FSTEPCODE": null,
    "GUID": null,
    "INVENTORYID": null,
    "INVESTIGATIONCODE": null,
    "INVOICENUMBER": null,
    "INVSTIGLAYOUTCODE": null,
    "LABDUEDATE": null,
    "LOGDATE": null,
    "MANAGER": null,
    "MATCODE": null,
    "NOOFRETAINS": null,
    "NOTES": null,
    "ORIGSTS": "N",
    "PREPRUNFOLDER": null,
    "PRODGROUP": null,
    "PROGRAMCODE": null,
    "PROJECTNO": null,
    "RASCLIENTID": null,
    "RASNO": null,
    "RASPROJECTNO": null,
    "RECEIVEDBY": null,
    "RESAMPNO": null,
    "RUNFOLDER": null,
    "SP_CODE": null,
    "SPECNO": null,
    "STABNO": null,
    "STUDYTYPE": null,
    "SUBMITTER": null,
    "TAGCOMMENTS": null,
    "WFSAMPLETYPE": null,
    "WORKFLOWCODE": null,
    "BATCHID": null,
    "SUBMITTINGORG": null,
    "FOLDERNAME": null,
    "METADATA_GUID": null,
    "EMFOLDERNO": null,
    "BILL_RASCLIENTID": null,
    "BILL_RASPROJECTNO": null,
    "DISP_STUDYNO": null,
    "SAMPLE_POINT_O": null,
    "ORIGREC_ARC": null,
    "P_BATCH_O": null,
    "PRELOG_DT": null,
    "SCHED_OCCURRENCE_O": null,
    "F_AREA_NAME": null,
    "F_PLANT": null,
    "F_PROCESS_SAMPLE_TYPE": null,
    "VESSEL_O": null,
    "F_SPECIAL_INSTRUCTIONS": null,
    "NEED_PRELIMINARY": "N",
    "REP_LANG": null,
    "SHIP_TO_CLIENTID": null,
    "SUPPLIER_CLIENTID": null,
    "MFG_CLIENTID": null,
    "BUYER_CLIENTID": null,
    "AGENT_CLIENTID": null,
    "RETURN_ADDRESSNO": null,
    "PO_NUMBER": null,
    "ORDER_TEMPLATE_CODE": null,
    "SERVICE_LEVEL": null,
    "SAMPLE_PRODUCTION_STAGE": null,
    "SALES_REP_ID": null,
    "BOSS_HEADER_ID": null,
    "BOSS_LINE_ID": null,
    "BOSS_ORDER_STATUS": null,
    "BOSS_LINE_STATUS": null,
    "SOURCE": "STARLIMS",
    "ORGID": null,
    "COUNTRY_ORIGIN": null,
    "COUNTRY_DESTINATION": null,
    "SHIPTO_ADDRESSNO": null,
    "BILLTO_ADDRESSNO": null,
    "BOSS_TRANSACTION_ID": null,
    "BOSS_TRANSACTION_NO": null,
    "BOSS_TRANSACTION_DATE": null,
    "BUYER_ADDRESSNO": null,
    "BOSS_BATCH_SOURCE": null,
    "MATTYPE": null,
    "RET_ADDRESS_OPT": null,
    "APPLICANT_ADDRESSNO": null,
    "SUPP_ADDRESSNO": null,
    "MFG_ADDRESSNO": null,
    "AGENT_ADDRESSNO": null,
    "RET_ADDRESS_INSTRC": null,
    "FOLDERPRICE": null,
    "PRICELISTID": null,
    "BILL_PRICELISTID": null,
    "BUYER_RASPROJECTNO": null,
    "BUYER_PRICELISTID": null,
    "AGENT_RASPROJECTNO": null,
    "AGENT_PRICELISTID": null,
    "BOSS_ORDER_NUMBER": null,
    "INDUSTRY_SEGMENT": null,
    "TEST_DESCRIPTION": null,
    "TEST_METHOD": null,
    "METHOD_YEAR": null,
    "REFERENCE_ID": null,
    "SAMPLE_DESCRIPTION": null,
    "ON_HOLD": "N",
    "SGS_BILLING_NOTES": null,
    "SALES_AFFILIATE_ID": null,
    "ASSIGNED_TO": null,
    "SERVICE_LEVEL_ID": null,
    "CLIENTNOTES": null,
    "EXTERNAL_PARENT_ID": null,
    "EXTERNAL_CREATEDBY": null,
    "UNCERTAINTY_ID": null,
    "CREATE_BOSS_ORDER": "Y",
    "TEMPLATE_ID": null,
    "STA_LANGID": null,
    "FIRST_REPORTNO": null,
    "DUEDATE_MODE": "Forward",
    "DUEDATE_OVERRIDE": "N",
    "CONFORMANCE_STATEMENT": null,
    "SGS_WITNESS": null,
    "LAB_NOTES": null,
    "REPORT_NOTES": null,
    "SPLIT_LANGID": null,
    "SPLIT_COMMENTS": null,
    "SPLIT_TYPE": null,
    "SPLIT_LANGNAME": null,
    "QRCODE_REPORT": null,
    "SEND_REPORT_FORMAT": "\u0027PDF",
    "SELF_REFERENCE": "\u0027N",
    "COMMITTEDBY": null,
    "FLD_RPT_STS": null,
    "NEED_FINAL_REPORT": null,
    "BOSS_LAST_REQ_STATUS": null,
**    "BOSS_UNDER_PROCESSING": "\u0027N",**
    "OUTSOURCEFROM": null,
    "LAST_UPDATED": null,
    "RPT_STS_UPTDATE": null,
    "SGS_CLIENT_INFO_UPDATED": null,
    "LAST_ACTIVATION_DATE": null,
    "BOSS_LAST_REQ_DATE": null,
    "BOSS_LAST_REQ_ERROR": null,
    "PRICE_REVIEW_STATUS": "\u0027N/A",
    "SGS_PO_DATE": null,
    "SGS_INVOICE_TMP": null,
    "SGS_PEST_EVAL_STATUS": null,
    "SGS_CREATED_FROM": null,
    "RPT_SAMP_TYPE": "\u0027All",
    "SGSMARTTRFNO": null,
    "CLIENT_IF_INT": null
  },
  "source": {
    "version": "1.9.7.Final",
    "connector": "sqlserver",
    "name": "sqlserver_transaction_log_source",
    "ts_ms": 1722929207790,
    "snapshot": "last",
    "db": "loya-demo",
    "sequence": null,
    "schema": "dbo",
    "table": "FOLDERS",
    "change_lsn": null,
    "commit_lsn": "00000052:000022d0:0002",
    "event_serial_no": null
  },
  "op": "r",
  "ts_ms": 1722929208600,
  "transaction": null
}

#问题相关的代码

Properties mysqlProperties = new Properties();
    mysqlProperties.setProperty("converters", "dateConverters");
    mysqlProperties.setProperty("dateConverters.type", "com.sgs.bigdata.utils.MySqlDateTimeConverter");
    Map config = new HashMap();
    config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
    JsonDebeziumDeserializationSchema jdd = new JsonDebeziumDeserializationSchema(false, config);
 // 数据表
    SourceFunction<String> sourceFunction = SqlServerSource.<String>builder()
      .hostname(hostname)
      .port(port)
      .database(database) // monitor sqlserver database
      .tableList(tableName) // monitor products table
      .username(username)
      .password(password)
      .startupOptions(StartupOptions.initial())
      .deserializer(jdd)
      .debeziumProperties(mysqlProperties)
      .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
    dataStreamSource.print("<<<");

不知道怎么解决?

  • 写回答

1条回答 默认 最新

  • 一直免费一直爽 2024-08-11 18:42
    关注

    以下回复参考:皆我百晓生券券喵儿等免费微信小程序作答:

    要解决这个问题,首先确保您已经正确配置了Flink Connector SQL Server CDC插件,并且数据库连接信息也正确。然后,您可以尝试以下步骤:

    1. 检查数据一致性:首先确认数据库中的BOSS_UNDER_PROCESSING字段确实为空值,因为CDC会根据这个值更新流中的数据。

    2. 查看字段类型:确保BOSS_UNDER_PROCESSING字段在数据库端确实是nchar(1)类型的。如果不是,则需要调整字段类型以匹配CDC的要求。

    3. 修改CDC配置

      • 在您的Flink项目中,找到与flink-connector-sqlserver-cdc依赖项关联的flink-connector-sqlserver-cdc.properties文件。
      • 确保配置中的columnMapping部分包含BOSS_UNDER_PROCESSING字段以及相应的转换器。例如:
        columnMapping = {BOSS_UNDER_PROCESSING=VARCHAR}
        
      • columnMapping中的字段名称替换为实际的字段名,确保它与数据库中的字段名称相同。
    4. 验证数据一致性

      • 使用print()方法打印数据流中的数据,以检查是否按照预期执行。
      • 检查BOSS_UNDER_PROCESSING字段的值是否已成功从数据库读取并写入流中。
    5. 处理缺失数据

      • 如果BOSS_UNDER_PROCESSING字段在数据库中确实存在但未被读取,可能需要手动填充缺失值。这一步取决于您如何处理缺失值策略。

    通过以上步骤,应该可以解决BOSS_UNDER_PROCESSING字段在CDC中的错误值。如果您遇到任何其他问题,或者有更具体的上下文细节,请提供更多信息以便进行进一步的帮助。

    评论 编辑记录

报告相同问题?

问题事件

  • 修改了问题 8月7日
  • 修改了问题 8月6日
  • 创建了问题 8月6日

悬赏问题

  • ¥15 如何在vue.config.js中读取到public文件夹下window.APP_CONFIG.API_BASE_URL的值
  • ¥50 浦育平台scratch图形化编程
  • ¥20 求这个的原理图 只要原理图
  • ¥15 vue2项目中,如何配置环境,可以在打完包之后修改请求的服务器地址
  • ¥20 微信的店铺小程序如何修改背景图
  • ¥15 UE5.1局部变量对蓝图不可见
  • ¥15 一共有五道问题关于整数幂的运算还有房间号码 还有网络密码的解答?(语言-python)
  • ¥20 sentry如何捕获上传Android ndk 崩溃
  • ¥15 在做logistic回归模型限制性立方条图时候,不能出完整图的困难
  • ¥15 G0系列单片机HAL库中景园gc9307液晶驱动芯片无法使用硬件SPI+DMA驱动,如何解决?