/ 城寒 / Spark - Optimization

Spark - Optimization

2018-01-28 posted in [Spark]

开发中的优化

避免创建重复的RDD

重复的RDD会导致重复的计算,表示的是相同数据却而外增加了开销。

// 错误,两次打开同一个文件
val rdd1 = sc.textFile("path")
rdd1.map(...)
val rdd2 = sc.textFile("path")
rdd2.reduce(...)
// 正确,对同一个RDD执行多次算子操作(还可以进一步优化)
val rdd1 = sc.textFile("path")
rdd1.map(...)
rdd1.reduce(...)

尽可能复用同一个RDD

如果两个RDD的内容是包含关系的,如key-value和value,通过复用来减少RDD数量,进而减少算子执行次数

// 错误,为rdd1的子集创建了一个新的rdd2,rdd2中只是一个取了rdd1的value部分
rdd1[Long, String]
val rdd2 = rdd1.map(r => r._2)
rdd1.reduceByKey(...)
rdd2.map(...)  // 对两个不同的但是内容重叠的RDD都执行了计算
// 正确,上述rdd2只是为了对全部的value进行操作,可以使用rdd1达到目的
rdd1[Long, String]
rdd1.reduceByKey(...)
rdd1.map(r => r._2 ...) // 取出所需的部分进行计算,而不是生成一个新的
// 但是这种写法rdd1还是会计算两次,需要结合下一条进行优化

对多次使用的RDD进行持久化

当我们对一个RDD进行复用后,需要保证RDD本身仅仅被计算一次。因为Spark中对每个RDD执行不同的算子会从源头重新计算RDD,然后才执行本次的算子

val rdd1 = sc.textFile("path").cache()
rdd1.map(...)
rdd1.reduce(...)

cache()和persist()的区别:

cache 表示使用非序列化的方式将RDD中的数据全部尝试持久化到内存中 persist 表示手动选择持久化级别,并使用指定的方式进行持久化。持久化有不同的等级和是否序列化之分,详见:

尽量避免使用shuffle类算子

shuffle类算子如reduceByKey, join, distinct, repartition 等需要网络传输,而且可能需要磁盘进行IO,两者都会影响性能,因此要减少使用,如果必需使用也要针对具体情况做相应优化

// 可以使用broadcast和map进行join
// 直接join会导致需要将两个RDD中相同的key都放到同一个节点上,非常复杂
val rdd3 = rdd1.join(rdd12)
// 如果有一个rdd数据量比较小,可以如下操作
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
val rdd3 = rdd1.map(rdd2DataBroadcast...)
// 虽然这种方式也有网络数据传输,但是相比按照key来进行重新分配的方式要快很多。

使用高性能的算子

其他

使用Kryo优化序列化性能,速度会显著由于默认方式,但是需要注册序列化的自定义类型,默认是使用Java的序列化方式,调用ObjectOutputStream / ObjectInputStream

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

优化数据结构

减少复杂结构的传输

资源配置优化

参数名称 说明 调优
num-executors 总Executor进程数,会分布在相同或者不同的节点上 通常需要根据自己的资源情况,并联合其他参数确定,设置较大的值。如50-100
executor-memory 进程的内存大小,0.2执行,0.2聚合,0.6RDD持久化 用num*memory得到中的内存空间,不要超过资源队列中内存的1/3或者1/2而影响其他作业。如4-8G
executor-cores 进程占用cpu核心数,每个task是一个线程只能同时运行一个 用num*cores得到总cpu资源,根据资源队列情况配置。如2-4
driver-memory Driver进程内存,通常比较小 无collect等1G左右,如果有则需要根据数据规模确定
spark.default.parallelism 每个stage默认task数量 通常500-1000比较合适,默认会使用hdfs的block数来确定,很可能过少导致executor得不到task
spark.storage.memoryFaction 用于RDD持久化的内存比例,默认是0.6 根据数据持久化的情况进行调整,如果计算简单而数据大可调大,避免写入硬盘,如果计算比较耗内存(通过观察GC作业耗时来判断,频繁回收内存空间,降低运行速度),可以适当降低
spark.shuffle.memoryFraction shuffle过程中一个task拉取到上一个stage的task输出后的聚合的空间 如果设置过小会导致每个阶段数据shuffle过程中会写入磁盘,影响速度,默认是0.2
./bin/spark-submit \
	--class ClassName   \
	--master yarn       \
	--num-executors		100	\
	--executor-memory	6G	\
	--executor-core		4	\
	--driver-memory		1G	\
	--conf spark.default.parallelism=1000	\
	--conf spark.storage.memoryFraction=0.5	\
	--conf spark.shuffle.memoryFraction=0.3	\
target/spark-job.jar