地球在逃人员 2025-07-11 15:50 采纳率: 0%
浏览 6

hadoop EMR集群spark离线任务优化

hadoop EMR集群spark离线任务优化
生产两张表dwd_dsp_bid_basic_log_d 表和dwd_dsp_cps_bid_log_d 表,各取7天分区数据,input_size在500gb左右,关联键是event_id+adv_id,两张表为ds,hh分区表,目前sql上已经做了优先分区剪枝获取7日数据,并使用distribute by event_id,adv_id 优化了关联键分布
spark任务配置参数如下,希望能控制任务使用资源上限,降低集群成本,但是该配置下执行效率很低,执行时间过长,有什么sql上或者spark上的优化点,能使得在这个配置上的spark任务执行能快一些?

 spark-sql --master yarn \
              --name "dwd_dsp_basic_and_cps_d" \
              --conf spark.shuffle.service.enabled=true \
              --conf spark.sql.adaptive.enabled=true \
              --conf spark.yarn.executor.lostCheckTimeout=60s \
              --conf spark.yarn.shuffle.stopOnFailure=true \
              --conf spark.excludeOnFailure.enabled=true \
              --conf spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor=2 \
              --conf spark.excludeOnFailure.timeout=10m \
              --conf spark.excludeOnFailure.defaultExecutorExcludeDuration=10m \
              --conf spark.sql.shuffle.partitions=1000 \
              --conf spark.executor.memory=3g \
              --conf spark.executor.cores=1 \
              --conf spark.shuffle.io.connectionTimeout=60s \
              --conf spark.shuffle.io.maxRetries=10 \
              --conf spark.shuffle.io.retryWait=30s \
              --conf spark.network.timeout=300s \
              --conf spark.dynamicAllocation.enabled=true \
              --conf spark.dynamicAllocation.executorIdleTimeout=120s \
              --conf spark.dynamicAllocation.maxExecutors=50 \
              --conf spark.dynamicAllocation.minExecutors=20 \
              --conf spark.executor.launchTimeout=120s \
              --conf spark.task.maxFailures=4 \
              --conf spark.executor.heartbeatInterval=60s \
              --conf spark.hadoop.yarn.timeline-service.enabled=true \
              --conf spark.executor.extraJavaOptions="
                    -Djava.net.preferIPv6Addresses=false
                    -verbose:gc
                    -XX:+PrintGCDetails
                    -XX:+PrintGCDateStamps
                    -XX:OnOutOfMemoryError='kill -9 %p'
                    -XX:+IgnoreUnrecognizedVMOptions
                    --add-opens=java.base/java.lang=ALL-UNNAMED
                    --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
                    --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
                    --add-opens=java.base/java.io=ALL-UNNAMED
                    --add-opens=java.base/java.net=ALL-UNNAMED
                    --add-opens=java.base/java.nio=ALL-UNNAMED
                    --add-opens=java.base/java.util=ALL-UNNAMED
                    --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
                    --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
                    --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
                    --add-opens=java.base/sun.nio.cs=ALL-UNNAMED
                    --add-opens=java.base/sun.security.action=ALL-UNNAMED
                    --add-opens=java.base/sun.util.calendar=ALL-UNNAMED
                    --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
                    -Djdk.reflect.useDirectMethodHandle=false
                    -XX:+UseG1GC
                    -XX:MaxGCPauseMillis=200
                    -XX:G1HeapRegionSize=8m
                    -XX:InitiatingHeapOccupancyPercent=45" \
              --hivevar seven_days_ago=${seven_days_ago} \
              --hivevar yesterday=${yesterday} \
              -f dwd_dsp_basic_and_cps_d.sql

