/ 城寒 / Spark - Spark Join

Spark - Spark Join

2018-03-17 posted in [Spark]

Spark Join 原理

原文

Join操作是数据库和大数据计算中的高级特性,大多数场景都需要进行复杂的Join操作,本文从原理层面介绍了SparkSQL支持的常见Join算法及其适用场景。

Join背景介绍

Join是数据库查询永远绕不开的话题,传统查询SQL技术总体可以分为简单操作(过滤操作-where、排序操作-limit等),聚合操作-groupby以及Join操作等。其中Join操作是最复杂、代价最大的操作类型,也是OLAP场景中使用相对较多的操作。因此很有必要对其进行深入研究。

另外,从业务层面来讲,用户在数仓建设的时候也会涉及Join使用的问题。通常情况下,数据仓库中的表一般会分为“低层次表”和“高层次表”。

所谓“低层次表”,就是数据源导入数仓之后直接生成的表,单表列值较少,一般可以明显归为维度表或事实表,表和表之间大多存在外健依赖,所以查询起来会遇到大量Join运算,查询效率很差。而“高层次表”是在“低层次表”的基础上加工转换而来,通常做法是使用SQL语句将需要Join的表预先进行合并形成“宽表”,在宽表上的查询不需要执行大量Join,效率很高。但宽表缺点是数据会有大量冗余,且相对生成较滞后,查询结果可能并不及时。

为了获得时效性更高的查询结果,大多数场景都需要进行复杂的Join操作。Join操作之所以复杂,主要是通常情况下其时间空间复杂度高,且有很多算法,在不同场景下需要选择特定算法才能获得最好的优化效果。本文将介绍SparkSQL所支持的几种常见的Join算法及其适用场景。

Join常见分类以及基本实现机制

当前SparkSQL支持三种Join算法:shuffle hash join、broadcast hash join以及sort merge join。其中前两者归根到底都属于hash join,只不过在hash join之前需要先shuffle还是先broadcast。其实,hash join算法来自于传统数据库,而shuffle和broadcast是大数据的皮(分布式),两者一结合就成了大数据的算法了。因此可以说,大数据的根就是传统数据库。既然hash join是“内核”,那就刨出来看看,看完把“皮”再分析一下。

hash join

先来看看这样一条SQL语句:select * from order,item where item.id = order.i_id,很简单一个Join节点,参与join的两张表是item和order,join key分别是item.id以及order.i_id。现在假设这个Join采用的是hash join算法,整个过程会经历三步:

  1. 确定Build Table以及Probe Table:这个概念比较重要,Build Table使用join key构建Hash Table,而Probe Table使用join key进行探测,探测成功就可以join在一起。通常情况下,小表会作为Build Table,大表作为Probe Table。此事例中item为Build Table,order为Probe Table。
  2. 构建Hash Table:依次读取Build Table(item)的数据,对于每一行数据根据join key(item.id)进行hash,hash到对应的Bucket,生成hash table中的一条记录。数据缓存在内存中,如果内存放不下需要dump到外存。
  3. 探测:再依次扫描Probe Table(order)的数据,使用相同的hash函数映射Hash Table中的记录,映射成功之后再检查join条件(item.id = order.i_id),如果匹配成功就可以将两者join在一起。

基本流程可以参考上图,这里有两个小问题需要关注:

  1. hash join性能如何?很显然,hash join基本都只扫描两表一次,可以认为o(a+b),较之最极端的笛卡尔集运算a*b,不知甩了多少条街。
  2. 为什么Build Table选择小表?道理很简单,因为构建的Hash Table最好能全部加载在内存,效率最高;这也决定了hash join算法只适合至少一个小表的join场景,对于两个大表的join场景并不适用。

上文说过,hash join是传统数据库中的单机join算法,在分布式环境下需要经过一定的分布式改造,就是尽可能利用分布式计算资源进行并行化计算,提高总体效率。hash join分布式改造一般有两种经典方案:

  1. broadcast hash join:将其中一张小表广播分发到另一张大表所在的分区节点上,分别并发地与其上的分区记录进行hash join。broadcast适用于小表很小,可以直接广播的场景。
  2. shuffler hash join:一旦小表数据量较大,此时就不再适合进行广播分发。这种情况下,可以根据join key相同必然分区相同的原理,将两张表分别按照join key进行重新组织分区,这样就可以将join分而治之,划分为很多小join,充分利用集群资源并行化。

