迪浮资讯Information Dynamic百分努力只为换取一份信任

 当前位置:首页 > 迪浮资讯 > 行业新闻

spark调优的相关总结


1、reduce task OOM?

         增大reduce端的聚合操作的内存比例

         增大executor memory 内存大小 --executor-memory

         减少reduce task每次拉取的数据量  设置spark.reducer.maxSizeInFlight参数



2、在shuffle阶段executor挂掉?

     分析原因:(1)map task阶段的所运行的executor的内存不足,导致executor挂掉了。导致executor里面的blockmanager挂掉,不能和connctionmanager建立连接,



2.executor并没有挂掉

      2.1 BlockManage之间的连接失败(map task所运行的executor正在GC)


3.reduce task向Driver中的MapOutputTracker获取shuffle file位置的时候出现了问题

下面有几个重要的参数的调节:


    默认值:32k

    参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中待缓冲写满之后,才会溢写到磁盘。

     调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。


            默认值:48m

            参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。

            调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

            错误:reduce oom

            reduce task去map拉数据,reduce 一边拉数据一边聚合   reduce段有一块聚合内存(executor memory * 0.2)

            解决办法:1、增加reduce 聚合的内存的比例  设置spark.shuffle.memoryFraction

            2、 增加executor memory的大小  --executor-memory 5G



            3、减少reduce task每次拉取的数据量  设置spark.reducer.maxSizeInFlight  24m


            默认值:3

            参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。



            调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

            shuffle file not find    taskScheduler不负责重试task,由DAGScheduler负责重试stage

spark.shuffle.io.retryWait



    默认值:5s

    参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。

    调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

spark.shuffle.memoryFraction



     默认值:0.2

     参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。

     调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,

    以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。

        spark.shuffle.manager



    默认值:sort

            参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。

            调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。

           

        spark.shuffle.sort.bypassMergeThreshold



            默认值:200

            参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。

            调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。


            默认值:false

            参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。

            调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

1、资源调优

在开发过程中,增加或者分配更多的资源对于任务的执行的效率是显而易见的。但是资源本身是受到限制的,那么我们该如何解决这个问题呢?

一般在开发的时候,我们的提交任务的脚本

Spark-submit.sh 内容如下:

#!/bin/bash

/opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/bin/spark-submit \

--class com.hypers.sparkproject.spark.session.UserVisitSessionAnalyzeSpark \

--num-executors 3 \                        --配置executor的数量

--driver-memory  1024M \                   --配置driver的内存,影响不大

--executor-memory 2G \                 --配置每个executor的内存大小

--executor-cores 3 \ --Spark standalone and YARN only --配置每个executor的cpu核数

/usr/loacl/recommend-1.0-SNAPSHOT.jar \

分配多少合理呢?

第一种:Spark Standalone即Spark运行在自己的分布式框架时,需要知道每台机器能够使用的内存,CPU核数,假如每台机器能够使用4G内存和2个CPU核数,一共20台机器,那么就可以executor数量设置20,每个executor内存设置4G,每个executor设置2 CPU core

第二种: Yarn 当Spark运行在yarn上时,需要查看资源队列有多少资源,假如资源队列有500G内存,100个CPU core可用,那么就可以设置50个executor,每个executor内存设置10G,每个executor设置2个CPU core



2、设置并行度

设置task的并行度

保证并行度与你设置的资源相匹配,不至于会浪费资源。例如10个executor,每个内存是4GB, 2个CPU, 10个task, 总的cpu20个,可以并行运行20个task。现在假设你要让每个executor都运行一个task. 这样的话,其实每一个executor上面就有一个空闲的cpu, 导致并行度与设置的任务不相符合,就浪费掉了资源。


3、重构RDD以及RDD序列化

原则一:尽量去复用RDD

原则二:公共RDD进行持久化到内存或者磁盘上面,那么之后对于这个RDD的操作都是直接取的持久化的数据

原则三:持久化数据进行序列化的操作

原则四:持久化 + 双副本机制

为了数据的高可靠性,而且内存充足,可以使用双副本机制进行持久化 
持久化的双副本机制,持久化后的一个副本,因为机器宕机了,副本丢了,就还是得重新计算一次;持久化的每个数据单元,存储一份副本,放在其他节点上面;从而进行容错;一个副本丢了,不用重新计算,还可以使用另外一份副本。



  1. 广播变量:将大变量广播出去而不是直接使用的。

为什么要用Broadcast


Broadcast的原理


广播变量,在driver上会有一份初始的副本,task在运行的时候,如果要使用广播变量中的数据,首先会在自己本地的Executor对应的BlockManager中尝试获取变量副本,并保存在本地的BlockManager中,此后这个Executor上的所有task,都会直接使用本地的BlockManager中的副本, Executor的BlockManager除了从driver上拉取,也可能从其他节点的BlockManager上拉取变量副本,距离越近越好.

总而言之: 广播变量的好处不是每一个task一份变量副本,而是变成每个节点的executor才一份副本,这样的话就可以变量产生的副本大大减少。



4、Kryo序列化的使用

