使用Flink AggregateFunction计算股票的平均值
股票:
AggregateFunction计算
idea运行出现报错如下;
使用Flink AggregateFunction计算股票的平均值
股票:
AggregateFunction计算
引自免费微信小程序:皆我百晓生
从你提供的信息来看,主要的问题在于使用Flink的AggregateFunction
时遇到了异常。异常信息显示为NullPointerException
,这通常意味着在某个方法调用中,尝试操作了一个空对象。在Flink的上下文中,这可能发生在AggregateFunction
的实现中,尤其是在createAccumulator
、accumulate
或merge
等方法中处理了空值。
针对你的问题,首先检查AggregateFunction
的实现,确保所有必要的方法都正确实现了,并且在这些方法中处理了所有可能的边界情况,尤其是对于可能为null
的对象。以下是一个可能的AverageAggregate
实现示例,用于计算每只股票的平均价格:
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
public class AverageAggregate implements AggregateFunction<StockPrice, Tuple3<String, Double, Integer>, Tuple2<String, Double>> {
@Override
public Tuple3<String, Double, Integer> createAccumulator() {
return new Tuple3<>("", 0.0, 0);
}
@Override
public Tuple3<String, Double, Integer> add(StockPrice stockPrice, Tuple3<String, Double, Integer> accumulator) {
if (accumulator.f0.equals("")) {
accumulator.f0 = stockPrice.symbol;
}
accumulator.f1 += stockPrice.price;
accumulator.f2++;
return accumulator;
}
@Override
public Tuple3<String, Double, Integer> merge(Tuple3<String, Double, Integer> a, Tuple3<String, Double, Integer> b) {
a.f1 += b.f1;
a.f2 += b.f2;
return a;
}
@Override
public Tuple2<String, Double> getResult(Tuple3<String, Double, Integer> accumulator) {
return new Tuple2<>(accumulator.f0, accumulator.f1 / accumulator.f2);
}
@Override
public TypeInformation<Tuple3<String, Double, Integer>> getAccumulatorType() {
return Types.TUPLE(Types.STRING, Types.DOUBLE, Types.INT);
}
@Override
public TypeInformation<Tuple2<String, Double>> getResultType() {
return Types.TUPLE(Types.STRING, Types.DOUBLE);
}
}
此外,从你提供的日志中,还看到了关于SLF4J
的警告,这可能是因为你的项目中缺少相应的日志框架依赖。尽管这个警告与NullPointerException
没有直接关系,但最好还是解决它,以避免未来的潜在问题。你可以在pom.xml
或build.gradle
中添加对应的SLF4J
绑定依赖,例如logback-classic
或log4j-over-slf4j
。
最后,检查StockPrice
类的构造函数中的错误,即:
public static StockPrice of(String symbol, double price, Long ts, int volume) {
return new StockPrice(symbol, price, ts, volume, mediaStatus:");
}
这里mediaStatus
未被赋值,应该传递一个默认值或从其他地方获取其值,否则会抛出编译错误。
确保所有的类和方法都被正确地导入和定义,且无语法错误。如果问题仍然存在,建议仔细检查数据源和数据流处理过程中的每一个步骤,确保数据的正确性和完整性。