(版本定制)第16课:火花流源码解读之数据清理内幕彻底解密

  

本期内容:

,,1,火花流元数据清理详解

,,2,火花流元数据清理源码解析


<强>一、如何研究火花流元数据清理

<李>

操作DStream的时候会产生元数据,所以要解决抽样的数据清理工作就一定要从DStream入手。因为DStream是抽样的模板,DStream之间有依赖关系。
DStream的操作产生了抽样,接收数据也靠DStream,数据的输入,数据的计算,输出整个生命周期都是由DStream构建的。由此,DStream负责抽样的整个生命周期,因此研究的入口的是DStream。

<李>

基于卡夫卡数据来源,通过直接的方式访问卡夫卡,DStream随着时间的进行,会不断的在自己的内存数据结构中维护一个HashMap, HashMap维护的就是时间窗口,以及时间窗口下的抽样。按照批时间来存储抽样以及删除抽样。李李

<>

火花流本身是一直在运行的,在自己计算的时候会不断的产生抽样,例如每批秒持续时间都会产生抽样,除此之外可能还有累加器,广播变量。由于不断的产生这些对象,因此火花流有自己的一套对象,元数据以及数据的清理机制。

<李>

火花流对抽样的管理就相当于JVM的GC


<强>二、源码解析

火花流是通过我们设定的批处理时间来不断的产生抽样,火花流清理元数据跟时钟有关,因为数据是周期性的产生,所以肯定是周期性的释放,这些都跟JobGenerator有关,所以我们先从这开始研究。


1, RecurringTimer:消息循环器将消息不断的发送给EventLoop

=, RecurringTimer (……millisecondslongTime =祝辞,. post((时间(长期))))

2, EventLoop: onReceive接收到消息


 ():,=, synchronized  {
  (!=,),=,EventLoop [JobGeneratorEvent] (), {
  (事件:JobGeneratorEvent):,=, processEvent(事件)
  
  艾凡:,:,=,{
  ,,,,,jobScheduler.reportError (e)
  ,,,}
  ,,}
  .start ()
  
  (.),{
  ,,,重启()
  },{才能
  ,,,startFirstTime ()
  ,,}
  }

3,在processEvent中接收清理元数据消息


/* *, Processes  all  events  */private  def  processEvent(事件:JobGeneratorEvent), {
  logDebug才能(“Got  event “, +,事件)
  event 才能;match  {
  case  GenerateJobs(时间),=祝辞,GenerateJobs(时间)
  case  ClearMetadata(时间),=祝辞,ClearMetadata(时间),//清理元数据
  case  DoCheckpoint(时间,clearCheckpointDataLater),=比;
  ,,,,,doCheckpoint(时间,clearCheckpointDataLater)
  case  ClearCheckpointData(时间),=祝辞,ClearCheckpointData(时间),//清理检查站
  ,,}
  }

具体的方法实现内容就不再这里说,我们进一步分析下这些清理动作是在什么时候被调用的,在火花流应用程序中,最终工作是交给JobHandler来执行的,所以我们分析下JobHandler


 private  class  JobHandler(工作:工作),extends  Runnable  with  Logging  {
  import  JobScheduler._
  
  def  run (), {
  try  {
  val  formattedTime =, UIUtils.formatBatchTime (
  ,,,,,,,,,,,job.time.milliseconds ssc.graph.batchDuration.milliseconds,, showYYYYMMSS =, false)
  val  batchUrl =, s"/流/批/? id=$ {job.time.milliseconds}“;
  val  batchLinkText =, s" [output  operation  $ {job.outputOpId}, batch  time  $ {formattedTime}]“;
  
  ssc.sc.setJobDescription (
  s"““Streaming  job 得到& lt; https://www.yisu.com/zixun/a  href=" $ batchUrl ">美元batchLinkText ”“”)
  ssc.sc。setLocalProperty (BATCH_TIME_PROPERTY_KEY job.time.milliseconds.toString)
  ssc.sc。setLocalProperty (OUTPUT_OP_ID_PROPERTY_KEY job.outputOpId.toString)//我们需要“eventLoop”分配给一个临时变量。否则,因为//癑obScheduler.stop (false)”可能“eventLoop”设置为null时这个方法运行时,//这是可能的,当“发布”,“eventLoop”零。
  var _eventLoop=eventLoop
  如果(_eventLoop !=null) {
  _eventLoop。帖子(JobStarted(工作,clock.getTimeMillis ()))//禁用检查现有流媒体发起的工作输出目录//调度器,因为我们可能需要编写输出到一个现有的目录在检查站//恢复;看到火花- 4835为更多的细节。
  PairRDDFunctions.disableOutputSpecValidation.withValue(真正的){
  job.run ()
  }
  _eventLoop=eventLoop
  如果(_eventLoop !=null) {
  _eventLoop。帖子(JobCompleted(工作,clock.getTimeMillis ()))
  }
  其他}{//JobScheduler已经停止了。
  }
  最后}{
  ssc.sc.setLocalProperty (JobScheduler。BATCH_TIME_PROPERTY_KEY零)
  ssc.sc.setLocalProperty (JobScheduler。OUTPUT_OP_ID_PROPERTY_KEY零)
  }
  }
  }
  }

(版本定制)第16课:火花流源码解读之数据清理内幕彻底解密