qq_38947155 2024-12-17 18:44 采纳率: 75%
浏览 6
已结题

flink 同步es异常

flink cdc 同步数据至es时,错误提升如下:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.enriched_orders'.

Table options are:

'connector'='elasticsearch-7'
'hosts'='http://192.168.3.65:9200'
'index'='enriched_orders'
    at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:270)
    at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:459)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:236)
    at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1803)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:881)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:989)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765)
    at com.ywhk.flink.MysqlPostgreTableJoinDemo.main(MysqlPostgreTableJoinDemo.java:38)
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.SerializationFormatFactory' in the classpath.

Available factory identifiers are:

raw
    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:546)

项目依赖es相关的依赖我引入了:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-elasticsearch7</artifactId>
            <version>3.0.1-1.17</version>
        </dependency>

建表sql如下:

CREATE TABLE enriched_orders (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   product_name STRING,
   product_description STRING,
   shipment_id INT,
   origin STRING,
   destination STRING,
   is_arrived BOOLEAN,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
     'connector' = 'elasticsearch-7',
     'hosts' = 'http://192.168.3.65:9200',
     'index' = 'enriched_orders'
 );

请问这是少依赖还是sql有问题?

  • 写回答

2条回答 默认 最新

  • 道友老李 JWE233286一种基于机器视觉的水表指针读数识别及修正的方法 专利发明者 2024-12-17 19:02
    关注
    让【道友老李】来帮你解答,本回答参考通义千问大模型编写,并整理提供,如果还有疑问可以点击头像关注,评论或留言
    根据你提供的错误信息和建表SQL,问题出在Flink无法找到合适的序列化格式工厂来处理你的数据。具体来说,错误信息指出找不到`json`标识符对应的`SerializationFormatFactory`实现。这通常意味着你需要明确指定用于Elasticsearch连接器的序列化格式,或者确保相关的依赖已经正确添加到项目中。

    问题分析

    1. 缺少序列化格式配置:从报错来看,Flink试图寻找一个名为json的序列化工厂但未能找到。对于Elasticsearch sink,需要指定如何将数据转换为JSON格式。
    2. 可能的依赖缺失或版本不匹配:尽管你提到已经添加了flink-sql-connector-elasticsearch7依赖,但仍有可能存在其他相关库的缺失或版本不兼容问题。

    解决方案

    明确指定序列化格式

    在创建Elasticsearch表时,通过设置format选项来指明使用何种序列化方式。对于大多数情况,使用debezium-json是一个合适的选择。

    修改后的建表语句如下:

    CREATE TABLE enriched_orders (
       order_id INT,
       order_date TIMESTAMP(0),
       customer_name STRING,
       price DECIMAL(10, 5),
       product_id INT,
       order_status BOOLEAN,
       product_name STRING,
       product_description STRING,
       shipment_id INT,
       origin STRING,
       destination STRING,
       is_arrived BOOLEAN,
       PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
        'connector' = 'elasticsearch-7',
        'hosts' = 'http://192.168.3.65:9200',
        'index' = 'enriched_orders',
        'format' = 'debezium-json',  -- 添加这一行以指定序列化格式
        'sink.bulk.flush.max-size' = '42mb',  -- 可选配置项
        'sink.bulk.flush.max-actions' = '32',  -- 可选配置项
        'sink.bulk.flush.interval' = '5000'  -- 可选配置项
    );
    

    检查依赖

    确保所有必要的依赖都已包含在项目中,并且版本相互兼容。除了flink-sql-connector-elasticsearch7之外,还应检查是否有其他必需的库如flink-connector-base等。

    示例Maven依赖配置(基于Flink 1.17):

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-elasticsearch7_2.12</artifactId>
            <version>3.0.1-1.17</version>
        </dependency>
        <!-- 如果需要的话,加入更多依赖 -->
    </dependencies>
    

    小结

    通过明确指定序列化格式并确保所有必要依赖均已添加,可以解决上述异常。如果仍然遇到问题,请检查日志输出中的详细错误信息以及确认使用的Flink与Elasticsearch版本之间的兼容性。

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

问题事件

  • 系统已结题 12月25日
  • 已采纳回答 12月17日
  • 创建了问题 12月17日