一、抽样的依赖关系
抽样的依赖关系分为两类:宽依赖和窄依赖。我们可以这样认为:
-
<李>(1)窄依赖:每个父抽样的分区最多孩子被抽样的一个分区使用。李>
<李>(2)宽依赖:每个父抽样分区被多个孩子抽样的分区使用。李>
窄依赖每个孩子抽样的分区的生成操作都是可以并行的,而宽依赖则需要所有的父母抽样分区洗牌结果得到后再进行。
二,org.apache.spark.Dependency。scala源码解析
依赖性是一个抽象类:
<代码>//Denpendency.scala 抽象类依赖[T]扩展可序列化的{ def抽样:抽样[T] }代码>
它有两个子类:NarrowDependency和ShuffleDenpendency,分别对应窄依赖和宽依赖。
<编辑> (1)NarrowDependency也是一个抽象类编辑>定义了抽象方法getParents,输入partitionId,用于获得孩子抽样的某个分区依赖的父母抽样的所有分区。
<代码>//Denpendency.scala 抽象类NarrowDependency [T] (_rdd:抽样[T])扩展依赖[T] {/* * *得到孩子的父分区分区。 * @param partitionId儿童抽样的一个分区 * @return母公司的分区抽样取决于孩子分区 */def getParents (partitionId: Int): Seq [Int] 覆盖def抽样:抽样[T]=_rdd }代码>
窄依赖又有两个具体的实现:OneToOneDependency和RangeDependency。
(a) OneToOneDependency指儿童抽样的分区只依赖于父抽样的一个分区,产生OneToOneDependency的算子有地图,过滤器,flatMap等。可以看到getParents实现很简单,就是传进去一个partitionId,再把partitionId放在列表里面传出去。
<代码>//Denpendency.scala 类OneToOneDependency [T](抽样:抽样[T])延伸NarrowDependency [T](抽样){ 覆盖def getParents (partitionId: Int):列表(Int)=(partitionId) } (b) RangeDependency指儿童抽样分区在一定的范围内一对一的依赖于父抽样分区,主要用于联盟。//Denpendency.scala 类RangeDependency [T](抽样:抽样[T], inStart: Int, outStart: Int,长度:Int) 扩展NarrowDependency [T](抽样){//inStart表示父抽样的开始索引,outStart表示孩子抽样的开始索引 覆盖def getParents (partitionId: Int):列表(Int)={ 如果(partitionId祝辞=outStart,,partitionId & lt;outStart +长度){ 列表(partitionId - outStart + inStart)//表示于当前索引的相对位置 其他}{ 零 } } }代码><编辑> (2)ShuffleDependency指宽依赖编辑>
表示一个父抽样的分区会被孩子抽样的分区使用多次。需要经过洗牌才能形成。
<代码>//Denpendency.scala 类ShuffleDependency [K: ClassTag V: ClassTag, C: ClassTag) ( @transient私人val _rdd:抽样[_ & lt;: Product2 [K、V]], val瓜分者:分割者, val序列化器:序列化器=SparkEnv.get.serializer, val keyOrdering:选择[命令[K]]=没有, val聚合器:选择[聚合器[K V C]]=没有 val mapSideCombine:布尔=false) 扩展依赖(Product2 [K、V]]{//洗牌都是基于PairRDD进行的,所以传入的抽样要是键值类型的 覆盖def抽样:抽样(Product2 [K、V]]=_rdd。asInstanceOf[抽样[Product2 [K、V]]] 私人(火花)val keyClassName:字符串=reflect.classTag [K] .runtimeClass.getName 私人(火花)val valueClassName:字符串=reflect.classTag [V] .runtimeClass.getName 私人(火花)val combinerClassName:选择[String]=选项(reflect.classTag [C]) . map (_.runtimeClass.getName)//获取shuffleId val shuffleId: Int=_rdd.context.newShuffleId()//向shuffleManager注册洗牌信息 val shuffleHandle: shuffleHandle=_rdd.context.env.shuffleManager.registerShuffle ( shuffleId _rdd.partitions。长度,) _rdd.sparkContext.cleaner.foreach (_.registerShuffleForCleanup(这)) }代码>
由于洗牌涉及到网络传输,所以要有序列化序列化器,为了减少网络传输,可以地图端聚合,通过mapSideCombine和聚合器控制,还有关键排序相关的keyOrdering,以及重输出的数据如何分区的分割者,还有一些类信息.Partition之间的关系在洗牌处戛然而止,因此洗牌是划分阶段的依据。
三、两种依赖的区分
首先,窄依赖允许在一个集群节点上以流水线的方式(管道)计算所有父分区。例如,逐个元素地执行地图,然后过滤操作,而宽依赖则需要首先计算好所有父分区数据,然后在节点之间进行洗牌,这与MapReduce类似第。二,窄依赖能够更有效地进行失效节点的恢复,即只需重新计算丢失抽样分区的父分区,而且不同节点之间可以并行计算;而对于一个宽依赖关系的谱系图,单个节点失效可能导致这个抽样的所有祖先丢失部分分区,因而需要整体重新计算。