微毂 2022-01-24 15:37 采纳率: 0%
浏览 1772

使用flink SQL的内置函数报错No match found for function signature


import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala._
import org.apache.flink
import org.apache.flink.table.api.EnvironmentSettings

import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object KeyWordsCnt {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()


    val tableEnv = StreamTableEnvironment.create(env, settings)

tableEnv.executeSql(
      """
        |CREATE TABLE kafka_source(
        |     pubcode string,
        |     `extJson` string,
        |     ts TIMESTAMP(3) METADATA FROM 'timestamp'
        |    )
        |    WITH(
        |    'connector' = 'kafka',
        |    'topic' = 'wdm_xg;wdm_intl_doc',
        |    'properties.bootstrap.servers' = 'localhost:9092',
        |    'properties.group.id' = 'keywords',
        |    'scan.startup.mode' = 'latest-offset',
        |    'json.fail-on-missing-field' = 'false',
        |    'json.ignore-parse-errors' = 'true',
        |    'format' = 'json'
        |
        |    )
        |""".stripMargin
    )

    val resultTable = tableEnv.sqlQuery("select JSON_VALUE(extJson,'$.user_logic') from kafka_source")

    print(resultTable.explain())

    val resultStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream(resultTable)
    resultStream.print()

报错

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 8 to line 1, column 41: No match found for function signature JSON_VALUE(<CHARACTER>, <CHARACTER>)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
    at keywords.KeyWordsCnt$.main(KeyWordsCnt.scala:85)
    at keywords.KeyWordsCnt.main(KeyWordsCnt.scala)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 8 to line 1, column 41: No match found for function signature JSON_VALUE(<CHARACTER>, <CHARACTER>)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
    at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4860)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1813)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:321)
    at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
    at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:420)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4060)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3346)
    at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:996)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:974)
    at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:951)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:703)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
    ... 6 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature JSON_VALUE(<CHARACTER>, <CHARACTER>)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
    at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
    ... 28 more


  • 写回答

2条回答 默认 最新

  • 周幽王丶 2022-01-28 18:36
    关注

    blink里不是所有阿里文档上的函数都可以用,可以用get_json_object替换

    评论

报告相同问题?

问题事件

  • 创建了问题 1月24日

悬赏问题

  • ¥15 安卓adb backup备份应用数据失败
  • ¥15 eclipse运行项目时遇到的问题
  • ¥15 关于#c##的问题:最近需要用CAT工具Trados进行一些开发
  • ¥15 南大pa1 小游戏没有界面,并且报了如下错误,尝试过换显卡驱动,但是好像不行
  • ¥15 没有证书,nginx怎么反向代理到只能接受https的公网网站
  • ¥50 成都蓉城足球俱乐部小程序抢票
  • ¥15 yolov7训练自己的数据集
  • ¥15 esp8266与51单片机连接问题(标签-单片机|关键词-串口)(相关搜索:51单片机|单片机|测试代码)
  • ¥15 电力市场出清matlab yalmip kkt 双层优化问题
  • ¥30 ros小车路径规划实现不了,如何解决?(操作系统-ubuntu)