第一片心意
2022-01-14 18:38
采纳率: 80%
浏览 8
已结题

关于#flinksql#的问题,如何解决?

  1. 背景
    自定义flink sql udaf,实现类似于hive sql 中 collect_list、collect_set的功能。
  2. 代码
    函数创建代码
     buildInFunctionMap.put("collect_list", "com.baishancloud.log.function.udaf.CollectList");
     buildInFunctionMap.put("collect_set", "com.baishancloud.log.function.udaf.CollectSet");


    for (Map.Entry<String, String> entry : buildInFunctionMap.entrySet()) {
        try {
            Class<? extends UserDefinedFunction> subclass = Class.forName(entry.getValue()).asSubclass(UserDefinedFunction.class);
            tEnv.createTemporarySystemFunction(entry.getKey(), subclass);
        } catch (Exception e) {
            throw new RuntimeException("内建自定义函数:" + entry.getValue() + "创建失败,具体错误信息如下:\n" + ExceptionUtils.stringifyException(e));
        }
    }

2.1 collect_list实现
该代码是正常运行的,可以调用。

package com.baishancloud.log.function.udaf;

import org.apache.flink.table.functions.AggregateFunction;

import java.util.ArrayList;
import java.util.List;

/**
 * 将字段所有值拼接到一起,保留重复值。<br>
 * 默认拼接符号为英文逗号。<br>
 * 必须将字段强转为字符串。<br>
 * 第一个泛型表示聚合结果类型,也就是返回值类型,第二个泛型表示累加器类型
 *
 * @author ziqiang.wang
 * @date 2022/1/14 10:20
 */
public class CollectList extends AggregateFunction<String, List<String>> {

    /**
     * 最终结果拼接符号
     */
    private final String separator = ",";

    /**
     * 创建并初始化累加器
     * 累加器是计算中间结果的数据结构体,存储聚合数据值,直到计算最终聚合结果。
     *
     * @return 具有初始化值的累加器
     */
    @Override
    public List<String> createAccumulator() {
        return new ArrayList<>();
    }


    /**
     * 计算并返回最终结果
     */
    @Override
    public String getValue(List<String> accumulator) {
        if (!accumulator.isEmpty()) {
            StringBuilder builder = new StringBuilder();
            accumulator.forEach(s -> builder.append(s).append(separator));
            return builder.deleteCharAt(builder.length() - 1).toString();
        } else {
            return "";
        }
    }


    /**
     * 处理输入参数值,并且更新提供的累加器实例。
     * 这个方法可以被重载。
     * 自定义聚合函数必须有至少一个accumulate()方法。
     *
     * @param acc   包含当前聚合结果的累加器
     * @param value 用户输入参数
     */
    public void accumulate(List<String> acc, String value) {
        if (value != null) {
            acc.add(value);
        }
    }


    /**
     * 从累加器实例撤回输入值。
     * 当前设计假定该输入值是以前累加过的值。
     * 该方法可以被重载。
     * 在无界表上使用有界OVER聚合数据时,必须实现该方法。
     *
     * @param acc   包含当前聚合结果的累加器
     * @param value 用户输入参数
     */
    public void retract(List<String> acc, String value) {
        if (value != null) {
            acc.remove(value);
        }
    }

    /**
     * 将一组累加器实例聚合到一个累加器实例。
     * 在无界会话窗口、滑动窗口进行分组聚合,以及在有界分区聚合时必须实现该方法。
     * 除此之外,实现该方法对优化器是有帮助的。
     * 比如,两阶段聚合优化要求所有聚合函数支持“merge”方法
     *
     * @param acc 保存聚合结果的累加器。注意,它应该包含之前聚合的结果,因此,我们不能在聚合方法中替换或清理这个实例。
     * @param it  一组将被合并的累加器对应的迭代器
     */
    public void merge(List<String> acc, Iterable<List<String>> it) {
        for (List<String> list : it) {
            acc.addAll(list);
        }
    }

    /**
     * 重置累加器
     */
    public void resetAccumulator(List<String> acc) {
        acc.clear();
    }

}

2.2 collect_set实现

package com.baishancloud.log.function.udaf;

import org.apache.flink.table.functions.AggregateFunction;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;

/**
 * 将字段所有值拼接到一起,去掉重复值。<br>
 * 默认拼接符号为英文逗号。<br>
 * 必须将字段强转为字符串。<br>
 * 第一个泛型表示聚合结果类型,也就是返回值类型,第二个泛型表示累加器类型
 *
 * @author ziqiang.wang
 * @date 2022/1/14 10:20
 */
public class CollectSet extends AggregateFunction<String, Set<String>> {

    /**
     * 最终结果拼接符号
     */
    private final String separator = ",";

    /**
     * 创建并初始化累加器
     * 累加器是计算中间结果的数据结构体,存储聚合数据值,直到计算最终聚合结果。
     *
     * @return 具有初始化值的累加器
     */
    @Override
    public Set<String> createAccumulator() {
        return new HashSet<>();
    }


