Hi everyone. I'm running Spark in standalone mode without HDFS; instead, I placed an identical copy of the data at the same file path on every node. When I read and process the data, it is not distributed across the nodes — only the master node does the processing, and writes behave the same way. What could be causing this?
Workers:
Worker Id                               Address           State  Cores         Memory
worker-20240427085249-172.18.0.2-46871  172.18.0.2:46871  ALIVE  12 (12 Used)  14.0 GiB (8.0 GiB Used)
worker-20240427131930-172.17.0.2-44649  172.17.0.2:44649  ALIVE  16 (0 Used)   5.6 GiB (0.0 B Used)
As you can see, only the first worker is doing any work.
The folder path on every node is /opt/workspace/examples/poi, containing files such as:
poi/1540884339286.csv
poi/1540882536996.csv
poi/1540883510706.csv
Read: df = sedona.read.option("encoding", "GBK").csv("/opt/workspace/examples/poi", header=True)
Write: df.write.mode('overwrite').csv('/opt/workspace/examples/poi_utf8.csv', header=True)
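For context on the encoding option: the source CSVs are GBK-encoded, and Spark's CSV writer emits UTF-8 by default, so this job doubles as an encoding conversion. A plain-Python sketch of that round-trip (the sample string is made up):

```python
# GBK-encoded bytes, as they would appear in the source CSV files
gbk_bytes = "北京市朝阳区".encode("gbk")

# Spark decodes the input with option("encoding", "GBK") ...
text = gbk_bytes.decode("gbk")

# ... and writes the output as UTF-8 by default
utf8_bytes = text.encode("utf-8")
print(utf8_bytes.decode("utf-8"))
```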
Let me know if you need any additional information!
Full code:
import os
from pyspark.sql import SparkSession
from pyspark import StorageLevel
import geopandas as gpd
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, LongType
from shapely.geometry import Point
from shapely.geometry import Polygon
from sedona.spark import *
from sedona.core.geom.envelope import Envelope
config = (SedonaContext.builder()
          .master("spark://commie:7077")
          .config('spark.jars.repositories', 'https://artifacts.unidata.ucar.edu/repository/unidata-all')
          .getOrCreate())
sedona = SedonaContext.create(config)
sc = sedona.sparkContext
# sc.stop()
df = sedona.read.option("encoding", "GBK").csv("/opt/workspace/examples/poi", header=True)
df.show()
print(df.count())
test_mode = False
if test_mode: df = df.limit(1000000)
from pyspark.sql.functions import split, explode, concat, concat_ws, length
# location is "lng,lat"; type is up to three ";"-separated category levels
split_location = split(df['location'], ",")
split_type = split(df['type'], ";")
df = df.withColumn("longitude", split_location.getItem(0))
df = df.withColumn("latitude", split_location.getItem(1))
df = df.withColumn("level1", split_type.getItem(0))
df = df.withColumn("level2", split_type.getItem(1))
df = df.withColumn("level3", split_type.getItem(2))
df.createOrReplaceTempView('table')
df = sedona.sql('''
select *,
ST_AsText(
ST_SetSrid(
ST_Point(
cast(longitude as decimal(9, 6)),
cast(latitude as decimal(9, 6))
),
4326
)
) as geom_text from table
''')
df = df.drop('longitude', 'latitude', 'location', 'type')
df.show()
df.write.mode('overwrite').csv('/opt/workspace/examples/poi_utf8.csv', header=True)
merged_df = sedona.read.csv('/opt/workspace/examples/poi_utf8.csv', header=True)
merged_df.show()
sc.stop()
sedona.stop()  # config and sedona refer to the same underlying SparkSession, so a separate config.stop() is redundant
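The column derivation in the middle of the script can be mirrored in plain Python, which makes the expected layout of the location and type columns explicit (the sample values here are made up):

```python
# location holds "lng,lat"; type holds up to three ";"-separated category levels
location = "116.403963,39.915119"
poi_type = "餐饮服务;中餐厅;综合酒楼"

longitude, latitude = location.split(",")

# Pad with None so rows with fewer than three levels still unpack,
# matching getItem(), which returns null for a missing index.
levels = poi_type.split(";")
level1, level2, level3 = (levels + [None] * 3)[:3]

print(longitude, latitude, level1)
```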