首页 星云 工具 资源 星选 资讯 热门工具
:

PDF转图片 完全免费 小红书视频下载 无水印 抖音视频下载 无水印 数字星空

Spark任务OOM问题如何解决?

编程知识
2024年10月14日 08:54

大家好,我是 V 哥。在实际的业务场景中,Spark任务出现OOM(Out of Memory) 问题通常是由于任务处理的数据量过大、资源分配不合理或者代码存在性能瓶颈等原因造成的。针对不同的业务场景和原因,可以从以下几个方面进行优化和解决。

一、业务场景及可能的OOM原因分析

  1. 数据量过大

    • 业务场景:处理海量数据集(例如,数亿行日志数据或数十TB的数据集),任务执行过程中需要对数据进行大规模的聚合、排序、连接等操作。
    • OOM 原因:数据无法完全放入内存,导致溢出,尤其是在shufflejoin操作时,数据量暴增。
  2. 数据倾斜

    • 业务场景:处理的数据分布不均匀(如某个用户或产品的数据量过多),导致部分节点上出现计算或内存瓶颈。
    • OOM 原因:由于部分节点需要处理大量的数据,某些节点的任务会使用超出可用内存的资源,而其他节点的负载较轻。
  3. 不合理的资源分配

    • 业务场景:资源分配过低,导致单个任务分配到的内存、CPU等资源不足。
    • OOM 原因:Executor的内存设置太小,或者数据过度缓存,导致内存不足。
  4. 代码中存在缓存过多或内存使用不合理

    • 业务场景:频繁使用cache()persist(),或对数据结构进行不必要的操作,导致内存过度消耗。
    • OOM 原因:数据缓存没有及时释放,导致内存占用过多。

二、针对OOM问题的解决方案

1. 调整Executor的内存和CPU资源

通过合理的资源分配,确保每个Executor有足够的内存处理数据。

  1. 增加Executor的内存
    Spark 中的Executor负责在集群节点上执行任务,默认每个Executor的内存可能不足以处理大数据集。可以增加Executor的内存以缓解OOM问题。
   --executor-memory 8G

可以通过--executor-memory选项来设置每个Executor的内存。例如,将内存设置为8GB。如果数据量很大,可以根据情况设置更大的内存。

  1. 调整堆外内存
    Spark还使用了一部分堆外内存(off-heap memory)。如果涉及大量的堆外内存操作,可以通过以下配置增加堆外内存:
   --conf spark.memory.offHeap.enabled=true
   --conf spark.memory.offHeap.size=4G
  1. 调整Executor的CPU核心数
    为每个Executor分配更多的CPU核心,以加快任务的处理速度,防止长时间占用内存。
   --executor-cores 4

通过--executor-cores设置每个Executor使用的核心数。例如,可以将核心数设置为4,以提升并发计算能力。

2. 调整内存管理策略

Spark的内存管理策略主要涉及以下几个关键参数,它们的优化配置可以帮助减少OOM问题。

  1. 调整内存管理比例
    Spark 2.x 及以上版本采用统一的内存管理模型,可以通过调节以下参数优化内存使用:
   --conf spark.memory.fraction=0.8
   --conf spark.memory.storageFraction=0.5
  • spark.memory.fraction:该参数控制了存储与执行内存的总占比,默认是0.6,可以适当调高。
  • spark.memory.storageFraction:该参数决定了在memory.fraction的基础上,存储内存的占比。如果需要更多执行内存,可以适当减小该值。
  1. 减少缓存数据的存储占用
    • 及时清理缓存:对于不再需要的数据,及时调用unpersist()来清理缓存,释放内存。
   rdd.unpersist()
  • 调整缓存级别:在缓存时,使用StorageLevel.DISK_ONLYStorageLevel.MEMORY_AND_DISK,以减少内存占用。
   rdd.persist(StorageLevel.MEMORY_AND_DISK)

3. 数据切分与优化操作

Spark任务中的shufflejoingroupBy等操作通常会引起大量内存消耗,以下优化可以减轻这些操作带来的OOM风险。

  1. 调整分区数
    • 对于大规模数据操作如joinshuffle等,分区数的设置至关重要。如果分区数过少,可能会导致某些分区数据量过大,进而导致内存溢出。
   rdd.repartition(200)

或者在执行某些操作时,显式指定分区数:

   rdd.reduceByKey(_ + _, numPartitions = 200)
  • 通常的经验是将分区数量设置为比Executor数量高出数倍(例如,每个核心处理2-4个分区)。
  1. 避免过多的宽依赖
    宽依赖(如groupByKey)会在shuffle时造成内存的压力,特别是数据量较大时,应该尽量避免。可以通过替换为reduceByKey等具有预聚合功能的操作来减少内存消耗:
   rdd.reduceByKey(_ + _)
  1. 避免数据倾斜
    如果存在数据倾斜,部分节点处理大量数据,容易导致OOM。以下是常见的解决方法:

    • 随机键拆分:可以为数据加上随机前缀,以打散数据,避免部分节点数据量过大。
   rdd.map(x => ((x._1 + new Random().nextInt(10)), x._2))
  • 广播小表:在join操作中,如果一张表很小,可以使用广播变量,将小表广播到每个节点,减少数据传输和内存占用:
   val broadcastVar = sc.broadcast(smallTable)
   largeTable.mapPartitions { partition =>
     val small = broadcastVar.value
     partition.map(largeRow => ...)
   }

4. 调整Spark的并行度和Shuffle机制

