现在我有一组数据,第一列是入站口到出站口(OD),第二列是刷卡的卡号,第三列是出行总时间。
现在我想研究在相同的OD下,出行时长的分布,并从中筛选出出行时长异常的卡号,默认出行时长超过该OD最短出行时长2倍为异常。
还是昨天的Spark数据分析的问题,求代码,有帮助的可以再加C币
- 写回答
- 好问题 0 提建议
- 追加酬金
- 关注问题
- 邀请回答
-
2条回答 默认 最新
- 默默悟问 2018-04-27 07:18关注
from __future__ import print_function import sys from pyspark.sql import SparkSession def min(a,b): return a if a < b else b if __name__ == "__main__": if len(sys.argv) != 2: print("Usage: odcount <file>", file=sys.stderr) exit(-1) spark = SparkSession\ .builder\ .appName("PythonODCount")\ .getOrCreate() lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0]) lines = lines.filter( lambda line: len(line.strip()) > 0 ) mintimes = lines.flatMap(lambda x: [x[1:-1]]) \ .map( lambda x: (x.split(',')[0], int(x.split(',')[2])) ) \ .reduceByKey(min) mintime_list = mintimes.collect() mintime_map = {} print("min time:") for (od, mintime) in mintime_list: mintime_map[od] = mintime print("%s: %i" % (od.encode('utf-8'), mintime)) largelines = lines.flatMap(lambda x: [x[1:-1]]) \ .filter( lambda x: int(x.split(',')[2]) > 2 * mintime_map.get(x.split(',')[0]) ) print("large time line:") for line in largelines.collect(): print("%s" % line.encode('utf-8')) spark.stop()
本回答被题主选为最佳回答 , 对您是否有帮助呢?解决 无用评论 打赏 举报
悬赏问题
- ¥25 由IPR导致的DRIVER_POWER_STATE_FAILURE蓝屏
- ¥50 有数据,怎么建立模型求影响全要素生产率的因素
- ¥50 有数据,怎么用matlab求全要素生产率
- ¥15 TI的insta-spin例程
- ¥15 完成下列问题完成下列问题
- ¥15 C#算法问题, 不知道怎么处理这个数据的转换
- ¥15 YoloV5 第三方库的版本对照问题
- ¥15 请完成下列相关问题!
- ¥15 drone 推送镜像时候 purge: true 推送完毕后没有删除对应的镜像,手动拷贝到服务器执行结果正确在样才能让指令自动执行成功删除对应镜像,如何解决?
- ¥15 求daily translation(DT)偏差订正方法的代码