临炤 2020-07-29 17:20 采纳率: 0%
浏览 94

pyspark为什么调用类会导致lost task报错?

在学习pyspark时,运行以下代码(二次排序)出错:

from operator import gt
from pyspark import SparkContext, SparkConf


'''
实现思路:
1、按照Ordered和Serializable接口实现自定义排序的key
2、要将进行排序的文件加载进来生成<key,value>的RDD
3、使用sortByKey基于自定义的Key进行二次排序
4、去除掉排序的Key只保留排序的结果
'''


class SecondarySortKey():
    @staticmethod
    def __init__(self, k):
        self.column1 = k[0]
        self.column2 = k[1]

    def __gt__(self, other):
        if other.column1 == self.column1:
            return gt(self.column2, other.column2)
        else:
            return gt(self.column1, other.column1)


def main():
    conf = SparkConf().setAppName('saprk_sort').setMaster('spark://master:7077')
    sc = SparkContext(conf=conf)
    file = "/usr/hadoop/test/file4.txt"
    rdd1 = sc.textFile(file)
    rdd2 = rdd1.filter(lambda x: len(x.strip()) > 0)
    rdd3 = rdd2.map(lambda x: ((int(x.strip(" ")[0]), int(x.strip(" ")[1])), x))
    rdd4 = rdd3.map(lambda x: (SecondarySortKey(x[0]), x[1]))
    rdd5 = rdd4.sortByKey(False)
    rdd6 = rdd5.map(lambda x:x[1])
    rdd6.foreach(print)


if __name__ == '__main__':
    main()

出现lost task的错误,经过三天的尝试,基本排除了配置问题(运行不含类的代码没有问题),集群问题,代码问题(将上述代码改为本地运行没有问题),查遍了很多办法,有点小绝望,不知道有大神可以解答嘛?

  • 写回答

1条回答 默认 最新

  • threenewbee 2020-07-29 17:35
    关注
    评论

报告相同问题?

悬赏问题

  • ¥15 有偿四位数,节约算法和扫描算法
  • ¥15 VUE项目怎么运行,系统打不开
  • ¥50 pointpillars等目标检测算法怎么融合注意力机制
  • ¥15 关于超局变量获取查询的问题
  • ¥20 Vs code Mac系统 PHP Debug调试环境配置
  • ¥60 大一项目课,微信小程序
  • ¥15 求视频摘要youtube和ovp数据集
  • ¥15 在启动roslaunch时出现如下问题
  • ¥15 汇编语言实现加减法计算器的功能
  • ¥20 关于多单片机模块化的一些问题