3.2.2 依赖
RDD是易转换、易操作的,这意味着用户可以从已有的RDD转换出新的RDD。新、旧RDD之间必定存在着某种联系,这种联系称为RDD的依赖关系。RDD间的依赖关系是Spark中的一个重要概念,是Spark进行容错、优化与任务调度的基础。
RDD的依赖关系分为两种,如图3.1所示。
窄依赖:父RDD的每个分区最多被其子RDD的一个分区所依赖,也就是说子RDD的每个分区依赖于常数个父分区,子RDD每个分区的生成与父RDD的数据规模无关。
宽依赖:父RDD的每个分区被其子RDD的多个分区所依赖,子RDD每个分区的生成与父RDD的数据规模相关。


图3.1 依赖关系
在图3.1中,透明的矩形框代表一个RDD,每个矩形框里面的实心矩形代表RDD的一个分区。
为何要区分这两种依赖关系?一方面,对于若干个彼此窄依赖关系的RDD,基于任何一个子RDD分区可以方便地计算出其所有祖先RDD的相应分区,这些RDD可以在集群的节点的内存中以流水线(pipeline)的方式高效执行,如图3.2中Stage2所示。另一方面,对于窄依赖关系间的RDD,当子RDD的一个分区出错,可以很方便地利用父RDD中对应的分区重新计算该错误分区,因此窄依赖关系使得数据的容错与恢复非常方便;而对于宽依赖关系,子RDD的一个分区出错会导致其对应父RDD的多个分区进行重新计算,过程类似于MapReduce的Shuffle操作,代价非常高。
图3.2 流水线形式计算多个窄依赖关系间的RDD
示例2-3 RDD间的依赖关系
scala>val rdd =sc.makeRDD(1 to 10)
scala>val mapRDD=rdd.map(x=>(x, x))
scala>mapRDD.dependencies
scala>val shuffleRDD=mapRDD.partitionBy(new org.apache.spark.HashPartitioner(3))
scala>shuffleRDD.dependencies
运行结果:
res1: Seq[org.apache.spark.Dependency[_]]=List(org.apache.spark.OneToOneDependency@45ec9c22)
res2: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@7edbd4ec)
其中res1返回的org.apache.spark.OneToOneDependency为窄依赖关系;res2返回的org.apache. spark.ShuffleDependency为宽依赖关系。
3.2.3 计算
Spark 中每个RDD中都包含一个函数,即在父RDD上执行何种计算后得到当前RDD。每个RDD的计算都是以分区为单位的,而且每个RDD中的计算都是在对迭代器进行复合,不需要保存每次计算的结果。
3.2.4 分区函数
对于Key-Value形式的RDD,Spark允许用户根据关键字(Key)指定分区顺序,这是一个可选的功能。目前支持哈希分区(HashPartitioner)和范围分区(RangPartitioner)。这一特性有助于提高RDD之间某些操作的执行效率,例如可指定将两个RDD按照同样的哈希分区方式进行分区(将同一机器上具有相同关键字的记录放在一个分区),当这两个RDD之间执行join操作时,会简化Shuffle过程,提高效率。
3.2.5 优先位置
RDD优先位置属性与Spark中的作业调度与管理密切相关,是RDD中每个分区所存储的位置。遵循“移动计算不移动数据”这一理念,Spark在执行任务时尽可能地将计算分配到相关数据块所在的节点上。以从Hadoop中读取数据生成RDD为例,preferredLocations返回每一个数据块所在节点的机器名或者IP地址,如果每一块数是多份存储的,那么将返回多个机器地址。
示例2-4 RDD间的优先位置
scala>val rdd=sc.textFile("hdfs://master:9000/user/dong/input/file.txt").
map(_.contains("l"))
//根据依赖关系找到源头rdd。
scala>val hadoopRDD=rdd.dependencies(0).rdd
//源头rdd的分区数。
scala>hadoopRDD.partitions.size
//查看第一个分区位置。
scala>hadoopRDD.preferredLocations(HadoopRDD.partitions(0))
//查看第二个分区位置。
scala>hadoopRDD.preferredLocations(HadoopRDD.partitions(1))
运行结果:
res0: Int = 2
res1: Seq [String] = WrappedArray(slave2)
res2: Seq [String] = WrappedArray(slave2)
运行结果表明hadoopRDD共有2个分区,均位于slave2上。
……