spark,spark 性能优化

 2023-09-24 阅读 26 评论 0

摘要:一 性能优化点 # 提升并行度,就意味着有更多的分区,也就意味着有更多的task.当然不是越多越好,结合实际情况 spark,# 对多次使用的RDD进行缓存,可以减少不必要的计算 # 使用序列化的持久化机制,这样可以减少内存占用以及GC开销 # Java虚拟

一 性能优化点

# 提升并行度,就意味着有更多的分区,也就意味着有更多的task.当然不是越多越好,结合实际情况

spark,# 对多次使用的RDD进行缓存,可以减少不必要的计算

# 使用序列化的持久化机制,这样可以减少内存占用以及GC开销

# Java虚拟机调优

# 广播共享数据

# 数据本地化

# shuffle调优

# 使用高性能的序列化类库

 

二 诊断内存消耗

我们应该如何判断Spark程序耗费多少内存呢?

首先:你可以在SparkConf设置一个分区参数spark.default.parallelism可以统一设置这个所有应用程序的RDD分区数量参数;或者在调用parallelize或者textFile方法的时候手动指定分区数

其次:在程序中将RDD cache到内存之中

然后:观察driver的log,找到这样的信息:

INFO BlockManagerMasterActor: Added rdd_0_1 inmemory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)”的日志信息,这可就显示了每一个partition占用了多少内存

最后:将这个信息乘以partition数量就可以得到RDD的内存占用量

三 提高并行度

spark-submit里面设置2个Execuor,每一个Executor有5个CPU cores如果我们在SparkConf设置了spark.default.parallelism  = 5,那么所有RDD的partition都设置成5个,也就说每个RDD会被拆为5份儿,那么,针对RDD的partition,一个partition会启动task来计算,所以,针对所有的算子操作,都只会创建5个task在集群中运行。

 

所以,在这个时候,集群中明明有10个cpu cores,你就设置了5个

task,那么相当于集群中还有5个cores是空闲的,所以资源被白白

浪费了

 

其实最好的情况,就是说每个CPU core都不空闲,一直不停的运转

那是否我们设置10个task就是最好的呢?不一定,因为每一个task执行顺序不一样和完成的时间 不一样,那么如果正好10个,可能某个task完成的快,那么那个CPU 就又空闲呢,资源又浪费了

 

所以Spark官方推荐我们设置集群cpu数量的2-3倍并行度,这样话

每一个CPU可能被分配的task线程数就是2-3个,那么集群资源就不会出现空闲的情况。

 

另外,Spark会自动设置以文件作为输入源的RDD的并行度,依据其大小,比如HDFS,就会给每一个block创建一个partition,也依据这个设置并行度。对于reduceByKey等会发生shuffle的操作,就使用并行度最大的父RDD的并行度即可。

 

可以手动使用textFile()、parallelize()等方法的第二个参数来设置并行度;也可以使用spark.default.parallelism参数,来设置统一的并行度

 

比如说,spark-submit设置了executor数量是10个,每个executor要求分配2个core,那么application总共会有20个core。此时可以设置newSparkConf().set("spark.default.parallelism", "60")来设置合理的并行度,从而充分利用资源

 

四 对多次使用的RDD进行缓存,可以减少不必要的计算

如果程序中对于某一个RDD进行了多次transformation操作或者行为操作,那么就有必要对其进行持久化操作,以避免对一个RDD反复计算。

此外如果要保证在RDD的持久化数据可能丢失的情况下,还要保证高性能,那么可以对RDD进行checkpoint操作。

 

五 使用序列化的持久化级别

如果内存不是特别充足的情况下,可以使用MEMORY_ONLY_SER或者MEMORY_AND_DISK_SER对RDD持久化序列化之后,RDD的每一个Partition的数据都是序列化为一个巨大的字节数组,这样,对于内存的消耗就小多了,但是唯一缺点是获取RDD的数据需要反序列化,会增大其CPU性能开销

 

六 Java虚拟机优化

6.1 垃圾回收优化

如果在持久化RDD的时候,持久化了大量的数据,那么Java虚拟机的垃圾回收就可能成为了瓶颈。因为JaVA虚拟机会定期进行垃圾回收,此时就会追踪所有的Java对象,并且在垃圾回收的时候,找到那些已经不再使用对象,然后清理这些对象,给新的对象腾出空间。

 

