1.优化Spark
由于大多数Spark计算的内存本质,Spark程序可能因为集群中的任何资源造成瓶颈:CPU,网络,带宽,或者内存。大多数情况下,如果数据可以容纳在内存中,性能瓶颈就是网络带宽,但是有时,你还是需要做一些调优,比如用序列化形式存储RDDs来减少内存使用。这篇指南会覆盖两个主题:数据序列化,这对良好的网络性能是非常关键的,而且也可以减少内存使用;内存调优。我们也讨论了一些小的主题。
2.数据序列化
序列化在分布式应用的性能中扮演了一个非常重要的角色。那些序列化对象速度很慢的格式,或者消耗大量字节的格式,会极大的降低性能。通常来说,这应该是你调优一个spark应用时要做的第一件事。spark旨在便捷性(允许你在操作中使用任何java类型)和性能之间取得一个平衡。它提供了两种序列化类库:
· Java序列化:默认情况下,spark使用java的ObjectOutputStream框架来序列化对象,并且对你创建的任何实现了java.io.Serialiable接口的类都有效。你可以控制你的序列化机制的性能,只要实现java.io.Externalizable即可。java序列化机制是灵活的,但是通常是很慢的,对很多类来说,它会导致非常大的序列化格式。
· Kryo序列化:Spark也可以使用Kryo类库来更快地序列化对象。Kryo极大地加快了序列化速度,并且比java序列化格式更加紧凑(通常可以达到10倍)。但是不支持所有的序列化类性,而且要求你自己预先注册在程序中使用的自定义类,来获得最佳的性能。
你可以切换为使用Kryo,只要在使用SparkConf初始化你的应用时,调用conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)。这个配置设置了序列化器,不仅仅是对于在worker node之间shuffle数据,而且也对于将RDDs序列化到磁盘上。Kryo不是默认序列化器的唯一原因是因为它的自定义类注册要求,但是我们推荐在任何网络资源紧张的应用中使用Kryo。
Spark对许多常用的核心Scala类都自动包含了Kryo序列化器。
要注册你自己的自定义类到Kryo上,使用registerKryoClass方法。
valconf
=
new
SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1],
classOf[MyClass2]))
valsc
=
new
SparkContext(conf)
Kryo文档描述了更多高级的注册选项,比如增加自定义序列化代码。
如果你的对象很大,你也许需要增加spark.kryoserializer.buffer.mb属性的值。默认是2MB,但是这个值需要足够大到保存你最大的对象。
最后,如果你不注册你的自定义类,Kryo还是会工作,但是它就必须存储每个对象的全限定类名,很浪费内存。
3.内存调优
在优化内存使用的时候有三个考虑因素:你的对象使用的内存总量(你可能希望你完整的数据集都保存在内存中),访问这些对象的成本,垃圾回收的开销(如果你有大量对象的处理)。
默认情况下,访问Java对象是非常快的,但是会比它们的field中的原始数据多消耗2-5倍的空间。这是因为多种原因:
· 每个java对象都有一个对象头,大约是16个字节,包含了诸如指向它的类的引用这类信息。对于那些只有很少的数据的对象(比如只有一个int field),对象头会比对象本身的数据要大很多。
· java字符串比原始字符串数据要多出40个字节的开销(因为它将数据存储在一个char数组中,并且还保存了额外的信息,比如字符串的长度),而且因为字符串内部使用了UTF-16编码,所以会使用2个字节存储每个字符。因此一个包含10个字符的字符串可以轻易消耗掉40个字节。
· 普通的集合类,比如HashMap和LinkedList,使用了链式数据结构,对每一个entry对象都有一个包装对象(比如Map.Entry)。这个对象不仅包含对象头,而且还包含了指向列表中下一个对象的引用(通常来说是8个字节)。
· 原始类型的集合通常将原始数据类型使用它们的装箱类型(比如Integer)进行存储。
这个部分会讨论如何确定你的对象的内存使用,以及如何提升它——包含修改你的数据结构,或者使用序列化格式存储数据。我们接着会涵盖Spark内存调优和java垃圾回收的内容。
3.1 确定内存消耗
确定你的数据集的内存消耗总量的最佳办法就是,创建一个RDD,将其缓存在内存中,然后查看你的驱动程序的SparkContext日志。日志会告诉你每个分区消耗了多少内存,然后你可以聚合起来计算出RDD的总大小。你可以看见如下的日志信息:
INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB)
该行日志意味着,RDD 0的分区1消耗了717.5 KB。
3.2 优化数据结构
减少内存消耗的第一种方法就是避免会增加内存消耗的java特性,比如基于指针的数据结构(对象)和包装对象。有几种方法来实现它:
1. 在你的数据结构中,优先使用对象和原始数据类型的数组,而不是标准的java集合类(比如HashMap)。fastutil类库为原始数据类型提供了方便的集合类,与java标准类库完全兼容。
2. 尽可能避免包含大量小对象以及对应的指针的嵌套对象。
3. 对于key,考虑使用数字类型的ID或者枚举对象,来替代字符串。
4. 如果你只有小于32G的内存,设置JVM参数-XX:+useCompressedOops,来让指针使用4个字节来替代默认的8个字节。可以在spark-env.sh中增加这些选项。
3.3 序列化的RDD存储
如果尽管使用了上述优化技巧,但是你的对象太大了,无法有效地进行存储,一个更简单得减少内存使用的方法就是使用序列化的方式存储它们,在RDD持久化API中使用序列化的存储级别,比如MEMORY_ONLY_SER。Spark会将每个RDD的分区作为一个超大的字节数组进行存储。使用序列化格式进行存储的唯一缺点就是,更慢的访问时间,因为在访问时不得不反序列化每个对象。如果你想使用序列化格式缓存数据,我们强力建议使用Kryo,因为它比java序列化对象可以大量减少空间占用。
3.4 垃圾回收调优
JVM垃圾回收也许会成为一个大问题,如果你的程序要存储非常大的RDDs的话。(但是如果只是读取一次RDD,然后对它做很多操作,则这不会成为一个问题)java需要移除旧的对象来为新的对象腾出空间,因此需要追踪所有的java对象,并找出其中不在使用的那些。这里需要记住的一点就是,垃圾回收的消耗是与java对象的数量成正相关的,所以使用包含更少对象的数据结构(比如用int数组替代Integer LinkedList),可以极大地极少这方面的开销。一个更好的方法就是使用序列化格式存储对象,如上所述:现在一个RDD分区仅有一个对象(byte数组)。在尝试其他技巧之前,如果GC是一个问题,则需要尝试的第一个技巧就是使用序列化缓存技术。
GC也可能因为你的任务的工作内存和节点上缓存的RDDs之间的冲突而变为一个问题。我们会讨论如何控制分配给RDD缓存的空间来解决这个问题。
3.4.1 测量GC的影响
GC调优的第一个步骤就是收集统计信息——垃圾回收发生的频率以及占用的总时间。这可以通过增加-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
来完成。下次你的
spark job
运行的时候,你就会看见每次发生垃圾回收时打印在
worker
日志里的消息。注意这些日志是在你的
worker
节点上,不在驱动程序中。
3.4.2 优化缓存大小
对GC来说,一个重要的配置参数就是用于缓存RDDs的内存量。默认情况下,Spark使用每个executor的内存的60%来缓存RDDs。这意味着只有40%的内存用来容纳任务执行期间创建的对象。
在你的任务执行变慢的情况下,并且你发现你的JVM正在频繁的进行垃圾回收,或者几乎用光所有内存,把这个值降低可以帮助减小内存消耗。如果要改变这个值,比如50%,可以在你的SparkConf上调用conf.set(“spark.storage.memoryFraction”, “0.5”)。结合序列化缓存的使用,使用一个更小的缓存就可以缓和大多数的垃圾回收问题。
3.4.3 高级GC调优
为了进一步进行垃圾回收调优,我们首先需要理解JVM中内存管理的一些基本信息:
· Java堆空间被划分成了两个区域,新生代和老年代。新生代用于存储短暂活跃的对象,老年代用于存储长时间存活的对象。
· 新生代进一步划分为了三个区域,Eden,Survivor1,Survivor2.
· 一个简化的垃圾回收过程的描述如下:当Eden区域满了之后,minor GC会在Eden区域和一个Survivor1区域进行,在Eden区域和Survivor1区域中还存活的对象会被拷贝到Survivor2区域中。Survisor区域被交换。如果一个对象年龄足够大,或者拷贝时发现Survivor2区域也满了,则会将该对象移动到老年代。最后,如果老年代满了,就会发生Full GC。
在Spark中GC调优的目的是确保,只有长期存活的对象进入了老年代中,并且新生代是足够大来容纳所有短暂活跃对象的。这可以避免去通过full GC回收任务执行期间创建的临时对象。一些可能有用的操作如下:
· 通过收集GC统计信息来检查是否发生了太多的垃圾回收。如果在一个task完成之前,full GC执行了多次,那么就意味着没有足够的内存来执行任务。
· 在打印出来的GC统计信息中,如果老年代已经接近于满了,减少用于缓存的空间。这可以通过spark.storage.memroyFraction属性来配置。缓存更少的对象比降低任务执行速度更好。
· 如果有很多minor GC,但是没有很多major GC,分配更多的空间给Eden区域。你可以将Eden区域的大小设置为比预估的每个任务要占用的内存更大。如果Eden区域的大小是E,那么你可以使用选项-Xmn=4/3*E来设置年轻代的大小。
· 举例来说,如果你的任务正在从HDFS中读取数据,任务使用的内存总量可以通过使用从HDFS读取出来的数据块的大小来估算。注意,一个解压缩的块的大小通常是块的2-3倍。所以如果我们希望有3个或4个任务的工作空间,并且hdfs的块大小是64MB,我们可以估算Eden区域的大小为4 * 3 * 64 MB。
· 监控随着新的设置,垃圾回收的发生频率以及执行时间。
我们的经验表明,GC调优的影响依赖于你的应用程序以及可用的内存量。有许多调优方法,但是站在一个高的角度来看,管理GC发生的频率可以减少开销。
4.其他考虑
4.1 并行级别
除非你为每个操作设置的并行级别足够高,否则集群是不会被充分使用的。对于每个文件来说,Spark会根据它的大小自动设置map任务的数量。对于分布式的reduce操作,比如groupByKey和reduceByKey,它使用了最大的父RDD的分去数量。你可以传递并行级别作为第二个参数,或者设置spark.default.parallelism。通常我们建议为每个CPU core设置2-3个任务。
4.2 reduce任务的内存使用
有时,你遇到一个OutOfMemory异常,不是因为你的RDDs无法在内存中容纳,你的任务的working set,比如groupByKey的reduce任务,太大了。Spark的shuffle操作(sortByKey,groupByKey,reduceByKey,等等)都会在每个任务中构建一个hash table来执行分组,通常hash table是非常大的。最简单的方式就是提高任务的并行度,来让每个任务的输入变小。Spark可以支持200ms的任务,因为它会对多个任务重用一个executor的JVM,因此由非常小的任务启动开销,所以你可以放心提高并行度,哪怕超过了你的应用的CPU core数量。
4.3 广播大变量
使用SparkContext提供的广播功能可以大幅度减少每个序列化任务的大小,以及在一个集群上启动一个作业的开销。如果你的任务在内部使用了驱动程序中的大对象,考虑将它变为一个广播变量。Spark会在master上打印每个序列化任务的大小,你可以看它来判断你的任务是不是太大了;通常来说,任务如果超过了20 KB都是值得优化的。
4.4 数据本地化
数据本地化会对Spark作业的性能有极大的影响。如果数据和代码都在一起,那么计算是非常快的。但是如果数据和代码是分离的,那么其中之一必须移动到另外一个所在的节点上去。通常来说,将序列化的代码从一个地方移动到另一个地方是比移动一段数据更快的,因为代码的大小比数据的大小更小。Spark针对数据本地化的原则构建了它的调度。
数据本地化是指,数据距离处理它的代码有多近。基于数据当前的位置,有很多种数据本地化级别:
· PROCESS_LOCAL 数据和运行的代码在同一个JVM中。这是最好的本地化。
· NODE_LOCAL 数据与代码在同一个节点上。例如,数据在同一个节点上的HDFS中,或者在同一个节点的其他executor中。这比PROCESS_LOCAL会稍慢一点,因为需要在进程间移动数据。
· NO_PREF 数据从哪里获取都是一样快的,没有数据本地化的偏好。
· RACK_LOCAL 数据与代码在一个机架上。数据在同一个机架的不同服务器上,因此需要通过网络来发送。
· ANY 数据和代码在网络中的任何地方,并且不在一个机架上。
Spark希望使用最佳的本地化级别来调度所有的任务,但是这是不可能的。在没有未处理的数据在任何空闲的executor中的情况下,Spark会切换到更慢的本地化级别上。有两个选项:a) 等待,直到一个繁忙的CPU空闲下来,可以在有数据的服务器上启动一个任务;b) 立即在任何一个节点上启动任务,然后需要对数据进行移动。
Spark最常见的是等待一下,期望一个CPU空闲下来。但是如果超时以后,它就会开始移动数据到任意一个空闲的CPU上。