迪浮资讯Information Dynamic百分努力只为换取一份信任

 当前位置:首页 > 迪浮资讯 > 行业新闻

shuffle调优



什么情况下面会发生shuffle操作呢? 尽量使用高性能的算子(但是有要注意可能会发生OOM)


在Spark中,主要有以下几个算子

- groupByKey : 把分布在各个节点上的数据中的同一个key对应的value都集中到一块儿,集中到集群中的一个节点中,也即是集中到一个节点的executor的一个task中

- reduceByKey : 算子函数对values集合进行reduce操作,最后生成一个value

- countByKey : 在一个task中获取同一个key对应的所有value,然后计数,统计总共有多少value

- join : 两个RDD,Key相同的两个value都集中到一个executor的task中




Shuffle操作的过程

在一个shuffle过程中,前半部分stage中,每个task都会创建后半部分stage中相同task数量的文件,比如stage后半部分有100个task,那么前半部分的每个task都会创建100个文件(先写入到内存缓冲中,后溢满写入到磁盘),会将同一个key对应的values写入同一个文件中,shuffle后半部分的stage中的task,每个task都会从各个节点的task创建其中一份属于自己的那份文件中,拉取属于自己的key-value对,然后task会有一个内存缓冲区,然后调用HashMap进行key-values的聚合,最终调用我们定义的聚合函数来进行相应的操作。



Shuffle调优合并map端输出的文件
默认情况下spark是不开启map端输出文件的合并的机制的,当spark在分批次执行task的时候,task每次都会创建新的文件,而不会共享这些数据的,所以开启合并机制能够提升性能:

new SparkConf().set("spark.shuffle.consolidateFiles", "true")

设置合并机制之后:

第一个stage,并行运行2个task,运行这两个task时会创建下一个stage的文件,运行完之后,会运行下一批次的2个task,而这一批次的task则不会创建新的文件,会复用上一批次的task创建的文件

第二stage的task在拉取上一个stage创建的文件时就不会拉取那么多文件了,而是拉取少量文件,每个输出文件都可能包含了多个task给自己的map端输出。

shuffle调优之map端内存缓冲和reduce内存占比



默认情况下:

每个task的内存缓冲为32kb,reduce端内存占比为0.2(即默认executor内存中划分给reduce task的微20%)

所以在不调优的情况下,如果map端task处理的比较大,内存不足则溢满写入磁盘

比如:

每个task就处理320kb,32kb,总共会向磁盘溢写320 / 32 = 10次。

每个task处理32000kb,32kb,总共会向磁盘溢写32000 / 32 = 1000次。

同理,ruduce端也一样



何时调优?

通过Spark UI查看shuffle磁盘的read和write是不是很大,如果很大则应相应调优

如何调优?

spark.shuffle.file.buffer : 32kb -> 128kb

spark.shuffle.memoryFraction: 0.2 -> 0.3




spark算子调优
MapPartitions提升Map类操作性能

spark中,最基本的原则是每个task处理一个RDD的partition

如果是普通的Map,假如一个partition中有一万条数据,那么map中的function就要执行和计算一万次,但是使用MapPartitions操作之后,一个task只会执行一次function,function一次接收了所有partition数据,性能比较高




MapPartitions的缺点:

如果是普通的Map,一条一条处理数据,当出现内存不够的情况时,那么就可以将已经处理掉的数据从内存里面垃圾回收掉,所以普通map通常不会出现OOM情况

如果是MapPartitions,对于大量数据来说,如果一个partiton数据有一百万条,一次性传入function之后,可能导致内存不足,但是又没办法腾出空间,直接就导致了内存溢出,OOM

所以,当使用MapPartitons算子时,要估算每个partiton的数据能不能一下子缓存到分配给executor的内存中,如果可以,就是用该算子,对性能有显著提升

所以对于map和mapPartition的区别使用。看具体场景

GroupByKey和ReduceByKey的区别

也是从存在效率问题的。




Spark算子调优之filter过后使用coalesce减少分区数量
Spark程序经过filter之后会出现以下两种情况的:

导致每个partition里面的数据不一样的,有的很多,而有的数据很少,导致在 后面的操作会数据倾斜的。
由于partition的数据量减少,在后面的计算还是按照跟partition的task相同的数据量,这样就导致了资源的浪费。
12. Spark算子调优之foreachPartition优化写数据库性能
foreach是对每条数据进行处理的,task对partition中的每一条数据都会执行function操作,如果function中有写数据库的操作,那么有多少条数据就会创建和销毁多少个数据库连接,这对性能的影响很大



在生产环境中,通常都是使用foreachPartition来写数据库的,使用了该算子之后,对于用户自定义的function函数,就调用一次,一次传入一个partition的所有数据,这里只需创建一个数据库连接,然后向数据库发送一条sql语句外加多组参数即可,但是这个时候要配合数据库的批处理

同样,该算子在超大数据量面前同样会出现OOM情况




Spark算子调优之使用repartition解决SparkSQL低并行度性能问题
通常情况下,在上面第二条的并行度调优时,使用spark.default.parallelism来设置并行度,那么这个设置在什么地方有效,什么地方无效?

当程序中没有使用SparkSQL,那么整个sparkapplication的所有stage的并行度都是设置的这个参数,除非使用了coalesce算子缩减过partition.

当程序中使用了SparkSQL,那么SparkSQl的 stage的并行度无法设置,因为SparkSQL会默认的根据Hive表对应的hdfs文件的block,自动设置SparkSQL那个stage的并行度,所以这就导致出现了用少量task来处理复杂逻辑的情况, 这种情况下,需要使用repartition来设置SparkSQL的并行度,即对从Hive中读取出来的RDD,使用repartiton重新分区为预期的数量来设置并行度。




spark算子reduceBykey进行本地的聚合操作
在map端,给下个stage的每个task创建的输出文件中,写数据之前,会进行本地的combiner操作,也就是说,对每一个key,对应的values都会执行用户自定义的算子函数,比如+_,当进行了这个combiner操作之后,减少了数据量,也即是减少了磁盘IO,同时减少了网络传输,对性能有明显提升,所以,在实际的项目中,能用reduceByKey实现的就尽量用该算子实现。






迪浮科技课程服务承诺:


1、免费重修:
学员缺课或者学校效果不理想,可以免费重修,确保课程内容完全掌握。重修过程中绝对不收取任何费用!
2、单独辅导:
如果学生在学习过程中,因各种原因无法跟班级进度同步,公司安排专家讲师一对一辅导,手把手带你成为IT精英!
3、自由实验:
清默网络有先进完善的实验中心,全天开放,实验机时不限,不断提高动手操作能力!
4、考试辅导:
学员考试前先通过清默内部模拟考试,技术确定达到考试要求并提供考前辅导及考试技巧讲解。
如果模拟考试未能通过,专家讲师会给予建议和辅导,合格后再参加正式考试,确保学员能高分通过认证考试。
5、企业实战项目:

即通过先进的设备完全搭建和企业项目完全一致的网络环境,把学习内容融会贯通并在实际项目中加以应用,以达到学员迅速掌握实际技能并弥补经验不足的目的,让学员的学习内容不再纸上谈兵,理论与实战完全吻合


咨询老师:郭老师

咨询电话:15056089769

咨询Q Q :1027831018