垃圾回收的性能开销是和内存中的对象数量成正比,所以对于垃圾回收性能问题,实现要使用高效的数据结构,比如数组和String;其次就是持久化的时候使用序列化的持久化级别,而且使用Kryo库来序列化,降低内存使用和GC开销,而且每一个partition就是一个对象

 

如何进行垃圾回收检测?

我们可以对垃圾回收进行监测,包括多久进行一次垃圾回收,以及每次垃圾回收耗费的时间。只要在spark-submit脚本中,增加一个配置即可,--conf "spark.executor.extraJavaOptions=-verbose:gc-XX:+PrintGCDetails -XX:+PrintGCTimeStamps"。

这里虽然会打印出Java虚拟机的垃圾回收的相关信息,但是是输出到了worker上的日志中,而不是driver的日志中。

但是这种方式也只是一种,其实也完全可以通过SparkUI(4040端口)来观察每个stage的垃圾回收的情况。

 

6.2 优化Eden区域,避免full gc

Java堆空间被划分成了两块空间,一个是年轻代,一个是老年代。年轻代放的是短时间存活的对象,老年代放的是长时间存活的对象。年轻代又被划分了三块空间,Eden、Survivor1、Survivor2。

首先,Eden区域和Survivor1区域用于存放对象,Survivor2区域备用。创建的对象,首先放入Eden区域和Survivor1区域,如果Eden区域满了,那么就会触发一次Minor GC,进行年轻代的垃圾回收。Eden和Survivor1区域中存活的对象,会被移动到Survivor2区域中。然后Survivor1和Survivor2的角色调换,Survivor1变成了备用。

如果一个对象,在年轻代中,撑过了多次垃圾回收,都没有被回收掉,那么会被认为是长时间存活的,此时就会被移入老年代。此外,如果在将Eden和Survivor1中的存活对象,尝试放入Survivor2中时,发现Survivor2放满了,那么会直接放入老年代。此时就出现了,短时间存活的对象,进入老年代的问题。

如果老年代的空间满了,那么就会触发Full GC,进行老年代的垃圾回收操作。

如果发现,在task执行期间,大量full gc发生了,那么说明,年轻代的Eden区域,给的空间不够大。此时可以执行一些操作来优化垃圾回收行为:

# 包括降低spark.storage.memoryFraction的比例,给年轻代更多的空间,来存放短时间存活的对象。因为这个spark.executor.memory决定了每个Executor可用内存的大小,而spark.storage.memoryFraction则决定了在这部分内存中有多少内存可以用于存储RDD的缓存数据,剩下的则是保证任务运行时各种其它内存空间的需要

 

# 给Eden区域分配更大的空间,使用-Xmn即可,通常建议给Eden区域,预计大小的4/3;

 

#如果使用的是HDFS文件,那么很好估计Eden区域大小,如果每个executor有4个task,然后每个hdfs压缩块解压缩后大小是3倍,此外每个hdfs块的大小是64M,那么Eden区域的预计大小就是:4 * 3 * 64MB,然后呢,再通过-Xmn参数,将Eden区域大小设置为4 * 3 * 64 * 4/3。

 

6.3优化executor内存比例

对于垃圾回收来说,最重要的就是调节RDD缓存占用的内存空间,与算子执行时创建的对象占用的内存空间的比例。默认情况下,Spark使用每个executor 60%的内存空间来缓存RDD,那么在task执行期间创建的对象,只有40%的内存空间来存放。

 

在这种情况下,很有可能因为你的内存空间的不足,task创建的对象过大,那么一旦发现40%的内存空间不够用了,就会触发Java虚拟机的垃圾回收操作。因此在极端情况下,垃圾回收操作可能会被频繁触发。

