夏午Sharve 2024-04-27 21:26 采纳率: 57.1%
浏览 9

spark standalone 无 hdfs 文件读取问题

大家好,请问spark在standalone模式下没有配置hdfs文件系统,直接在每个节点的相同文件路径上放置数据拷贝,读取并处理数据的时候没有将数据划分到每个节点,而是只在master节点上处理数据,写数据也是这样,这是什么问题呢?

workers情况:
Worker Id ▴ Address State Cores Memory Resources
worker-20240427085249-172.18.0.2-46871 172.18.0.2:46871 ALIVE 12 (12 Used) 14.0 GiB (8.0 GiB Used)
worker-20240427131930-172.17.0.2-44649 172.17.0.2:44649 ALIVE 16 (0 Used) 5.6 GiB (0.0 B Used)
可以看见只在第一个节点工作

每个节点的文件夹路径:/opt/workspace/examples/poi,
内含文件
poi/1540884339286.csv
poi/1540882536996.csv
poi/1540883510706.csv等。

读取方式:df = sedona.read.option("encoding", "GBK").csv("/opt/workspace/examples/poi", header=True)
写入方式:df.write.mode('overwrite').csv('/opt/workspace/examples/poi_utf8.csv', header=True)

如果需要额外信息,请告诉我!

所有代码如下:

import os
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import geopandas as gpd
import pandas as pd
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import LongType
from shapely.geometry import Point
from shapely.geometry import Polygon

from sedona.spark import *
from sedona.core.geom.envelope import Envelope
config = SedonaContext.builder().master("spark://commie:7077").config('spark.jars.repositories', 'https://artifacts.unidata.ucar.edu/repository/unidata-all').getOrCreate()

sedona = SedonaContext.create(config)
sc = sedona.sparkContext
# sc.stop()
df = sedona.read.option("encoding", "GBK").csv("/opt/workspace/examples/poi", header=True)
df.show()
print(df.count())
test_mode = False
if test_mode: df = df.limit(1000000)
from pyspark.sql.functions import split, explode, concat, concat_ws, length
splited_location = split(df['location'], ",")
splited_type = split(df['type'], ";")

df = df.withColumn("longitude", splited_location.getItem(0))
df = df.withColumn("latitude", splited_location.getItem(1))
df = df.withColumn("level1", splited_type.getItem(0))
df = df.withColumn("level2", splited_type.getItem(1))
df = df.withColumn("level3", splited_type.getItem(2))

df.createOrReplaceTempView('table')
df = sedona.sql('''
    select *,
    ST_AsText(
        ST_SetSrid(
            ST_Point(
                cast(longitude as decimal(9, 6)),
                cast(latitude as decimal(9, 6))
            ),
            4326
        )
    ) as geom_text from table
''')
df = df.drop('longitude').drop('latitude').drop('location').drop('type')

df.show()
df.write.mode('overwrite').csv('/opt/workspace/examples/poi_utf8.csv', header=True)
merged_df = sedona.read.csv('/opt/workspace/examples/poi_utf8.csv', header=True)
merged_df.show()
sc.stop()
sedona.stop()
config.stop()
  • 写回答

4条回答 默认 最新

  • 专家-赤兔[在线] 优质创作者: 编程框架技术领域 2024-04-27 21:26
    关注

    引自免费微信小程序:皆我百晓生

    看起来您在Spark Standalone模式下遇到了数据倾斜的问题,导致所有的任务都在master节点(worker-20240427085249-172.18.0.2-46871)上执行。为了解决这个问题,我们可以尝试以下步骤:

    1. 确保Spark worker节点都能看到数据文件。
    2. 使用Spark的repartition()函数重新分区数据,以确保数据均匀分布到各个worker节点。

    首先,确认您的数据文件已经复制到了所有worker节点的相同目录下。如果没有,您可以使用scp命令或者其他手段将数据文件同步到所有worker节点的 /opt/workspace/examples/poi 目录下。

    然后,在读取CSV数据后,对DataFrame进行重新分区操作。这可以通过调用repartition()函数实现,传入一个适当的分区数。根据您的worker节点情况,建议设置分区数与worker节点数相同,或者稍微多一些,例如spark.default.parallelism的值。这里我们先尝试使用spark.default.parallelism作为分区数。

    以下是修改后的代码片段,包含数据读取、重新分区以及写入的调整:

    import os
    from pyspark.sql import SparkSession
    from pyspark import StorageLevel
    import geopandas as gpd
    import pandas as pd
    from pyspark.sql.types import StructType
    from pyspark.sql.types import StructField
    from pyspark.sql.types import StringType
    from pyspark.sql.types import LongType
    from shapely.geometry import Point
    from shapely.geometry import Polygon
    
    from sedona.spark import *
    from sedona.core.geom.envelope import Envelope
    config = SedonaContext.builder().master("spark://commie:7077").config('spark.jars.repositories', 'https://artifacts.unidata.ucar.edu/repository/unidata-all').getOrCreate()
    
    sedona = SedonaContext.create(config)
    sc = sedona.sparkContext
    
    # 获取spark默认并行度
    default_parallelism = sc.defaultParallelism
    
    # 读取CSV数据并重新分区
    df = sedona.read.option("encoding", "GBK").csv("/opt/workspace/examples/poi", header=True).repartition(default_parallelism)
    
    # ...后续代码保持不变...
    
    df.write.mode('overwrite').csv('/opt/workspace/examples/poi_utf8.csv', header=True)
    merged_df = sedona.read.csv('/opt/workspace/examples/poi_utf8.csv', header=True)
    merged_df.show()
    sc.stop()
    sedona.stop()
    config.stop()
    

    这段代码首先获取了SparkContext的默认并行度,并将其用于repartition()函数。这样,数据将会被均匀地分配到所有可用的worker节点上,从而避免数据倾斜的问题。

    请确保所有worker节点上的数据文件一致,并运行上述代码,观察是否解决了数据只在单个节点处理的问题。如果问题依然存在,可能需要进一步检查网络连接、权限设置或者Spark配置。

    评论 编辑记录

报告相同问题?

问题事件

  • 创建了问题 4月27日