梦在NASA 2020-03-25 23:44 采纳率: 0%
浏览 303

SparkStream通过mapWithState设置timeOut无效

问题描述:SparkStream在使用mapWithState时,无论设置timeout为多久(30s,3min,4min。。。),过了该超时时间后,该批信令数据依然会发送,没有被移除,而是过了9分钟后才会移除,
感觉该时间范围没有起到作用。

业务场景:一批业务实时信令数据,如果在预置时间范围内(即该用户保持沉默)没有上报。则系统会认为该用户一直在当前位置(即会将该用户的信令重复上报),超过该时间范围则将其移除。

业务实现:通过使用mapWithState以及其timeout特性设置,将过期的缓存信令移除。

代码片段:如下

//通过KafkaUtils.createDirectStream(...)获得kafka数据,kafka相关参数由kafkaParams指定   
        JavaInputDStream<ConsumerRecord<String,String>> kafkaStream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams)
        );
        //使用map先对InputDStream进行取值操作            
        JavaDStream<String> lines=kafkaStream.map(new Function<ConsumerRecord<String, String>, String>() {
                    @Override
                    public String call(ConsumerRecord<String, String> consumerRecord) throws Exception {
                        String line=consumerRecord.value();                            
                        return line;
                    }
        });
        lines.print();          


        // 只更新数值变更的数据
        Function3<String, Optional<String>, State<String>,String> mappingFunc =
                (line, preLine, state) -> {                       
                String returnLine =state.exists() ? state.get() : line;
                return returnLine;
                }

         // DStream made of get cumulative counts that get updated in every batch
        JavaMapWithStateDStream<String, String, String, String> stateDstream =
                pairs.mapWithState(StateSpec.function(mappingFunc).timeout(Durations.minutes(3)));//3分钟后缓存的key过期
        JavaPairDStream<String,String> fullStateDstream=stateDstream.stateSnapshots();//获取所有的未过期的key


        //遍历DStream,并转换成RDD
            stateDstream.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
               ......
                System.out.printlin("=====>"+line);
               ......
            }   

  • 写回答

1条回答 默认 最新

  • 叫我阿柒啊 2021-04-12 22:58
    关注

    写了一篇文章,关于SparkStreaming使用mapWithState时,设置timeout()无法生效问题,解决方案和原因过程分析:

    https://blog.csdn.net/CatchLight/article/details/115621071

    希望对你有所帮助

    评论

报告相同问题?

悬赏问题

  • ¥15 如何让企业微信机器人实现消息汇总整合
  • ¥50 关于#ui#的问题:做yolov8的ui界面出现的问题
  • ¥15 如何用Python爬取各高校教师公开的教育和工作经历
  • ¥15 TLE9879QXA40 电机驱动
  • ¥20 对于工程问题的非线性数学模型进行线性化
  • ¥15 Mirare PLUS 进行密钥认证?(详解)
  • ¥15 物体双站RCS和其组成阵列后的双站RCS关系验证
  • ¥20 想用ollama做一个自己的AI数据库
  • ¥15 关于qualoth编辑及缝合服装领子的问题解决方案探寻
  • ¥15 请问怎么才能复现这样的图呀