在上述情况下,如果发现垃圾回收频繁发生。那么就需要对那个比例进行调优,使用new SparkConf().set("spark.storage.memoryFract

ion", "0.5")即可,可以将RDD缓存占用空间的比例降低,从而给更多的空间让task创建的对象进行使用。

 

降低spark.storage.memoryFraction的比例,给年轻代更多的空间,来存放短时间存活的对象。因为这个spark.executor.memory决定了每个executor可用内存的大小,而spark.storage.memoryFraction则决定了在这部分内存中有多少内存可以用于存储RDD的缓存数据,剩下的则是保证任务运行时各种其它内存空间的需要

 

七 广播共享数据

如果你的算子函数中,使用到了特别大的数据,那么,这个时候,推荐将该数据进行广播。这样的话,就不至于将一个大数据拷贝到每一个task上去。而是给每个节点拷贝一份,然后节点上的task共享该数据。

这样的话,就可以减少大数据在节点上的内存消耗。并且可以减少数据到节点的网络传输消耗。

 

比如说,我们在处理的时候,可能会结合某个表中的数据然后进行一个join操作,我们可以先把表中的数据读取出来放在driver上,编程一个ArrayList,然后在算子函数中使用进行join操作

 

如果这个外部数据很大比如说100M,算子函数又要使用它,那么默认是会把每一份数据拷贝到每一个task中,如果某一个worker节点运行了10个task那就是会占用接近1G的内存,而且产生大量的网络传输

 

如果使用广播变量,我就只需要把这个数据拷贝到每一个节点一份就行,而不是每一个task一份副本,大大减少了内存的占用和网络传输

 

八reduceByKey & groupByKey

val counts = pairs.reduceByKey(_ + _)

val counts = pairs.groupByKey().map(wordCounts=> (wordCounts._1, wordCounts._2.sum))

如果能用reduceByKey,那就用reduceByKey,因为它会在map端,先进行本地combine,可以大大减少要传输到reduce端的数据量,减小网络传输的开销。

只有在reduceByKey处理不了时,才用groupByKey().map()来替代。

 

 

九shuffle性能优化

9.1 spark 1.6 之前的版本

spark.shuffle.consolidateFiles:是否开启shuffle block file的合并,默认为false

 

减少了节点生成Shuffle BlockFile的文件数量,并且Result Task拉取数据的文件也减少了,所以减少了磁盘I/O的开销

 

spark.reducer.maxSizeInFlight:reduce task的拉取缓存,默认48m

 

Result Task每次只能从文件拉取指定缓存大小的数据量,拉取完之后进行聚合处理,然后再次拉取,如果内存足够大,可以稍微加大一些,拉取的次数就变少了

spark.shuffle.file.buffer:map task的写磁盘缓存,默认32k

Map端再将数据写入磁盘文件之前,会先把数据刷到叫做bucket缓存,然后达到一定数据量,再往ShuffleMapFile里面写,这样适当提高可以减少磁盘写的次数

 

spark.shuffle.io.maxRetries:拉取失败的最大重试次数,默认3次

spark.shuffle.io.retryWait:拉取失败的重试间隔,默认5s

 

Reduce Task拉取数据的时候可能会遇到Map Task的JVM正在FULL GC ,此时就会出现正常工作线程停止,那么可能等待一段时间后,FULL GC还没有完成,就导致本地文件没有拉取到

 

很有可能你的GC没有调优好,导致每次GC都一分钟,那么拉取最大时间是3 * 5s = 15s,就会导致频繁的很多文件拉取失败,会造成Shuffle Output File Lost,然后DAGScheduler会重试task 和 stage最后甚至可能导致应用程序挂掉

spark.shuffle.memoryFraction:用于reduce端聚合的内存比例,默认0.2,超过比例就会溢出到磁盘上

 

执行Reduce Task的Executor有一部分内存用来汇聚各个Reduce Task拉取的的数据,放入map进行聚合,这个参数就表示用于这部分聚合的内存比是多少。如果数据量太大,内存不够用就会溢写到磁盘。

 

9.2 spark1.6之后的版本

 

十 高性能序列化类库

在任何分布式系统中,序列化都是扮演着一个重要的角色的。如果使用的序列化技术,在执行序列化操作的时候很慢,或者是序列化后的数据还是很大,那么会让分布式应用程序的性能下降很多。所以,进行Spark性能优化的第一步,就是进行序列化的性能优化。

Spark自身默认就会在一些地方对数据进行序列化,比如Shuffle。还有就是,如果我们的算子函数使用到了外部的数据(比如Java内置类型,或者自定义类型),那么也需要让其可序列化。

而Spark自身对于序列化的便捷性和性能进行了一个取舍和权衡。默认,Spark倾向于序列化的便捷性,使用了Java自身提供的序列化机制——基于ObjectInputStream和ObjectOutputStream的序列化机制。因为这种方式是Java原生提供的,很方便使用。

但是问题是,Java序列化机制的性能并不高。序列化的速度相对较慢,而且序列化以后的数据,还是相对来说比较大,还是比较占用内存空间。因此,如果你的Spark应用程序对内存很敏感,那么,实际上默认的Java序列化机制并不是最好的选择。

 

Spark提供的两种序列化机制

1、Java序列化机制:默认情况下,Spark使用Java自身的ObjectInputStream和ObjectOutputStream机制进行对象的序列化。只要你的类实现了Serializable接口,那么都是可以序列化的。而且Java序列化机制是提供了自定义序列化支持的,只要你实现Externalizable接口即可实现自己的更高性能的序列化算法。Java序列化机制的速度比较慢,而且序列化后的数据占用的内存空间比较大。

2、Kryo序列化机制:Spark也支持使用Kryo类库来进行序列化。Kryo序列化机制比Java序列化机制更快,而且序列化后的数据占用的空间更小,通常比Java序列化的数据占用的空间要小10倍。Kryo序列化机制之所以不是默认序列化机制的原因是,有些类型虽然实现了Seriralizable接口,但是它也不一定能够进行序列化;此外,如果你要得到最佳的性能,Kryo还要求你在Spark应用程序中,对所有你需要序列化的类型都进行注册。

如何使用Kryo序列化机制?

如果要使用Kryo序列化机制,首先要用SparkConf设置一个参数,使用new SparkConf().set("spark.serializer","org.apache.spark.serializer.KryoSerializer")即可,即将Spark的序列化器设置为KryoSerializer。这样,Spark在内部的一些操作,比如Shuffle,进行序列化时,就会使用Kryo类库进行高性能、快速、更低内存占用量的序列化了。

使用Kryo时,它要求是需要序列化的类,是要预先进行注册的,以获得最佳性能——如果不注册的话,那么Kryo必须时刻保存类型的全限定名,反而占用不少内存。Spark默认是对Scala中常用的类型自动注册了Kryo的,都在AllScalaRegistry类中。

但是,比如自己的算子中,使用了外部的自定义类型的对象,那么还是需要将其进行注册

如果要注册自定义的类型,那么就使用如下的代码,即可:

Scala版本:

val conf = new SparkConf().setMaster(...).setAppName(...)

conf.registerKryoClasses(Array(classOf[Counter] ))

val sc = new SparkContext(conf)

Java版本:

SparkConf conf = new SparkConf().setMaster(...)

.setAppName(...)

conf.registerKryoClasses(Counter.class)

JavaSparkContext sc = new JavaSparkContext(conf)

 

优化Kryo类库的使用

1、优化缓存大小

如果注册的要序列化的自定义的类型,本身特别大,比如包含了超过100个field。那么就会导致要序列化的对象过大。此时就需要对Kryo本身进行优化。因为Kryo内部的缓存可能不够存放那么大的class对象。此时就需要调用SparkConf.set()方法,设置spark.kryoserializer.buffer.mb参数的值,将其调大。

默认情况下它的值是2,就是说最大能缓存2M的对象,然后进行序列化。可以在必要时将其调大。比如设置为10。

2、预先注册自定义类型

虽然不注册自定义类型,Kryo类库也能正常工作,但是那样的话,对于它要序列化的每个对象,都会保存一份它的全限定类名。此时反而会耗费大量内存。因此通常都建议预先注册号要序列化的自定义的类。

 

在什么场景下使用Kryo序列化类库?

那么,这里针对的Kryo序列化类库的使用场景,就是算子函数使用到了外部的大数据的情况。比如说吧,我们在外部定义了一个封装了应用所有配置的对象,比如自定义了一个MyConfiguration对象,里面包含了100m的数据。然后,在算子函数里面,使用到了这个外部的大对象。

此时呢,如果默认情况下,让Spark用java序列化机制来序列化这种外部的大对象,那么就会导致,序列化速度缓慢,并且序列化以后的数据还是比较大,比较占用内存空间。

因此,在这种情况下,比较适合,切换到Kryo序列化类库,来对外部的大对象进行序列化操作。一是,序列化速度会变快;二是,会减少序列化后的数据占用的内存空间。

 

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/1/93093.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息