broadcast hash join

如下图所示,broadcast hash join可以分为两步:

  1. broadcast阶段:将小表广播分发到大表所在的所有主机。广播算法可以有很多,最简单的是先发给driver,driver再统一分发给所有executor;要不就是基于BitTorrent的TorrentBroadcast。
  2. hash join阶段:在每个executor上执行单机版hash join,小表映射,大表试探。
  3. SparkSQL规定broadcast hash join执行的基本条件为被广播小表必须小于参数spark.sql.autoBroadcastJoinThreshold,默认为10M。

shuffle hash join

在大数据条件下如果一张表很小,执行join操作最优的选择无疑是broadcast hash join,效率最高。但是一旦小表数据量增大,广播所需内存、带宽等资源必然就会太大,broadcast hash join就不再是最优方案。此时可以按照join key进行分区,根据key相同必然分区相同的原理,就可以将大表join分而治之,划分为很多小表的join,充分利用集群资源并行化。如下图所示,shuffle hash join也可以分为两步:

  1. shuffle阶段:分别将两个表按照join key进行分区,将相同join key的记录重分布到同一节点,两张表的数据会被重分布到集群中所有节点。这个过程称为shuffle。
  2. hash join阶段:每个分区节点上的数据单独执行单机hash join算法。

看到这里,可以初步总结出来如果两张小表join可以直接使用单机版hash join;如果一张大表join一张极小表,可以选择broadcast hash join算法;而如果是一张大表join一张小表,则可以选择shuffle hash join算法;那如果是两张大表进行join呢?

sort merge join

SparkSQL对两张大表join采用了全新的算法-sort-merge join,如下图所示,整个过程分为三个步骤:

  1. shuffle阶段:将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理。
  2. sort阶段:对单个分区节点的两表数据,分别进行排序。
  3. merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,否则取更小一边。如下图所示:

经过上文的分析,很明显可以得出来这几种Join的代价关系:cost(broadcast hash join) < cost(shuffle hash join) < cost(sort merge join),数据仓库设计时最好避免大表与大表的join查询,SparkSQL也可以根据内存资源、带宽资源适量将参数spark.sql.autoBroadcastJoinThreshold调大,让更多join实际执行为broadcast hash join。

总结

Join操作是数据库和大数据计算中的高级特性,因为其独特的复杂性,很少有同学能够讲清楚其中的原理。本文试图带大家真正走进Join的世界,了解常用的几种Join算法以及各自的适用场景。后面两篇文章将会在此基础上不断深入Join内部,一点一点地揭开它的面纱,敬请关注!

Spark Join 部分源码

原文

PairRDDFunctions类提供了以下两个join接口,只提供一个参数。 不指定分区函数时默认使用HashPartitioner;提供numPartitions参数时,其内部的分区函数是HashPartitioner(numPartitions)

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
    // 这里的defaultPartitioner 就是HashPartitioner
    // 分区数由spark.default.parallism数指定,如果未指定就取分区数大的
    join(other, defaultPartitioner(self, other))
}

def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = self.withScope {
    // 指定分区数目
    join(other, new HashPartitioner(numPartitions))
}

以上两个join接口都是调用的这个方法 rdd1.join(rdd2) => rdd1.cogroup(rdd2, partitioner)

    /**
    * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
    * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
    * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
    */
    def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
        // rdd.join的实现:
        // rdd1.join(rdd2) => rdd1.cogroup(rdd2,partitioner) => flatMapValues(遍历两个value的迭代器)
        this.cogroup(other, partitioner).flatMapValues( 
            pair => for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w) 
        ) 
    }

    /**
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
    def cogroup[W](other: RDD[(K, W)], 
        partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {

        if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
            throw new SparkException("Default partitioner cannot partition array keys.")
        }
        // 这里构造一个CoGroupedRDD,其键值对中的value要求是Iterable[V]和Iterable[W]类型
        val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
        cg.mapValues { case Array(vs, w1s) =>
            (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
        }
    }

这是CoGroupedRDD的类声明,其中有两个与java 语法的不同:

  1. 类型声明中的小于号“<”,这个在scala 中叫做变量类型的上界,也就是原类型应该是右边类型的子类型,具体参见《快学scala》的17.3节
  2. @transient:这个是瞬时变量注解,不用进行序列化 ,也可以参见《快学Scala》的15.3节
/*
* 这里返回的rdd的类型是(K,Array[Iterable[_]]),即key不变,value为所有对应这个key的value的迭代器的数组
*/
class CoGroupedRDD[K: ClassTag](
    @transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
    part: Partitioner)
    extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil)

