qq_35265056 2018-12-05 07:25 采纳率: 0%
浏览 788
已结题

可否让spark算子执行到某一步时,通过某些控制条件,让整个spark程序停止,或者在那一步保存结果到文件?

有这么一个需求:让rdd1执行map(或其他算子),当出现满足条件的情况时,控制整个spark程序停止,或是保存停止结果到文件(优先保存结果到文件),而不继续执行后续步骤,为了提高速度。

 //示例,大概就是这么一个意思:当出现值为5的行,停止程序或是保存结果
 val rdd2 = rdd1.map(x =>{
           if(x==5){
                   //整个spark程序停止,或是保存停止结果到文件
           }   
           ...........
     }
 )

rdd2.count()

有没有什么方法可以实现呢?

  • 写回答

2条回答 默认 最新

  • 月半杰 2018-12-05 08:51
    关注

    没怎么看明白这个问题,如果只是(通过某些控制条件,在某一步保存结果到文件)可以filter需要的条件,并将结果保存。

    添加了一个breakable块,看看这样能达到优化条件不

    import scala.collection.mutable.ArrayBuffer
    import scala.util.control.Breaks._
    
    val ab= new ArrayBuffer[String]()
    var pp=0
    rdd1.foreach(aa=>
    {
    if(aa.contains("条件")){
    pp+=1
    ab += aa
    breakable{
    if (pp == 2) {  
    println(ab)
       break    
    }}}
    })
    
    评论

报告相同问题?

悬赏问题

  • ¥15 用windows做服务的同志有吗
  • ¥60 求一个简单的网页(标签-安全|关键词-上传)
  • ¥35 lstm时间序列共享单车预测,loss值优化,参数优化算法
  • ¥15 Python中的request,如何使用ssr节点,通过代理requests网页。本人在泰国,需要用大陆ip才能玩网页游戏,合法合规。
  • ¥100 为什么这个恒流源电路不能恒流?
  • ¥15 有偿求跨组件数据流路径图
  • ¥15 写一个方法checkPerson,入参实体类Person,出参布尔值
  • ¥15 我想咨询一下路面纹理三维点云数据处理的一些问题,上传的坐标文件里是怎么对无序点进行编号的,以及xy坐标在处理的时候是进行整体模型分片处理的吗
  • ¥15 一直显示正在等待HID—ISP
  • ¥15 Python turtle 画图