第12课:火花流源码解读之Execu

  

接收器接收到的数据交由ReceiverSupervisorImpl来管理。

ReceiverSupervisorImpl接收到数据后,会数据存储并且将数据的元数据报告给ReceiverTracker。

执行人的数据容错可以有三种方式:

<李>

细胞膜日志

<李>

数据副本

<李>

接收接收机的数据流回放

/* *,Store  block 以及report  it 用driver  */def  pushAndReportBlock (   ,,,receivedBlock:, receivedBlock,   ,,,metadataOption:,选项(任何)   ,,,blockIdOption:,选项(StreamBlockId)   ),{才能   val 才能;blockId =, blockIdOption.getOrElse (nextBlockId)   val 才能;time =System.currentTimeMillis   val 才能;blockStoreResult =, receivedBlockHandler.storeBlock (blockId, receivedBlock)   logDebug才能(s”Pushed  block  blockId 美元;拷贝$ {(System.currentTimeMillis 安康;时间)},女士”)   val 才能;numRecords =blockStoreResult.numRecords   val 才能;blockInfo =, ReceivedBlockInfo (numRecords, streamId,还以为,metadataOption, blockStoreResult)   trackerEndpoint.askWithRetry才能(布尔)(AddBlock (blockInfo))   logDebug才能(s”Reported  block  blockId美元”)   }

数据的存储,是借助receiverBlockHandler,它的实现有两种方式:

private  val  receivedBlockHandler:, ReceivedBlockHandler =, {   if 才能;(WriteAheadLogUtils.enableReceiverLog (env.conf)), {   ,,,if  (checkpointDirOption.isEmpty), {   ,,,,,throw  new  SparkException (   ,,,,,,,”Cannot  enable  receiver  write-ahead  log  without  checkpoint  directory 设置只”,+   ,,,,,,,,,”Please  use  streamingContext.checkpoint(),用set 从而checkpoint 目录只”,+   ,,,,,,,,,”阅读documentation  for  more 细节。”)   ,,,}   ,,,new  WriteAheadLogBasedBlockHandler (env.blockManager, receiver.streamId,   ,,,,,,,receiver.storageLevel env.conf,,, checkpointDirOption.get hadoopConf)   ,,},{else    ,,,new  BlockManagerBasedBlockHandler (env.blockManager, receiver.storageLevel)   ,,}   }


WriteAheadLogBaseBlockHandler一方面将数据交由BlockManager管理,另一方面会写犯下日志。

一旦节点崩溃,可以由细胞膜日志恢复内存中的数据。在细胞膜开始时,就不在建议数据存储多个副本。

private  val  effectiveStorageLevel =, {   if 才能;(storageLevel.deserialized), {   ,,,logWarning(年代“Storage  level  serialization  $ {storageLevel.deserialized}, is  not  supported 当“,+   ,,,,,年代”,write  ahead  log  is 启用,,change 用serialization 错误的”)   ,,}   if 才能;(storageLevel.replication 祝辞,1),{   ,,,logWarning(年代“Storage  level  replication  $ {storageLevel.replication}, is  unnecessary  when ”, +   ,,,,,年代“write  ahead  log  is 启用,,change 用replication  1”)   ,,}      StorageLevel才能(storageLevel.useDisk, storageLevel.useMemory,, storageLevel.useOffHeap,,假的,,1)   }


而BlockManagerBaseBlockHandler直接将数据交由BlockManager管理。

如果不写细胞膜,当节点崩溃了一定会数据丢失吗?这个也不一定。因为在构建WriteAheadLogBaseBlockHandler,和BlockManagerBaseBlockHandler的时候会将接收机的storageLevel传入.storageLevel用来描述数据保存的地方(内存,磁盘)以及副本个数。

class  StorageLevel 私人(   ,,,private  var  _useDisk:,布尔,   ,,,private  var  _useMemory:,布尔,   ,,,private  var  _useOffHeap:,布尔,   ,,,private  var  _deserialized:,布尔,   ,,,private  var  _replication:, Int =, 1)   extends 才能外部化

公有如下种类的StorageLevel:

val  NONE =, new  StorageLevel(假的,假的,还以为,假,假)   val  DISK_ONLY =, new  StorageLevel(真的,,假的,,假的,,假)   val  DISK_ONLY_2 =, new  StorageLevel(真的,,假的,,假的,,假的,,2)   val  MEMORY_ONLY =, new  StorageLevel(假,,真的,,假的,,真的)   val  MEMORY_ONLY_2 =, new  StorageLevel(假,,真的,,假的,,真的,,2)   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null

第12课:火花流源码解读之Execu