    /**
     * 计算并返回最终结果
     */
    @Override
    public String getValue(Set<String> accumulator) {
        if (!accumulator.isEmpty()) {
            StringBuilder builder = new StringBuilder();
            accumulator.forEach(s -> builder.append(s).append(separator));
            return builder.deleteCharAt(builder.length() - 1).toString();
        } else {
            return "";
        }
    }


    /**
     * 处理输入参数值,并且更新提供的累加器实例。
     * 这个方法可以被重载。
     * 自定义聚合函数必须有至少一个accumulate()方法。
     *
     * @param acc   包含当前聚合结果的累加器
     * @param value 用户输入参数
     */
    public void accumulate(Set<String> acc, String value) {
        if (value != null) {
            acc.add(value);
        }
    }


    /**
     * 从累加器实例撤回输入值。
     * 当前设计假定该输入值是以前累加过的值。
     * 该方法可以被重载。
     * 在无界表上使用有界OVER聚合数据时,必须实现该方法。
     *
     * @param acc   包含当前聚合结果的累加器
     * @param value 用户输入参数
     */
    public void retract(Set<String> acc, String value) {
        if (value != null) {
            acc.remove(value);
        }
    }

    /**
     * 将一组累加器实例聚合到一个累加器实例。
     * 在无界会话窗口、滑动窗口进行分组聚合,以及在有界分区聚合时必须实现该方法。
     * 除此之外,实现该方法对优化器是有帮助的。
     * 比如,两阶段聚合优化要求所有聚合函数支持“merge”方法
     *
     * @param acc 保存聚合结果的累加器。注意,它应该包含之前聚合的结果,因此,我们不能在聚合方法中替换或清理这个实例。
     * @param it  一组将被合并的累加器对应的迭代器
     */
    public void merge(Set<String> acc, Iterable<Set<String>> it) {
        for (Set<String> set : it) {
            acc.addAll(set);
        }
    }

    /**
     * 重置累加器
     */
    public void resetAccumulator(Set<String> acc) {
        acc.clear();
    }

}

2.3 sql调用

SELECT
    col1,
    collect_list(cast(col2 as string)) as col2,
    collect_set(cast(col2 as string)) as col3
FROM
    (VALUES
        ('a', 1),
        ('a', 1),
        ('a', 2),
        ('b', 3),
        ('b', 4),
        ('c', 5)
    ) AS a(col1, col2)
GROUP BY col1
;
  1. 错误信息
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. An error occurred in the type inference logic of function 'collect_set'.
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
    at com.baishancloud.log.core.FlinkApplication.executeSelect(FlinkApplication.java:244)
    at com.baishancloud.log.core.FlinkApplication.executeAllSql(FlinkApplication.java:141)
    at com.baishancloud.log.core.FlinkApplication.start(FlinkApplication.java:73)
    at com.baishancloud.log.SubmitClient.main(SubmitClient.java:30)
Caused by: org.apache.flink.table.api.ValidationException: An error occurred in the type inference logic of function 'collect_set'.
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
    at java.util.Optional.flatMap(Optional.java:241)
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
    at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1183)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1169)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1200)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1169)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1169)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:945)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:151)
    ... 8 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a valid type inference for function class 'com.baishancloud.log.function.udaf.CollectSet'. Please check for implementation mistakes and/or provide a corresponding hint.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.forAggregateFunction(TypeInferenceExtractor.java:98)
    at org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:212)
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
    ... 21 more
Caused by: org.apache.flink.table.api.ValidationException: Error in extracting a signature to accumulator mapping.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractAccumulatorMapping(FunctionMappingExtractor.java:135)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:168)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
    ... 24 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to extract a type inference from method:
public void com.baishancloud.log.function.udaf.CollectSet.accumulate(java.util.Set,java.lang.String)
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractAccumulatorMapping(FunctionMappingExtractor.java:124)
    ... 26 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'java.util.Set<java.lang.String>' in generic class 'org.apache.flink.table.functions.AggregateFunction' in class com.baishancloud.log.function.udaf.CollectSet. Please pass the required data type manually or allow RAW types.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:241)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:219)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:195)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromGeneric(DataTypeExtractor.java:125)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createGenericResultExtraction$13(FunctionMappingExtractor.java:478)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:319)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169)
    ... 27 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'java.util.Set<java.lang.String>'. Interpreting it as a structured type was also not successful.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:291)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233)
    ... 34 more
Caused by: org.apache.flink.table.api.ValidationException: Class 'java.util.Set' must not be abstract.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:356)
    at org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredClass(ExtractionUtils.java:164)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractStructuredType(DataTypeExtractor.java:479)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:289)
    ... 35 more

Process finished with exit code 1
  1. 自己尝试
    尝试使用@DataTypeHint @FunctionHint ,但是没有解决问题。
    在CollectSet.class类中,还是使用CollectList.class中的实现,只不过在最后返回结果的时候,将list转化为set,来实现去重效果。

  2. 想要效果
    CollectSet.class类中继续使用set,不用list最数据的聚合,然后再转化为set,这样可以节省。

  • 写回答
  • 好问题 提建议
  • 追加酬金
  • 关注问题
  • 收藏
  • 邀请回答

相关推荐 更多相似问题