苏默曦 2024-06-18 17:32 采纳率: 50%
浏览 23

使用Flink AggregateFunction计算股票的平均值报错

使用Flink AggregateFunction计算股票的平均值
股票:

img

img

AggregateFunction计算

img


idea运行出现报错如下;

img

  • 写回答

2条回答 默认 最新

  • 专家-赤兔[在线] 全栈领域优质创作者 2024-06-18 17:32
    关注

    引自免费微信小程序:皆我百晓生

    从你提供的信息来看,主要的问题在于使用Flink的AggregateFunction时遇到了异常。异常信息显示为NullPointerException,这通常意味着在某个方法调用中,尝试操作了一个空对象。在Flink的上下文中,这可能发生在AggregateFunction的实现中,尤其是在createAccumulatoraccumulatemerge等方法中处理了空值。

    针对你的问题,首先检查AggregateFunction的实现,确保所有必要的方法都正确实现了,并且在这些方法中处理了所有可能的边界情况,尤其是对于可能为null的对象。以下是一个可能的AverageAggregate实现示例,用于计算每只股票的平均价格:

    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    
    public class AverageAggregate implements AggregateFunction<StockPrice, Tuple3<String, Double, Integer>, Tuple2<String, Double>> {
    
        @Override
        public Tuple3<String, Double, Integer> createAccumulator() {
            return new Tuple3<>("", 0.0, 0);
        }
    
        @Override
        public Tuple3<String, Double, Integer> add(StockPrice stockPrice, Tuple3<String, Double, Integer> accumulator) {
            if (accumulator.f0.equals("")) {
                accumulator.f0 = stockPrice.symbol;
            }
            accumulator.f1 += stockPrice.price;
            accumulator.f2++;
            return accumulator;
        }
    
        @Override
        public Tuple3<String, Double, Integer> merge(Tuple3<String, Double, Integer> a, Tuple3<String, Double, Integer> b) {
            a.f1 += b.f1;
            a.f2 += b.f2;
            return a;
        }
    
        @Override
        public Tuple2<String, Double> getResult(Tuple3<String, Double, Integer> accumulator) {
            return new Tuple2<>(accumulator.f0, accumulator.f1 / accumulator.f2);
        }
    
        @Override
        public TypeInformation<Tuple3<String, Double, Integer>> getAccumulatorType() {
            return Types.TUPLE(Types.STRING, Types.DOUBLE, Types.INT);
        }
    
        @Override
        public TypeInformation<Tuple2<String, Double>> getResultType() {
            return Types.TUPLE(Types.STRING, Types.DOUBLE);
        }
    }
    

    此外,从你提供的日志中,还看到了关于SLF4J的警告,这可能是因为你的项目中缺少相应的日志框架依赖。尽管这个警告与NullPointerException没有直接关系,但最好还是解决它,以避免未来的潜在问题。你可以在pom.xmlbuild.gradle中添加对应的SLF4J绑定依赖,例如logback-classiclog4j-over-slf4j

    最后,检查StockPrice类的构造函数中的错误,即:

    public static StockPrice of(String symbol, double price, Long ts, int volume) {
        return new StockPrice(symbol, price, ts, volume, mediaStatus:"); 
    }
    

    这里mediaStatus未被赋值,应该传递一个默认值或从其他地方获取其值,否则会抛出编译错误。

    确保所有的类和方法都被正确地导入和定义,且无语法错误。如果问题仍然存在,建议仔细检查数据源和数据流处理过程中的每一个步骤,确保数据的正确性和完整性。

    评论 编辑记录

报告相同问题?

问题事件

  • 创建了问题 6月18日

悬赏问题

  • ¥15 如何在vue.config.js中读取到public文件夹下window.APP_CONFIG.API_BASE_URL的值
  • ¥50 浦育平台scratch图形化编程
  • ¥20 求这个的原理图 只要原理图
  • ¥15 vue2项目中,如何配置环境,可以在打完包之后修改请求的服务器地址
  • ¥20 微信的店铺小程序如何修改背景图
  • ¥15 UE5.1局部变量对蓝图不可见
  • ¥15 一共有五道问题关于整数幂的运算还有房间号码 还有网络密码的解答?(语言-python)
  • ¥20 sentry如何捕获上传Android ndk 崩溃
  • ¥15 在做logistic回归模型限制性立方条图时候,不能出完整图的困难
  • ¥15 G0系列单片机HAL库中景园gc9307液晶驱动芯片无法使用硬件SPI+DMA驱动,如何解决?