Spark - Optimization
开发中的优化
避免创建重复的RDD
重复的RDD会导致重复的计算,表示的是相同数据却而外增加了开销。
// 错误,两次打开同一个文件
val rdd1 = sc.textFile("path")
rdd1.map(...)
val rdd2 = sc.textFile("path")
rdd2.reduce(...)
// 正确,对同一个RDD执行多次算子操作(还可以进一步优化)
val rdd1 = sc.textFile("path")
rdd1.map(...)
rdd1.reduce(...)
尽可能复用同一个RDD
如果两个RDD的内容是包含关系的,如key-value和value,通过复用来减少RDD数量,进而减少算子执行次数
// 错误,为rdd1的子集创建了一个新的rdd2,rdd2中只是一个取了rdd1的value部分
rdd1[Long, String]
val rdd2 = rdd1.map(r => r._2)
rdd1.reduceByKey(...)
rdd2.map(...) // 对两个不同的但是内容重叠的RDD都执行了计算
// 正确,上述rdd2只是为了对全部的value进行操作,可以使用rdd1达到目的
rdd1[Long, String]
rdd1.reduceByKey(...)
rdd1.map(r => r._2 ...) // 取出所需的部分进行计算,而不是生成一个新的
// 但是这种写法rdd1还是会计算两次,需要结合下一条进行优化
对多次使用的RDD进行持久化
当我们对一个RDD进行复用后,需要保证RDD本身仅仅被计算一次。因为Spark中对每个RDD执行不同的算子会从源头重新计算RDD,然后才执行本次的算子
val rdd1 = sc.textFile("path").cache()
rdd1.map(...)
rdd1.reduce(...)
cache()和persist()的区别:
cache 表示使用非序列化的方式将RDD中的数据全部尝试持久化到内存中 persist 表示手动选择持久化级别,并使用指定的方式进行持久化。持久化有不同的等级和是否序列化之分,详见:
尽量避免使用shuffle类算子
shuffle类算子如reduceByKey, join, distinct, repartition 等需要网络传输,而且可能需要磁盘进行IO,两者都会影响性能,因此要减少使用,如果必需使用也要针对具体情况做相应优化
// 可以使用broadcast和map进行join
// 直接join会导致需要将两个RDD中相同的key都放到同一个节点上,非常复杂
val rdd3 = rdd1.join(rdd12)
// 如果有一个rdd数据量比较小,可以如下操作
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
val rdd3 = rdd1.map(rdd2DataBroadcast...)
// 虽然这种方式也有网络数据传输,但是相比按照key来进行重新分配的方式要快很多。
- 必要时使用map-side预聚合shuffle
- 必须使用shuffle时,在本地先按key进行一次combiner,使得每个key只有一条,减少拉取操作的次数。
- 通常reduceByKey、aggregateByKey会自动使用用户的自定义函数来进行预聚合,而groupByKey则不行。
需要注意的是,如果数据重复率很低,直接使用groupByKey反而会比较快,可以省去一次分片内的预聚合
- 从Kafka接收数据时,需要注意两点
- Kafka的partition数等于Spark中的partition数,所以可以适当增加kafka中的partition数来提高速度
- Kafka中的数据每条应该长一些,即包含多条原子数据。在Spark中,repartition的速度不仅和数据体积有关,也和条数有明显关联。实验表明,Kafka中分区数相同的情况下,直接repartition 100w行和repartition 1w行 + flatMap,后者会快很多,因此数据应该打包一下再传输
- 可以自己构建partitioner来进行分片,通常任务时间不稳定,时快时慢,都是由于数据倾斜导致的,可以从Web UI上看出
使用高性能的算子
- 使用 reduceByKey、aggregateByKey 替代 groupByKey
- 使用 mapPartitions 替代普通的 map ,原理是每次函数调用处理一个partition的数据,而不是只处理一条,会相对高效,但是要注意同时处理这些数据是否会导致内存空间不足 OOM
- 使用 foreachPartitions 替代 foreach,原理同上,在类似输出到MySQL的时候效果极佳
- 在 filter 后进行 coalesce,当filter过滤掉比较多的数据时(比如大于30%),使用coalesce手动减少RDD的Partition数量,可以减少task的使用数量,分配更均匀
- 使用 repartitionAndSortWithinPartions 替代 repartition 与 sort ,可以在一次shuffle同时进行两个操作
其他
使用Kryo优化序列化性能,速度会显著由于默认方式,但是需要注册序列化的自定义类型,默认是使用Java的序列化方式,调用ObjectOutputStream / ObjectInputStream
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
优化数据结构
减少复杂结构的传输
资源配置优化
参数名称 | 说明 | 调优 |
---|---|---|
num-executors | 总Executor进程数,会分布在相同或者不同的节点上 | 通常需要根据自己的资源情况,并联合其他参数确定,设置较大的值。如50-100 |
executor-memory | 进程的内存大小,0.2执行,0.2聚合,0.6RDD持久化 | 用num*memory得到中的内存空间,不要超过资源队列中内存的1/3或者1/2而影响其他作业。如4-8G |
executor-cores | 进程占用cpu核心数,每个task是一个线程只能同时运行一个 | 用num*cores得到总cpu资源,根据资源队列情况配置。如2-4 |
driver-memory | Driver进程内存,通常比较小 | 无collect等1G左右,如果有则需要根据数据规模确定 |
spark.default.parallelism | 每个stage默认task数量 | 通常500-1000比较合适,默认会使用hdfs的block数来确定,很可能过少导致executor得不到task |
spark.storage.memoryFaction | 用于RDD持久化的内存比例,默认是0.6 | 根据数据持久化的情况进行调整,如果计算简单而数据大可调大,避免写入硬盘,如果计算比较耗内存(通过观察GC作业耗时来判断,频繁回收内存空间,降低运行速度),可以适当降低 |
spark.shuffle.memoryFraction | shuffle过程中一个task拉取到上一个stage的task输出后的聚合的空间 | 如果设置过小会导致每个阶段数据shuffle过程中会写入磁盘,影响速度,默认是0.2 |
./bin/spark-submit \
--class ClassName \
--master yarn \
--num-executors 100 \
--executor-memory 6G \
--executor-core 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \
target/spark-job.jar