第11课:火花流源码解读之司机中的ReceiverTracker架构设计以及具体实现彻底研究

  

上节课将到了接收机是如何不断的接收数据的,并且接收到的数据的元数据会汇报给ReceiverTracker、下面我们看看ReceiverTracker具体的功能及实现。

一、ReceiverTracker主要的功能:

<李>

在执行器上启动接收器。

<李>

停止接收器。

<李>

更新接收器接收数据的速率(也就是限流)

<李>

不断的等待接收器的运行状态,只要接收器停止运行,就重新启动接收器。也就是接收机的容错功能。

<李>

接受接收机的注册。

<李>

借助ReceivedBlockTracker来管理接收器接收数据的元数据。

<李>

汇报接收发送过来的错误信息


ReceiverTracker管理了一个消息通讯体ReceiverTrackerEndpoint,用来与接收机或者ReceiverTracker 进行消息通信。

在ReceiverTracker开始的方法中,实例化了ReceiverTrackerEndpoint,并且在执行器上启动接收器:

/* *,Start 从而endpoint 以及receiver  execution 线程只*/开始():def  Unit =, synchronized  {   if 才能;(isTrackerStarted), {   ,,,throw  new  SparkException (“ReceiverTracker  already 开始”)   ,,}      if 才能;(! receiverInputStreams.isEmpty), {   ,,,endpoint =, ssc.env.rpcEnv.setupEndpoint (   ,,,,,,:“ReceiverTracker new  ReceiverTrackerEndpoint (ssc.env.rpcEnv))   ,,,if  (! skipReceiverLaunch), launchReceivers ()   ,,,logInfo (“ReceiverTracker 开始”)   ,,,trackerState =开始   ,,}   }

启动Receivr,其实是ReceiverTracker给ReceiverTrackerEndpoint发送了一个本地消息,ReceiverTrackerEndpoint将接收机封装成抽样以工作的方式提交给集群运行。

endpoint.send (StartAllReceivers(接收器))

这里的端点就是ReceiverTrackerEndpoint的引用。


接收机启动后,会向ReceiverTracker注册,注册成功才算正式启动了。

override  protected  def  onReceiverStart ():, Boolean =, {   val 才能;msg =, RegisterReceiver (   ,,,,,,streamId receiver.getClass.getSimpleName,,主机,executorId,端点)   trackerEndpoint.askWithRetry才能(布尔)(味精)   }

当接收端接收到数据,达到一定的条件需要将数据写入BlockManager,并且将数据的元数据汇报给ReceiverTracker:

/* *,Store  block 以及report  it 用driver  */def  pushAndReportBlock (   ,,,receivedBlock:, receivedBlock,   ,,,metadataOption:,选项(任何)   ,,,blockIdOption:,选项(StreamBlockId)   ),{才能   val 才能;blockId =, blockIdOption.getOrElse (nextBlockId)   val 才能;time =System.currentTimeMillis   val 才能;blockStoreResult =, receivedBlockHandler.storeBlock (blockId, receivedBlock)   logDebug才能(s”Pushed  block  blockId 美元;拷贝$ {(System.currentTimeMillis 安康;时间)},女士”)   val 才能;numRecords =blockStoreResult.numRecords   val 才能;blockInfo =, ReceivedBlockInfo (numRecords, streamId,还以为,metadataOption, blockStoreResult)   trackerEndpoint.askWithRetry才能(布尔)(AddBlock (blockInfo))   logDebug才能(s”Reported  block  blockId美元”)   }


当ReceiverTracker收到元数据后,会在线程池中启动一个线程来写数据:

case  AddBlock (receivedBlockInfo),=比;   if 才能;(WriteAheadLogUtils.isBatchingEnabled (ssc.conf, isDriver =, true)), {   ,,,walBatchingThreadPool.execute (new  Runnable  {   ,,,,,override  def 运行():,Unit =, Utils.tryLogNonFatalError  {   ,,,,,,,if (活动),{   ,,,,,,,,,context.reply (addBlock (receivedBlockInfo))   ,,,,,,,},{else    ,,,,,,,,,throw  new  IllegalStateException (“ReceiverTracker  RpcEndpoint  shut 下来。”)   ,,,,,,,}   ,,,,,}   ,,,})   ,,},{else    ,,,context.reply (addBlock (receivedBlockInfo))   以前,,}

数据的元数据是交由ReceivedBlockTracker管理的。

数据最终被写入到streamIdToUnallocatedBlockQueues中:一个流对应一个数据块信息的队列。

private  type  ReceivedBlockQueue =, mutable.Queue [ReceivedBlockInfo]      private  val  streamIdToUnallocatedBlockQueues =, new  mutable.HashMap [Int, ReceivedBlockQueue]

第11课:火花流源码解读之司机中的ReceiverTracker架构设计以及具体实现彻底研究