```

  • 写回答

5条回答 默认 最新

  • 檀越@新空间 2025-07-11 15:58
    关注

    下午好🌅🌅🌅
    本答案参考通义千问

    在处理两张大宽表(每张约500GB,总input_size约1TB)的关联操作时,使用Spark进行优化是关键。以下是一些重点优化点和对应的解决方案,帮助提升Spark作业的执行效率。


    一、核心优化方向

    1. 调整Spark Shuffle参数

    • 问题:默认的spark.sql.shuffle.partitions=1000可能不够,尤其是在处理大数据量时。
    • 优化建议
      • 增加shuffle partitions,根据数据量和Executor数量合理设置,例如 spark.sql.shuffle.partitions=2000 或更高。
      • 确保分区数与Executor资源匹配,避免过多或过少的分区导致性能瓶颈。

    2. 优化Join策略

    • 问题:如果两张表的数据量较大且没有合适的分区键,直接使用JOIN可能导致Shuffle过大。
    • 优化建议
      • 使用 Broadcast Join 如果其中一张表较小(小于1GB),可以将小表广播到所有Executor。
      • 如果两张表都较大,使用 Sort Merge Join (SMJ) 并确保关联字段是排序的。
      • 确保关联字段有合适的索引(如Hive分区字段)。

    3. 优化Executor配置

    • 问题:当前配置为 --conf spark.executor.memory=3g --conf spark.executor.cores=1,单个Executor资源有限。
    • 优化建议
      • 增加每个Executor的内存和CPU核心,例如:
        --conf spark.executor.memory=6g \
        --conf spark.executor.cores=2 \
        
      • 同时调整动态分配参数:
        --conf spark.dynamicAllocation.maxExecutors=100 \
        --conf spark.dynamicAllocation.minExecutors=40 \
        

    4. 优化Hive表的读取方式

    • 问题:如果Hive表是分区表,但未按关联字段分区,会导致全表扫描。
    • 优化建议
      • 确保关联字段是Hive表的分区字段,以减少读取的数据量。
      • 在SQL中添加分区过滤条件,如:
        WHERE dt BETWEEN '2023-01-01' AND '2023-01-07'
        

    5. 启用Adaptive Query Execution (AQE)

    • 问题:虽然已经启用了 spark.sql.adaptive.enabled=true,但需要确认是否正确应用。
    • 优化建议
      • 确保 spark.sql.adaptive.enabled=true 已生效,并结合 spark.sql.adaptive.coalescePartitions.enabled=true 来合并小分区。

    二、具体优化步骤(有序列表)

    1. 增加Spark Executor资源

      • 修改Executor内存和CPU核心:
        --conf spark.executor.memory=6g \
        --conf spark.executor.cores=2 \
        
    2. 调整Spark Shuffle Partitions

      • 增加 spark.sql.shuffle.partitions 到合理值(如2000):
        --conf spark.sql.shuffle.partitions=2000 \
        
    3. 优化Join策略

      • 如果小表可广播,使用 BROADCAST JOIN
        SELECT /*+ BROADCAST(left_table) */
        ...
        FROM left_table
        JOIN right_table ON ...
        
      • 否则使用 SORT MERGE JOIN,并确保关联字段是排序的。
    4. 优化Hive表的分区策略

      • 确保关联字段是Hive表的分区字段,减少数据扫描量。
    5. 启用AQE并优化分区合并

      • 确保AQE已启用,并启用分区合并:
        --conf spark.sql.adaptive.enabled=true \
        --conf spark.sql.adaptive.coalescePartitions.enabled=true \
        
    6. 优化GC和JVM参数

      • 增强GC调优,避免频繁Full GC:
        --conf spark.executor.extraJavaOptions="
            -XX:+UseG1GC
            -XX:MaxGCPauseMillis=200
            -XX:G1HeapRegionSize=8m
            -XX:InitiatingHeapOccupancyPercent=45
            -XX:+PrintGCDetails
            -XX:+PrintGCDateStamps
            -XX:OnOutOfMemoryError='kill -9 %p'
        "
        
    7. 检查数据倾斜

      • 如果存在数据倾斜,可以考虑对关联字段进行 Salting 处理,或使用 salting 方法分散热点。

    三、修改后的Spark启动命令示例

    spark-sql --master yarn \
              --name "dwd_dsp_basic_and_cps_d" \
              --conf spark.shuffle.service.enabled=true \
              --conf spark.sql.adaptive.enabled=true \
              --conf spark.sql.adaptive.coalescePartitions.enabled=true \
              --conf spark.yarn.executor.lostCheckTimeout=60s \
              --conf spark.yarn.shuffle.stopOnFailure=true \
              --conf spark.excludeOnFailure.enabled=true \
              --conf spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor=2 \
              --conf spark.excludeOnFailure.timeout=10m \
              --conf spark.excludeOnFailure.defaultExecutorExcludeDuration=10m \
              --conf spark.sql.shuffle.partitions=2000 \
              --conf spark.executor.memory=6g \
              --conf spark.executor.cores=2 \
              --conf spark.shuffle.io.connectionTimeout=60s \
              --conf spark.shuffle.io.maxRetries=10 \
              --conf spark.shuffle.io.retryWait=30s \
              --conf spark.network.timeout=300s \
              --conf spark.dynamicAllocation.enabled=true \
              --conf spark.dynamicAllocation.executorIdleTimeout=120s \
              --conf spark.dynamicAllocation.maxExecutors=100 \
              --conf spark.dynamicAllocation.minExecutors=40 \
              --conf spark.executor.launchTimeout=120s \
              --conf spark.task.maxFailures=4 \
              --conf spark.executor.heartbeatInterval=60s \
              --conf spark.hadoop.yarn.timeline-service.enabled=true \
              --conf spark.executor.extraJavaOptions="
                  -Djava.net.preferIPv6Addresses=false
                  -verbose:gc
                  -XX:+PrintGCDetails
                  -XX:+PrintGCDateStamps
                  -XX:OnOutOfMemoryError='kill -9 %p'
                  -XX:+IgnoreUnrecognizedVMOptions
                  --add-opens=java.base/java.lang=ALL-UNNAMED
                  --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
                  --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
                  --add-opens=java.base/java.io=ALL-UNNAMED
                  --add-opens=java.base/java.net=ALL-UNNAMED
                  --add-opens=java.base/java.nio=ALL-UNNAMED
                  --add-opens=java.base/java.util=ALL-UNNAMED
                  --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
                  --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
                  --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
                  --add-opens=java.base/sun.nio.cs=ALL-UNNAMED
                  --add-opens=java.base/sun.security.action=ALL-UNNAMED
                  --add-opens=java.base/sun.util.calendar=ALL-UNNAMED
                  --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED
                  -Djdk.reflect.useDirectMethodHandle=false
                  -XX:+UseG1GC
                  -XX:MaxGCPauseMillis=200
                  -XX:G1HeapRegionSize=8m
                  -XX:InitiatingHeapOccupancyPercent=45
              " \
              --hivevar seven_days_ago=${seven_days_ago} \
              --hivevar yesterday=${yesterday} \
              -f dwd_dsp_basic_and_cps_d.sql
    

    四、总结

    | 优化点 | 说明 | |--------|------| | Executor资源配置 | 提高内存和CPU,提升并行能力 | | Shuffle Partition 数量 | 根据数据量合理设置,避免数据倾斜 | | Join 策略选择 | 使用 Broadcast Join 或 Sort Merge Join | | Hive 分区优化 | 减少扫描数据量,提高查询效率 | | AQE 和分区合并 | 自动优化执行计划,减少Shuffle | | GC 和 JVM 调优 | 避免频繁GC,提升稳定性 |

    通过以上优化措施,可以显著提升Spark在处理大宽表关联任务时的性能表现。

    评论

报告相同问题?

问题事件

  • 修改了问题 7月11日
  • 修改了问题 7月11日
  • 创建了问题 7月11日