看看这个RDD的依赖以及如何分区的

再看这两个函数之前,最好先了解下这两个类是干什么的:

  1. CoGroupPartition 是 Partition 的一个子类,其 narrowDeps是NarrowCoGroupSplitDep类型的一个数组
/**
 *  这里说到CoGroupPartition 包含着父RDD依赖的映射关系,
 * @param index:可以看到CoGroupPartition 将index作为哈希code进行分区
 * @param narrowDeps:narrowDeps是窄依赖对应的分区数组
 */
private[spark] class CoGroupPartition(
        override val index: Int, val narrowDeps: Array[Option[NarrowCoGroupSplitDep]])
    extends Partition with Serializable {
    override def hashCode(): Int = index
    override def equals(other: Any): Boolean = super.equals(other)
}
  1. 这个NarrowCoGroupSplitDep的主要功能就是序列化,为了避免重复,对rdd做了瞬态注解
// 这个NarrowCoGroupSplitDep的主要功能就是序列化,为了避免重复,对rdd做了瞬态注解
private[spark] case class NarrowCoGroupSplitDep(
    @transient rdd: RDD[_], //瞬态的字段不会被序列化,适用于临时变量
    @transient splitIndex: Int,
    var split: Partition
  ) extends Serializable {
 
  @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
    // Update the reference to parent split at the time of task serialization
    split = rdd.partitions(splitIndex)
    oos.defaultWriteObject()
  }
}

回到CoGroupedRDD上来,先看这个RDD的依赖是如何划分的:

/*
* 简单看下CoGroupedRDD重写的RDD的getDependencies:
 * 如果rdd和给定分区函数相同就是窄依赖
 * 否则就是宽依赖
*/
override def getDependencies: Seq[Dependency[_]] = {
  rdds.map { rdd: RDD[_] =>
    if (rdd.partitioner == Some(part)) {
      /*如果两个RDD的分区函数和join时指定的分区函数相同,则对应窄依赖*/
      logDebug("Adding one-to-one dependency with " + rdd)
      new OneToOneDependency(rdd)
    } else {
      logDebug("Adding shuffle dependency with " + rdd)
      new ShuffleDependency[K, Any, CoGroupCombiner](
        rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
    }
  }
}

CoGroupedRDD.getPartitions 返回一个带有Partitioner.numPartitions个分区类型为CoGroupPartition的数组

// 这里返回一个带有Partitioner.numPartitions个分区类型为CoGroupPartition的数组
override def getPartitions: Array[Partition] = {
  val array = new Array[Partition](part.numPartitions)
  for (i <- 0 until array.length) {
    // Each CoGroupPartition will have a dependency per contributing RDD
 
    //rdds.zipWithIndex 这个是生成一个(rdd,rddIndex)的键值对,可以查看Seq或者Array的API
    //继续跟到CoGroupPartition这个Partition,其是和Partition其实区别不到,只是多了一个变量narrowDeps
    //回来看NarrowCoGroupSplitDep的构造,就是传入了每一个rdd和分区索引,以及分区,其可以将分区序列化
    array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
      // Assume each RDD contributed a single dependency, and get it
      dependencies(j) match {
        case s: ShuffleDependency[_, _, _] => None
        case _ => Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
      }
    }.toArray)
  }
  array
}

的总结下CoGroupedRDD,其类型大概是(k,(Array(CompactBuffer[v1]),Array(CompactBuffer[v2]))),这其中用到了内部的封装,以及compute函数的实现 有兴趣的可以继续阅读下源码,这一部分就不介绍了。

总结:

  1. join 算子内部使用了cogroup算子,这个算子返回的是(key,(v1,v2))这种形式的元组
  2. 深入cogroup算子,发现其根据rdd1,rdd2创建了一个CoGroupedRDD
  3. 简要的分析了CoGroupedRDD的依赖关系,看到如果两个rdd的分区函数相同,那么生成的rdd分区数不变,它们之间是一对一依赖,也就是窄依赖,从而可以减少依次shuffle
  4. CoGroupedRDD的分区函数就是将两个rdd的相同分区索引的分区合成一个新的分区,并且通过NarrowCoGroupSplitDep这个类实现了序列化