Spark的shuffle操作(如groupByKeyjoin)会导致大量数据需要在不同的节点之间传输。如果并行度设置过低,容易导致某个节点处理的数据量过大,从而引发OOM。

  1. 增加并行度
   --conf spark.sql.shuffle.partitions=200

或者在代码中显式设置:

   spark.conf.set("spark.sql.shuffle.partitions", "200")
  • 默认情况下,spark.sql.shuffle.partitions的值可能偏小(例如200),根据数据规模适当调整该值可以减轻单个节点的负载。
  1. 调整Shuffle合并机制
    Spark 3.0引入了 Adaptive Query Execution (AQE),可以在执行时动态调整shuffle的分区数,避免某些分区数据量过大:
   --conf spark.sql.adaptive.enabled=true
   --conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=64M

AQE 可以根据任务的执行情况自动调整shuffle的分区数,从而避免OOM。

五、小结一下

Spark任务中的OOM问题常常由于数据量过大、数据倾斜、资源分配不合理等问题引起,针对不同的业务场景,可以采取以下措施进行优化:

  1. 合理分配内存和CPU:增加Executor的内存和CPU核心数,合理配置内存管理参数。
  2. 调整分区数和优化操作:通过调整分区数、减少宽依赖等方式减少内存占用。
  3. 处理数据倾斜:通过随机键拆分、广播小表等方法避免数据倾斜。
  4. 使用缓存优化内存:减少不必要的cache()persist()操作,并及时释放缓存数据。

好了,今天的内容就写到这里,这些优化方法结合使用,可以有效解决Spark任务中的OOM问题。关注威哥爱编程,码码通畅不掉发。

From:https://www.cnblogs.com/wgjava/p/18463484
本文地址: http://shuzixingkong.net/article/2468
0评论
提交 加载更多评论
其他文章 数据结构 - 栈
栈是一种特殊线性数据结构,操作遵循后进先出原则,可解决表达式求值等问题。栈分为顺序栈和链栈,各有特点。文章详细介绍了栈的定义、分类及实现方式,包括顺序栈和链栈的ADT定义及基本操作实现。
数据结构 - 栈 数据结构 - 栈 数据结构 - 栈
JavaScript原型链污染探讨
如果你想弄明白什么怎样才可以实现JavaScript的原型链污染,那么你首先需要弄清楚两个东西,那就是__proto__和prototype。 到底什么才是__proto__和prototype? 那我们先来看看比较官方的说法吧: __proto__:是每个对象的隐藏属性,指向创建该对象的构造函数的
JavaScript原型链污染探讨
(系列六).net8 全局异常捕获机制
说明 该文章是属于OverallAuth2.0系列文章,每周更新一篇该系列文章(从0到1完成系统开发)。 该系统文章,我会尽量说的非常详细,做到不管新手、老手都能看懂。 说明:OverallAuth2.0 是一个简单、易懂、功能强大的权限+可视化流程管理系统。 友情提醒:本篇文章是属于系列文章,看该
(系列六).net8 全局异常捕获机制 (系列六).net8 全局异常捕获机制 (系列六).net8 全局异常捕获机制
如何在kubernetes环境中共享GPU
随着人工智能和大模型的快速发展,云上GPU资源共享变得必要,因为它可以降低硬件成本,提升资源利用效率,并满足模型训练和推理对大规模并行计算的需求。 在kubernetes内置的资源调度功能中,GPU调度只能根据“核数”进行调度,但是深度学习等算法程序执行过程中,资源占用比较高的是显存,这样就形成了很
如何在kubernetes环境中共享GPU 如何在kubernetes环境中共享GPU
Nuxt3+PM2集群模式启动及勘误
起因 之前写过一篇 Nuxt3 的文章,Nuxt3 环境变量配置,用到了 PM2,但是里面的一些配置存在问题,最近有空又验证了一下,这里做一个勘误。 问题 PM2 的启动配置中有一项是exec_mode,默认是fork,另一个可选值是cluster,fork 是单进程模式,cluster 是多进程模
Nuxt3+PM2集群模式启动及勘误 Nuxt3+PM2集群模式启动及勘误 Nuxt3+PM2集群模式启动及勘误
Android 车载应用开发指南 - CAN Bus 协议详解
​ 在现代车载应用开发中,CAN(Controller Area Network)总线协议扮演着不可或缺的角色。作为一个汽车内部网络的标准协议,CAN Bus 已经成为了车载系统通信的基础。而在 Android 车载应用开发的过程中,理解并利用好 CAN Bus 协议是必不可少的。 那么,CAN B
Android 车载应用开发指南 - CAN Bus 协议详解 Android 车载应用开发指南 - CAN Bus 协议详解 Android 车载应用开发指南 - CAN Bus 协议详解
apisix~自定义文件上传代理插件~支持form-data文件和kv参数
参考文献 https://stackoverflow.com/questions/24535189/composing-multipart-form-data-with-a-different-content-type-on-each-parts-with-j https://www.reddit.
WiFi基础(六):天线基础知识
liwen01 2024.10.01 前言 麦克斯韦预言了电磁波的存在,赫兹通过实验证实了麦克斯韦的预言,马可尼基于无线电磁波的原理发明了无线电报系统,从此人类进入无线通信系统时代。 天线是通信系统中必不可少的组成部分,它的作用是将电信号转换为电磁波信号发射出去,也可以将接收到的电磁波信号转换为电信
WiFi基础(六):天线基础知识 WiFi基础(六):天线基础知识 WiFi基础(六):天线基础知识