默认是使用了java的序列化机制ObjectInputStream/ObjectoutputStream,这种序列化的方式简单,并且便于处理。但是效率不高,而且在进行RDd的持久化操作的时候,内存占比相对比较大。那么利用kryo序列化的好处:

  1. 算子函数中使用的外部变量,在经过kryo序列化之后,会优化网络传输的性能,优化集群中内存的占用和消耗.
  2. 持久化RDD的时候,优化内存的占用和消耗
  3. 优化shuffle操作的网络传输的数据

设置kryo 并且要注册自动义的类

set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

.registerKryoClasses(new Class[]{CategorySortKey.class})


5、Fastutil库

fastutil 是扩展了java标准集合框架(Map,List,Set,HashMap,ArrayList,HashSet)的类库,提供了特殊类型的Map,Set,List和queue,fastutil能够提供更小的内存占用,更快的存取速度,fastutil也提供了64位的array、set和list,以及高性能快速的,以及实用的IO类,来处理二进制和文本类型的文件;

fastutil最新版本要求Java 7以及以上版本


Spark中fastutil应用场景:

1.如果算子函数使用了外部变量,那么可以用三步来优化:

a.使用Broadcast广播变量优化,

b. 使用Kryo序列化类库优化,提升性能和效率,

c.如果外部变量是某种比较大的集合,可以使用fastutil改写外部变量

2.在算子函数中,如果要创建比较大的Map.List等集合,可以考虑将这些集合类型使用fastutil类库重写。

Maven库:

<dependency>

    <groupId>fastutil</groupId>

    <artifactId>fastutil</artifactId>

    <version>5.0.9</version>

</dependency>



6、数据本地化等待时长

Spark在Driver上面对application的每一个stage的task进行分配之前,都会计算每个task分配的是RDd的那个partitoion的数据呢?

Spark的task分配算法优先会希望每个task正好分配到它要计算的数据所在的节点,这样就避免了网络间传输数据。

但是,task可能没有机会分配到它的数据所在的节点,因为可能计算资源和计算能力都满了,这种情况下,                                   Spark会等待一段时间,过了这个时间,才会选择一个比较差的本地化级别,比如将这个task分配到相邻的一个节点上,这个时候肯定发生网络传输,会通过一个getRemote()方法,通过TransferService(网络数据传输组件)从数据所在节点的BlockManager中获取数据,上述中的一段时间即为本地化等待时长。那么如何调整这个等待时间呢?

PROCESS_LOCAL:进程本地化,代码和数据在同一个进程中,也就是在同一个executor中;计算数据的task由executor执行,数据在executor的BlockManager中;性能最好

NODE_LOCAL:节点本地化,代码和数据在同一个节点中;比如说,数据作为一个HDFSblock块,就在节点上,而task在节点上某个executor中运行;或者是,数据和task在一个节点上的不同executor中;数据需要在进程间进行传输

NO_PREF:对于task来说,数据从哪里获取都一样,没有好坏之分

RACK_LOCAL:机架本地化,数据和task在一个机架的两个节点上;数据需要通过网络在节点之间进行传输

ANY:数据和task可能在集群中的任何地方,而且不在一个机架中,性能最差。

如果大多都是PROCESS_LOCAL,那就不用调节了

如果是发现,好多的级别都是NODE_LOCAL、ANY,那么最好就去调节一下数据本地化的等待时长

调节完,应该是要反复调节,每次调节完以后,再来运行,观察日志

看看大部分的task的本地化级别有没有提升;看看,整个spark作业的运行时间有没有缩短

调节方法:

spark.locality.wait,默认是3s;可以调节为6s,10s

默认情况下,下面3个的等待时长,都是跟上面那个是一样的,都是3s

spark.locality.wait.process

spark.locality.wait.node

spark.locality.wait.rack

new SparkConf()

  .set("spark.locality.wait", "10")

Young/New Generation

年轻代与Spark调优息息相关,所以这里单独拿出来讲解

所有新生成的对象首先都是放在年轻代中,年轻代的目标就是尽可能快速的收集掉那些生命周期短的对象




迪浮科技课程服务承诺:


1、免费重修:
学员缺课或者学校效果不理想,可以免费重修,确保课程内容完全掌握。重修过程中绝对不收取任何费用!
2、单独辅导:
如果学生在学习过程中,因各种原因无法跟班级进度同步,公司安排专家讲师一对一辅导,手把手带你成为IT精英!
3、自由实验:
清默网络有先进完善的实验中心,全天开放,实验机时不限,不断提高动手操作能力!
4、考试辅导:
学员考试前先通过清默内部模拟考试,技术确定达到考试要求并提供考前辅导及考试技巧讲解。
如果模拟考试未能通过,专家讲师会给予建议和辅导,合格后再参加正式考试,确保学员能高分通过认证考试。
5、企业实战项目:

即通过先进的设备完全搭建和企业项目完全一致的网络环境,把学习内容融会贯通并在实际项目中加以应用,以达到学员迅速掌握实际技能并弥补经验不足的目的,让学员的学习内容不再纸上谈兵,理论与实战完全吻合


咨询老师:郭老师

咨询电话:15056089769

咨询Q Q :1027831018