抽样血缘关系源码详解!

  
一、抽样的依赖关系
  

抽样的依赖关系分为两类:宽依赖和窄依赖。我们可以这样认为:

  
      <李>(1)窄依赖:每个父抽样的分区最多孩子被抽样的一个分区使用。   <李>(2)宽依赖:每个父抽样分区被多个孩子抽样的分区使用。   
  

窄依赖每个孩子抽样的分区的生成操作都是可以并行的,而宽依赖则需要所有的父母抽样分区洗牌结果得到后再进行。

  
二,org.apache.spark.Dependency。scala源码解析
  

依赖性是一个抽象类:

  
 <代码>//Denpendency.scala
  抽象类依赖[T]扩展可序列化的{
  def抽样:抽样[T]
  } 
  

它有两个子类:NarrowDependency和ShuffleDenpendency,分别对应窄依赖和宽依赖。

  <编辑> (1)NarrowDependency也是一个抽象类   

定义了抽象方法getParents,输入partitionId,用于获得孩子抽样的某个分区依赖的父母抽样的所有分区。

  
 <代码>//Denpendency.scala
  抽象类NarrowDependency [T] (_rdd:抽样[T])扩展依赖[T] {/* *
  *得到孩子的父分区分区。
  * @param partitionId儿童抽样的一个分区
  * @return母公司的分区抽样取决于孩子分区
  */def getParents (partitionId: Int): Seq [Int]
  
  覆盖def抽样:抽样[T]=_rdd
  } 
  

窄依赖又有两个具体的实现:OneToOneDependency和RangeDependency。
(a) OneToOneDependency指儿童抽样的分区只依赖于父抽样的一个分区,产生OneToOneDependency的算子有地图,过滤器,flatMap等。可以看到getParents实现很简单,就是传进去一个partitionId,再把partitionId放在列表里面传出去。

  
 <代码>//Denpendency.scala
  类OneToOneDependency [T](抽样:抽样[T])延伸NarrowDependency [T](抽样){
  覆盖def getParents (partitionId: Int):列表(Int)=(partitionId)
  }
  (b) RangeDependency指儿童抽样分区在一定的范围内一对一的依赖于父抽样分区,主要用于联盟。//Denpendency.scala
  类RangeDependency [T](抽样:抽样[T], inStart: Int, outStart: Int,长度:Int)
  扩展NarrowDependency [T](抽样){//inStart表示父抽样的开始索引,outStart表示孩子抽样的开始索引
  覆盖def getParents (partitionId: Int):列表(Int)={
  如果(partitionId祝辞=outStart,,partitionId & lt;outStart +长度){
  列表(partitionId - outStart + inStart)//表示于当前索引的相对位置
  其他}{
  零
  }
  }
  } 
  <编辑> (2)ShuffleDependency指宽依赖   

表示一个父抽样的分区会被孩子抽样的分区使用多次。需要经过洗牌才能形成。

  
 <代码>//Denpendency.scala
  类ShuffleDependency [K: ClassTag V: ClassTag, C: ClassTag) (
  @transient私人val _rdd:抽样[_ & lt;: Product2 [K、V]],
  val瓜分者:分割者,
  val序列化器:序列化器=SparkEnv.get.serializer,
  val keyOrdering:选择[命令[K]]=没有,
  val聚合器:选择[聚合器[K V C]]=没有
  val mapSideCombine:布尔=false)
  扩展依赖(Product2 [K、V]]{//洗牌都是基于PairRDD进行的,所以传入的抽样要是键值类型的
  覆盖def抽样:抽样(Product2 [K、V]]=_rdd。asInstanceOf[抽样[Product2 [K、V]]]
  
  私人(火花)val keyClassName:字符串=reflect.classTag [K] .runtimeClass.getName
  私人(火花)val valueClassName:字符串=reflect.classTag [V] .runtimeClass.getName
  私人(火花)val combinerClassName:选择[String]=选项(reflect.classTag [C]) . map (_.runtimeClass.getName)//获取shuffleId
  val shuffleId: Int=_rdd.context.newShuffleId()//向shuffleManager注册洗牌信息
  val shuffleHandle: shuffleHandle=_rdd.context.env.shuffleManager.registerShuffle (
  shuffleId _rdd.partitions。长度,)
  
  _rdd.sparkContext.cleaner.foreach (_.registerShuffleForCleanup(这))
  } 
  

由于洗牌涉及到网络传输,所以要有序列化序列化器,为了减少网络传输,可以地图端聚合,通过mapSideCombine和聚合器控制,还有关键排序相关的keyOrdering,以及重输出的数据如何分区的分割者,还有一些类信息.Partition之间的关系在洗牌处戛然而止,因此洗牌是划分阶段的依据。

  
三、两种依赖的区分
  

首先,窄依赖允许在一个集群节点上以流水线的方式(管道)计算所有父分区。例如,逐个元素地执行地图,然后过滤操作,而宽依赖则需要首先计算好所有父分区数据,然后在节点之间进行洗牌,这与MapReduce类似第。二,窄依赖能够更有效地进行失效节点的恢复,即只需重新计算丢失抽样分区的父分区,而且不同节点之间可以并行计算;而对于一个宽依赖关系的谱系图,单个节点失效可能导致这个抽样的所有祖先丢失部分分区,因而需要整体重新计算。

抽样血缘关系源码详解!