还是昨天的Spark数据分析的问题,求代码,有帮助的可以再加C币

现在我有一组数据,第一列是入站口到出站口(OD),第二列是刷卡的卡号,第三列是出行总时间。
现在我想研究在相同的OD下,出行时长的分布,并从中筛选出出行时长异常的卡号,默认出行时长超过该OD最短出行时长2倍为异常。
图片说明

0

2个回答

 from __future__ import print_function

import sys

from pyspark.sql import SparkSession

def min(a,b):
    return a if a < b else b


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage:  odcount <file>", file=sys.stderr)
        exit(-1)

    spark = SparkSession\
        .builder\
        .appName("PythonODCount")\
        .getOrCreate()

    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    lines = lines.filter( lambda line: len(line.strip()) > 0 )
    mintimes = lines.flatMap(lambda x: [x[1:-1]]) \
                  .map( lambda x: (x.split(',')[0], int(x.split(',')[2])) ) \
                  .reduceByKey(min)
    mintime_list = mintimes.collect()
    mintime_map = {}
    print("min time:")
    for (od, mintime) in mintime_list:
        mintime_map[od] = mintime
        print("%s: %i" % (od.encode('utf-8'), mintime))

    largelines = lines.flatMap(lambda x: [x[1:-1]]) \
                  .filter( lambda x: int(x.split(',')[2]) > 2 * mintime_map.get(x.split(',')[0]) )

    print("large time line:")
    for line in largelines.collect():
        print("%s" % line.encode('utf-8'))

    spark.stop()
1

先做一次运算得到每个OD的最短时间,存储到数据库之类。
第2次运算加载每个OD的最短时间,筛选大于该时间2倍的数据进行输出。

0
qq_39582456
qq_39582456 回复weixin_42072155: 我也知道逻辑是这样的,但是在写代码上出了问题= =
大约一年之前 回复
weixin_42072155
weixin_42072155 好像是这样
大约一年之前 回复
qq_39582456
qq_39582456 可不可以麻烦你写一段代码给我看看呢,我最近才开始学Spark,很多东西都不太懂
大约一年之前 回复
Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!