qq_39582456 2018-04-26 01:37 采纳率: 50%
浏览 656
已采纳

还是昨天的Spark数据分析的问题,求代码,有帮助的可以再加C币

现在我有一组数据,第一列是入站口到出站口(OD),第二列是刷卡的卡号,第三列是出行总时间。
现在我想研究在相同的OD下,出行时长的分布,并从中筛选出出行时长异常的卡号,默认出行时长超过该OD最短出行时长2倍为异常。
图片说明

  • 写回答

2条回答

  • 默默悟问 2018-04-27 07:18
    关注
     from __future__ import print_function
    
    import sys
    
    from pyspark.sql import SparkSession
    
    def min(a,b):
        return a if a < b else b
    
    
    if __name__ == "__main__":
        if len(sys.argv) != 2:
            print("Usage:  odcount <file>", file=sys.stderr)
            exit(-1)
    
        spark = SparkSession\
            .builder\
            .appName("PythonODCount")\
            .getOrCreate()
    
        lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
        lines = lines.filter( lambda line: len(line.strip()) > 0 )
        mintimes = lines.flatMap(lambda x: [x[1:-1]]) \
                      .map( lambda x: (x.split(',')[0], int(x.split(',')[2])) ) \
                      .reduceByKey(min)
        mintime_list = mintimes.collect()
        mintime_map = {}
        print("min time:")
        for (od, mintime) in mintime_list:
            mintime_map[od] = mintime
            print("%s: %i" % (od.encode('utf-8'), mintime))
    
        largelines = lines.flatMap(lambda x: [x[1:-1]]) \
                      .filter( lambda x: int(x.split(',')[2]) > 2 * mintime_map.get(x.split(',')[0]) )
    
        print("large time line:")
        for line in largelines.collect():
            print("%s" % line.encode('utf-8'))
    
        spark.stop()
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

悬赏问题

  • ¥15 乘性高斯噪声在深度学习网络中的应用
  • ¥15 运筹学排序问题中的在线排序
  • ¥15 关于docker部署flink集成hadoop的yarn,请教个问题 flink启动yarn-session.sh连不上hadoop,这个整了好几天一直不行,求帮忙看一下怎么解决
  • ¥30 求一段fortran代码用IVF编译运行的结果
  • ¥15 深度学习根据CNN网络模型,搭建BP模型并训练MNIST数据集
  • ¥15 C++ 头文件/宏冲突问题解决
  • ¥15 用comsol模拟大气湍流通过底部加热(温度不同)的腔体
  • ¥50 安卓adb backup备份子用户应用数据失败
  • ¥20 有人能用聚类分析帮我分析一下文本内容嘛
  • ¥30 python代码,帮调试,帮帮忙吧