梦在NASA 2020-03-25 23:44 采纳率: 0%
浏览 303

SparkStream通过mapWithState设置timeOut无效

问题描述:SparkStream在使用mapWithState时,无论设置timeout为多久(30s,3min,4min。。。),过了该超时时间后,该批信令数据依然会发送,没有被移除,而是过了9分钟后才会移除,
感觉该时间范围没有起到作用。

业务场景:一批业务实时信令数据,如果在预置时间范围内(即该用户保持沉默)没有上报。则系统会认为该用户一直在当前位置(即会将该用户的信令重复上报),超过该时间范围则将其移除。

业务实现:通过使用mapWithState以及其timeout特性设置,将过期的缓存信令移除。

代码片段:如下

//通过KafkaUtils.createDirectStream(...)获得kafka数据,kafka相关参数由kafkaParams指定   
        JavaInputDStream<ConsumerRecord<String,String>> kafkaStream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams)
        );
        //使用map先对InputDStream进行取值操作            
        JavaDStream<String> lines=kafkaStream.map(new Function<ConsumerRecord<String, String>, String>() {
                    @Override
                    public String call(ConsumerRecord<String, String> consumerRecord) throws Exception {
                        String line=consumerRecord.value();                            
                        return line;
                    }
        });
        lines.print();          


        // 只更新数值变更的数据
        Function3<String, Optional<String>, State<String>,String> mappingFunc =
                (line, preLine, state) -> {                       
                String returnLine =state.exists() ? state.get() : line;
                return returnLine;
                }

         // DStream made of get cumulative counts that get updated in every batch
        JavaMapWithStateDStream<String, String, String, String> stateDstream =
                pairs.mapWithState(StateSpec.function(mappingFunc).timeout(Durations.minutes(3)));//3分钟后缓存的key过期
        JavaPairDStream<String,String> fullStateDstream=stateDstream.stateSnapshots();//获取所有的未过期的key


        //遍历DStream,并转换成RDD
            stateDstream.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
               ......
                System.out.printlin("=====>"+line);
               ......
            }   

  • 写回答

1条回答 默认 最新

  • 叫我阿柒啊 2021-04-12 22:58
    关注

    写了一篇文章,关于SparkStreaming使用mapWithState时,设置timeout()无法生效问题,解决方案和原因过程分析:

    https://blog.csdn.net/CatchLight/article/details/115621071

    希望对你有所帮助

    评论

报告相同问题?

悬赏问题

  • ¥15 ansys fluent计算闪退
  • ¥15 有关wireshark抓包的问题
  • ¥15 需要写计算过程,不要写代码,求解答,数据都在图上
  • ¥15 向数据表用newid方式插入GUID问题
  • ¥15 multisim电路设计
  • ¥20 用keil,写代码解决两个问题,用库函数
  • ¥50 ID中开关量采样信号通道、以及程序流程的设计
  • ¥15 U-Mamba/nnunetv2固定随机数种子
  • ¥15 vba使用jmail发送邮件正文里面怎么加图片
  • ¥15 vb6.0如何向数据库中添加自动生成的字段数据。