zqqnancy 2019-10-29 15:39 采纳率: 0%
浏览 569
已结题

如何用pypsark实现多任务并发处理?

需求:Spark的一个非常常见的用例是并行运行许多作业。如同一时间的10张表同并行做处理,如何通过pyspark实现?

from pyspark.sql import SparkSession
from pyspark.sql import HiveContext
import sys
import logging
import time
from multiprocessing.dummy import Pool as ThreadPool


logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
                    level=logging.INFO)


def test(spark,a,b):
    spark.sql('use %s' %(a))
    spark.sql('select * FROM b limit 1' %(b)


if __name__ == '__main__':  
    spark = SparkSession.builder \
        .appName("datapre_dropd") \
        .config("hive.metastore.uris", "thrift://***:***")\
        .enableHiveSupport()\
        .getOrCreate()
    a="ss"
    tbs=['test1','test2'] 
    end1=time.time()

    #多线程方式:
    tpool = ThreadPool(len(tbs))  # 创建一个线程池
    for tb in tbs:
        tpool.apply_async(test,args=(spark,a,tb))
    end2=time.time()
    logging.info("并发任务数:%s,并发任务执行时间:%s" %(str(len(tbs)),str(end2-end1)))

    spark.stop()

client模式下可以正常提交任务,但不会实际执行。如下图,可以看到执行了第一句就跳出了。
图片说明

cluster模式下执行会报错:
图片说明

  • 写回答

1条回答 默认 最新

  • dabocaiqq 2019-10-29 23:27
    关注
    评论

报告相同问题?

悬赏问题

  • ¥15 求差集那个函数有问题,有无佬可以解决
  • ¥15 【提问】基于Invest的水源涵养
  • ¥20 微信网友居然可以通过vx号找到我绑的手机号
  • ¥15 寻一个支付宝扫码远程授权登录的软件助手app
  • ¥15 解riccati方程组
  • ¥15 display:none;样式在嵌套结构中的已设置了display样式的元素上不起作用?
  • ¥15 使用rabbitMQ 消息队列作为url源进行多线程爬取时,总有几个url没有处理的问题。
  • ¥15 Ubuntu在安装序列比对软件STAR时出现报错如何解决
  • ¥50 树莓派安卓APK系统签名
  • ¥65 汇编语言除法溢出问题