接收器接收到的数据交由ReceiverSupervisorImpl来管理。
ReceiverSupervisorImpl接收到数据后,会数据存储并且将数据的元数据报告给ReceiverTracker。
执行人的数据容错可以有三种方式:
- <李>
细胞膜日志
李> <李>数据副本
李> <李>接收接收机的数据流回放
/* *,Store block 以及report it 用driver */def pushAndReportBlock ( ,,,receivedBlock:, receivedBlock, ,,,metadataOption:,选项(任何) ,,,blockIdOption:,选项(StreamBlockId) ),{才能 val 才能;blockId =, blockIdOption.getOrElse (nextBlockId) val 才能;time =System.currentTimeMillis val 才能;blockStoreResult =, receivedBlockHandler.storeBlock (blockId, receivedBlock) logDebug才能(s”Pushed block  blockId 美元;拷贝$ {(System.currentTimeMillis 安康;时间)},女士”) val 才能;numRecords =blockStoreResult.numRecords val 才能;blockInfo =, ReceivedBlockInfo (numRecords, streamId,还以为,metadataOption, blockStoreResult) trackerEndpoint.askWithRetry才能(布尔)(AddBlock (blockInfo)) logDebug才能(s”Reported block  blockId美元”) }
数据的存储,是借助receiverBlockHandler,它的实现有两种方式:
private val receivedBlockHandler:, ReceivedBlockHandler =, { if 才能;(WriteAheadLogUtils.enableReceiverLog (env.conf)), { ,,,if (checkpointDirOption.isEmpty), { ,,,,,throw new SparkException ( ,,,,,,,”Cannot enable  receiver write-ahead log without checkpoint directory 设置只”,+ ,,,,,,,,,”Please use  streamingContext.checkpoint(),用set 从而checkpoint 目录只”,+ ,,,,,,,,,”阅读documentation for more 细节。”) ,,,} ,,,new WriteAheadLogBasedBlockHandler (env.blockManager, receiver.streamId, ,,,,,,,receiver.storageLevel env.conf,,, checkpointDirOption.get hadoopConf) ,,},{else ,,,new BlockManagerBasedBlockHandler (env.blockManager, receiver.storageLevel) ,,} }
WriteAheadLogBaseBlockHandler一方面将数据交由BlockManager管理,另一方面会写犯下日志。
一旦节点崩溃,可以由细胞膜日志恢复内存中的数据。在细胞膜开始时,就不在建议数据存储多个副本。
private val effectiveStorageLevel =, { if 才能;(storageLevel.deserialized), { ,,,logWarning(年代“Storage level  serialization $ {storageLevel.deserialized}, is not supported 当“,+ ,,,,,年代”,write ahead  log is 启用,,change 用serialization 错误的”) ,,} if 才能;(storageLevel.replication 祝辞,1),{ ,,,logWarning(年代“Storage level  replication $ {storageLevel.replication}, is unnecessary when ”, + ,,,,,年代“write ahead  log is 启用,,change 用replication 1”) ,,} StorageLevel才能(storageLevel.useDisk, storageLevel.useMemory,, storageLevel.useOffHeap,,假的,,1) }
而BlockManagerBaseBlockHandler直接将数据交由BlockManager管理。
如果不写细胞膜,当节点崩溃了一定会数据丢失吗?这个也不一定。因为在构建WriteAheadLogBaseBlockHandler,和BlockManagerBaseBlockHandler的时候会将接收机的storageLevel传入.storageLevel用来描述数据保存的地方(内存,磁盘)以及副本个数。
class StorageLevel 私人( ,,,private var _useDisk:,布尔, ,,,private var _useMemory:,布尔, ,,,private var _useOffHeap:,布尔, ,,,private var _deserialized:,布尔, ,,,private var _replication:, Int =, 1) extends 才能外部化
公有如下种类的StorageLevel:
val NONE =, new StorageLevel(假的,假的,还以为,假,假) val DISK_ONLY =, new StorageLevel(真的,,假的,,假的,,假) val DISK_ONLY_2 =, new StorageLevel(真的,,假的,,假的,,假的,,2) val MEMORY_ONLY =, new StorageLevel(假,,真的,,假的,,真的) val MEMORY_ONLY_2 =, new StorageLevel(假,,真的,,假的,,真的,,2) null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null第12课:火花流源码解读之Execu