上节课将到了接收机是如何不断的接收数据的,并且接收到的数据的元数据会汇报给ReceiverTracker、下面我们看看ReceiverTracker具体的功能及实现。
一、ReceiverTracker主要的功能:
- <李>
在执行器上启动接收器。
停止接收器。
李> <李>更新接收器接收数据的速率(也就是限流)
李> <李>不断的等待接收器的运行状态,只要接收器停止运行,就重新启动接收器。也就是接收机的容错功能。
李> <李>接受接收机的注册。
李> <李>借助ReceivedBlockTracker来管理接收器接收数据的元数据。
李> <李>汇报接收发送过来的错误信息
李>
ReceiverTracker管理了一个消息通讯体ReceiverTrackerEndpoint,用来与接收机或者ReceiverTracker 进行消息通信。
在ReceiverTracker开始的方法中,实例化了ReceiverTrackerEndpoint,并且在执行器上启动接收器:
/* *,Start 从而endpoint 以及receiver execution 线程只*/开始():def Unit =, synchronized { if 才能;(isTrackerStarted), { ,,,throw new SparkException (“ReceiverTracker already 开始”) ,,} if 才能;(! receiverInputStreams.isEmpty), { ,,,endpoint =, ssc.env.rpcEnv.setupEndpoint ( ,,,,,,:“ReceiverTracker new ReceiverTrackerEndpoint (ssc.env.rpcEnv)) ,,,if (! skipReceiverLaunch), launchReceivers () ,,,logInfo (“ReceiverTracker 开始”) ,,,trackerState =开始 ,,} }
启动Receivr,其实是ReceiverTracker给ReceiverTrackerEndpoint发送了一个本地消息,ReceiverTrackerEndpoint将接收机封装成抽样以工作的方式提交给集群运行。
endpoint.send (StartAllReceivers(接收器))
这里的端点就是ReceiverTrackerEndpoint的引用。
接收机启动后,会向ReceiverTracker注册,注册成功才算正式启动了。
override protected def onReceiverStart ():, Boolean =, { val 才能;msg =, RegisterReceiver ( ,,,,,,streamId receiver.getClass.getSimpleName,,主机,executorId,端点) trackerEndpoint.askWithRetry才能(布尔)(味精) }
当接收端接收到数据,达到一定的条件需要将数据写入BlockManager,并且将数据的元数据汇报给ReceiverTracker:
/* *,Store block 以及report it 用driver */def pushAndReportBlock ( ,,,receivedBlock:, receivedBlock, ,,,metadataOption:,选项(任何) ,,,blockIdOption:,选项(StreamBlockId) ),{才能 val 才能;blockId =, blockIdOption.getOrElse (nextBlockId) val 才能;time =System.currentTimeMillis val 才能;blockStoreResult =, receivedBlockHandler.storeBlock (blockId, receivedBlock) logDebug才能(s”Pushed block  blockId 美元;拷贝$ {(System.currentTimeMillis 安康;时间)},女士”) val 才能;numRecords =blockStoreResult.numRecords val 才能;blockInfo =, ReceivedBlockInfo (numRecords, streamId,还以为,metadataOption, blockStoreResult) trackerEndpoint.askWithRetry才能(布尔)(AddBlock (blockInfo)) logDebug才能(s”Reported block  blockId美元”) }
当ReceiverTracker收到元数据后,会在线程池中启动一个线程来写数据:
case AddBlock (receivedBlockInfo),=比; if 才能;(WriteAheadLogUtils.isBatchingEnabled (ssc.conf, isDriver =, true)), { ,,,walBatchingThreadPool.execute (new Runnable  { ,,,,,override def 运行():,Unit =, Utils.tryLogNonFatalError { ,,,,,,,if (活动),{ ,,,,,,,,,context.reply (addBlock (receivedBlockInfo)) ,,,,,,,},{else ,,,,,,,,,throw new IllegalStateException (“ReceiverTracker RpcEndpoint shut 下来。”) ,,,,,,,} ,,,,,} ,,,}) ,,},{else ,,,context.reply (addBlock (receivedBlockInfo)) 以前,,}>数据的元数据是交由ReceivedBlockTracker管理的。
数据最终被写入到streamIdToUnallocatedBlockQueues中:一个流对应一个数据块信息的队列。
private type ReceivedBlockQueue =, mutable.Queue [ReceivedBlockInfo] private val streamIdToUnallocatedBlockQueues =, new mutable.HashMap [Int, ReceivedBlockQueue]第11课:火花流源码解读之司机中的ReceiverTracker架构设计以及具体实现彻底研究