火花源码系列之累加器实现机制及自定义累加器

  

一、基本概念

  

累加器是引发的一种变量,顾名思义该变量只能增加。有以下特点:

  

1,累加器只能在驱动端构建及并只能是司机读取结果,任务只能累加。

  

2,累加器不会改变火花懒惰的计算的特点。只会在工作触发的时候进行相关累加操作。

  

3,现有累加器的类型。
相信有很多学习大数据的道友,在这里我给大家说说我滴群哦,大数据海量知识分享,784789432。在此我保证,绝对大数据的干货,等待各位的到来,我们一同从入门到精通吧!

  

二,累加器的使用

  

驱动端初始化,并在行动之后获取值。

  

val accum=sc.accumulator(0,“测试Accumulator")
accum.value

  

执行人端进行计算

  

accum +=1;

  

三,累加器的重点类

  

类蓄电池延伸Accumulable

  

主要是实现了累加器的初始化及封装了相关的累加器操作方法。同时在类对象构建的时候向我们的蓄电池注册了累加器。累加器的添加操作的返回值类型和我们传入的值类型可以不一样,所以,我们一定要定义好如何累加和合并值。也即添加方法

  

对象蓄电池:   

该方法在驱动端管理着我们的累加器,也包含了特定累加器的聚合操作。

  

特质AccumulatorParam [T]扩展AccumulableParam [T, T]:

  

AccumulatorParam的addAccumulator操作的泛型封装,具体的实现还是要再具体实现类里面实现addInPlace方法。

  

对象AccumulatorParam:   

主要是进行隐式类型转换的操作。

  

TaskContextImpl:

  

在遗嘱执行人端管理着我们的累加器。

  

四,累加器的源码解析

  

1,司机端的初始化

  

val accum=sc.accumulator(0,“测试Accumulator")

  

val acc=new蓄电池(initialValue参数,一些(名字))

  

主要是在Accumulable(蓄电池)中调用了,这样我们就可以使用蓄电池使用了。

  

Accumulators.register(这)

  

2,执行器端的反序列化得到我们对象的过程

  

首先,我们的value_可以看到其并不支持序列化

  

@volatile @transient私人var value_: R=initialValue//当前值在主

  

其初始化是在我们反序列化的时候做的,反序列化还完成了蓄电池向我们的TaskContextImpl的注册

  

反序列化是在调用ResultTask的RunTask方法的时候做的

  

瓦尔(抽样函数)=ser.deserialize[(抽样[T], (TaskContext,迭代器[T])=比;U)] (
ByteBuffer.wrap (taskBinary.value) Thread.currentThread.getContextClassLoader)

  

过程中会调用

  

私人def readObject (: ObjectInputStream):单位=跑龙套。tryOrIOException {
in.defaultReadObject ()
value_=0
反序列化=true
//自动注册时蓄电池的反序列化任务关闭。
//
//注意内部蓄电池与任务发送反序列化之前TaskContext
//创建和注册TaskContext构造函数。等内部蓄电池SQL
//指标,在这里仍然需要注册。
val taskContext=TaskContext.get ()
如果(taskContext !=null) {
taskContext.registerAccumulator(这)
}
}

  

3,累加器的累加

  

accum +=1;

  

参数。addAccumulator (value_术语)

  

根据不同的累加器参数有不同的实现AccumulableParam

  

如,int类型。最终调用的AccumulatorParam特质的addAccumulator方法。

  

特质AccumulatorParam [T]扩展AccumulableParam [T, T] {
def addAccumulator (t1: T, t2: T): T={
addInPlace (t1, t2)
}
}

  

然后,调用的是各个具体实现的addInPlace方法

  

隐式对象IntAccumulatorParam延伸AccumulatorParam (Int) {
def addInPlace (t1: Int, t2: Int): Int=t1 + t2
def零(initialValue: Int): Int=0
}

  

返回后更新了我们的蓄电池的value_的值。

  

4,累加器的各个节点累加的之后的聚合操作

  

在任务类的运行方法里面得到并返回的

  

(runTask(上下文),context.collectAccumulators ())

  

最终在DAGScheduler里面调用了updateAccumulators(事件)

  

在updateAccumulators方法中

  

Accumulators.add (event.accumUpdates)

  

具体内容如下:

  

def添加(价值观:地图[长,任何]):单位=同步{
((id、价值)& lt; -值){
如果(originals.contains (id)) {
//因为我们现在存储弱引用,我们必须检查是否底层数据
//有效。
原件(id)。得到匹配{
一些(accum)=比;accum.asInstanceOf [Accumulable[任何、任何]]+ +=价值
情况没有=祝辞
把新IllegalAccessError(“试图访问垃圾收集蓄电池!”)

火花源码系列之累加器实现机制及自定义累加器