(版本定制)第13课:火花流源码解读之司机容错安全性

  

本期内容:
1。ReceiverBlockTracker容错安全性,
2。DStream和JobGenerator容错安全性


<强>一:容错安全性,
1。ReceivedBlockTracker负责管理火花流运行程序的元数据。数据层面,
2。DStream和JobGenerator是作业调度的核心层面,也就是具体调度到什么程度了,从运行的考虑的.DStream是逻辑层面只
3。作业生存层面,JobGenerator是工作调度层面,具体调度到什么程度了。从运行的角度的。

<强>谈司机容错你要考虑司机中有那些需要维持状态的运行。,
1。ReceivedBlockTracker跟踪了数据,因此需要容错。通过细胞膜方式容错只
2。DStreamGraph表达了依赖关系,恢复状态的时候需要根据DStream恢复计算逻辑级别的依赖关系。通过检查站方式容错只
3。JobGenerator表面你是怎么基于ReceiverBlockTracker中的数据,以及DStream构成的依赖关系不断的产生工作的过程。你消费了那些数据,进行到什么程度了。

<强>总结如下:

<强> (版本定制)第13课:火花流源码解读之司机容错安全性

<强> ReceivedBlockTracker: ,
1。ReceivedBlockTracker会管理火花流运行过程中所有的数据,并且把数据分配给需要的批次,所有的动作都会被细胞膜写入到日志中,司机失败的话,就可以根据历史恢复追踪状态,在ReceivedBlockTracker创建的时候,使用检查点保存历史目录。

<强>下面就从接收器收到数据之后,怎么处理的开始。,
2。ReceiverBlockTracker.addBlock源码如下:
接收器接收到数据,把元数据信息汇报上来,然后通过ReceiverSupervisorImpl就将数据汇报上来,就直接通过细胞膜进行容错只
当接收机的管理者,ReceiverSupervisorImpl把元数据信息汇报给司机的时候,正在处理是交给ReceiverBlockTracker。ReceiverBlockTracker将数据写进细胞膜文件中,然后才会写进内存中,被当前的火花流程序的调度器使用的,也就是JobGenerator使用的.JobGenerator不可能直接使用WAL.WAL的数据在磁盘中,这里JobGenerator使用的内存中缓存的数据结构


/* *, Add  received 块只却;能够event  will  get  written 用,write  ahead  log  (if 启用)只*/def  addBlock (receivedBlockInfo: receivedBlockInfo):, Boolean =, {
  try  {
  val  writeResult =, writeToLog (BlockAdditionEvent (receivedBlockInfo)),//接收数据后,先进行细胞膜
  if  (writeResult), {
  ,,,,,synchronized  {
  getReceivedBlockQueue (receivedBlockInfo.streamId), +=, receivedBlockInfo //当细胞膜成功后,将Block 信息元数据信息加入到Block 队列中
  ,,,,,}
  ,,,,,logDebug (s" Stream  $ {receivedBlockInfo.streamId}, received “, +
  s" block  $ {receivedBlockInfo.blockStoreResult.blockId}“)
  ,,,},{else 
  ,,,,,logDebug (s" Failed 用acknowledge  stream  $ {receivedBlockInfo.streamId}, receiving “, +
  s" block  $ {receivedBlockInfo.blockStoreResult.blockId},拷贝,Write  Ahead 日志!”)
  ,,,}
  ,,writeResult
  ,,},{catch 
  case 非致命的(e),=比;
  ,,,,,logError (s" Error  adding  block  receivedBlockInfo"美元;,,e)
  假
  }
  }

驱动端接收到的数据保存在streamIdToUnallocatedBlockQueues中,具体结构如下:


 private  type  ReceivedBlockQueue =, mutable.Queue [ReceivedBlockInfo]
  private  val  streamIdToUnallocatedBlockQueues =, new  mutable.HashMap [Int, ReceivedBlockQueue] 
 allocateBlocksToBatch把接收到的数据分配给批,根据streamId取出块,由此就知道Spark 流处理数据的时候可以有不同数据来源
 <代码> <强>那到底什么是batchTime ? ,
batchTime是上一个工作分配完数据之后,开始再接收到的数据的时间。
/* *
  ,* Allocate  all  unallocated  blocks 用,given 批。
  ,*却;能够event  will  get  written 用,write  ahead  log  (if 启用)。
  ,*/def  allocateBlocksToBatch batchTime:时间:,Unit =, synchronized  {
  if  (lastAllocatedBatchTime ==, null  | |, batchTime 祝辞,lastAllocatedBatchTime), {
  val  streamIdToBlocks =, streamIds.map  {, streamId =比;
  ,,,,,,,(streamId, getReceivedBlockQueue (streamId) .dequeueAll (x =祝辞,true)),//根据streamId获取块信息
  ,,}.toMap
  val  allocatedBlocks =, AllocatedBlocks (streamIdToBlocks)
  if  (writeToLog (BatchAllocationEvent (batchTime, allocatedBlocks))), {
  timeToAllocatedBlocks.put (batchTime, allocatedBlocks)
  时间=lastAllocatedBatchTime  batchTime //这里有对batchTime进行赋值,就是上一个工作分配完数据后,开始在接收到数据的时间
  ,,,},{else 
  ,,,,,logInfo (s" Possibly  processed  batch  batchTime 美元;need 用be  processed  again 拷贝WAL  recovery")
  ,,,}
  ,,},{else 
  logInfo (s" Possibly  processed  batch  batchTime 美元;need 用be  processed  again 拷贝WAL  recovery")
  ,,}
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null

(版本定制)第13课:火花流源码解读之司机容错安全性