问题描述:SparkStream在使用mapWithState时,无论设置timeout为多久(30s,3min,4min。。。),过了该超时时间后,该批信令数据依然会发送,没有被移除,而是过了9分钟后才会移除,
感觉该时间范围没有起到作用。
业务场景:一批业务实时信令数据,如果在预置时间范围内(即该用户保持沉默)没有上报。则系统会认为该用户一直在当前位置(即会将该用户的信令重复上报),超过该时间范围则将其移除。
业务实现:通过使用mapWithState以及其timeout特性设置,将过期的缓存信令移除。
代码片段:如下
//通过KafkaUtils.createDirectStream(...)获得kafka数据,kafka相关参数由kafkaParams指定
JavaInputDStream<ConsumerRecord<String,String>> kafkaStream = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topicsSet, kafkaParams)
);
//使用map先对InputDStream进行取值操作
JavaDStream<String> lines=kafkaStream.map(new Function<ConsumerRecord<String, String>, String>() {
@Override
public String call(ConsumerRecord<String, String> consumerRecord) throws Exception {
String line=consumerRecord.value();
return line;
}
});
lines.print();
// 只更新数值变更的数据
Function3<String, Optional<String>, State<String>,String> mappingFunc =
(line, preLine, state) -> {
String returnLine =state.exists() ? state.get() : line;
return returnLine;
}
// DStream made of get cumulative counts that get updated in every batch
JavaMapWithStateDStream<String, String, String, String> stateDstream =
pairs.mapWithState(StateSpec.function(mappingFunc).timeout(Durations.minutes(3)));//3分钟后缓存的key过期
JavaPairDStream<String,String> fullStateDstream=stateDstream.stateSnapshots();//获取所有的未过期的key
//遍历DStream,并转换成RDD
stateDstream.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
......
System.out.printlin("=====>"+line);
......
}