本期内容:
,,1,火花流元数据清理详解
,,2,火花流元数据清理源码解析
<强>一、如何研究火花流元数据清理强>
- <李>
操作DStream的时候会产生元数据,所以要解决抽样的数据清理工作就一定要从DStream入手。因为DStream是抽样的模板,DStream之间有依赖关系。
DStream的操作产生了抽样,接收数据也靠DStream,数据的输入,数据的计算,输出整个生命周期都是由DStream构建的。由此,DStream负责抽样的整个生命周期,因此研究的入口的是DStream。
基于卡夫卡数据来源,通过直接的方式访问卡夫卡,DStream随着时间的进行,会不断的在自己的内存数据结构中维护一个HashMap, HashMap维护的就是时间窗口,以及时间窗口下的抽样。按照批时间来存储抽样以及删除抽样。李李
> <>火花流本身是一直在运行的,在自己计算的时候会不断的产生抽样,例如每批秒持续时间都会产生抽样,除此之外可能还有累加器,广播变量。由于不断的产生这些对象,因此火花流有自己的一套对象,元数据以及数据的清理机制。
李> <李>火花流对抽样的管理就相当于JVM的GC
李>
<强>二、源码解析强>
火花流是通过我们设定的批处理时间来不断的产生抽样,火花流清理元数据跟时钟有关,因为数据是周期性的产生,所以肯定是周期性的释放,这些都跟JobGenerator有关,所以我们先从这开始研究。
1, RecurringTimer:消息循环器将消息不断的发送给EventLoop
=, RecurringTimer (……millisecondslongTime =祝辞,. post((时间(长期))))
2, EventLoop: onReceive接收到消息
():,=, synchronized { (!=,),=,EventLoop [JobGeneratorEvent] (), { (事件:JobGeneratorEvent):,=, processEvent(事件) 艾凡:,:,=,{ ,,,,,jobScheduler.reportError (e) ,,,} ,,} .start () (.),{ ,,,重启() },{才能 ,,,startFirstTime () ,,} }
3,在processEvent中接收清理元数据消息
/* *, Processes all events */private def  processEvent(事件:JobGeneratorEvent), { logDebug才能(“Got  event “, +,事件) event 才能;match  { case GenerateJobs(时间),=祝辞,GenerateJobs(时间) case ClearMetadata(时间),=祝辞,ClearMetadata(时间),//清理元数据 case DoCheckpoint(时间,clearCheckpointDataLater),=比; ,,,,,doCheckpoint(时间,clearCheckpointDataLater) case ClearCheckpointData(时间),=祝辞,ClearCheckpointData(时间),//清理检查站 ,,} }
具体的方法实现内容就不再这里说,我们进一步分析下这些清理动作是在什么时候被调用的,在火花流应用程序中,最终工作是交给JobHandler来执行的,所以我们分析下JobHandler
private class JobHandler(工作:工作),extends Runnable with Logging { import JobScheduler._ def run (), { try { val formattedTime =, UIUtils.formatBatchTime ( ,,,,,,,,,,,job.time.milliseconds ssc.graph.batchDuration.milliseconds,, showYYYYMMSS =, false) val batchUrl =, s"/流/批/? id=$ {job.time.milliseconds}“; val batchLinkText =, s" [output operation $ {job.outputOpId}, batch time $ {formattedTime}]“; ssc.sc.setJobDescription ( s"““Streaming job 得到& lt; https://www.yisu.com/zixun/a href=" $ batchUrl ">美元batchLinkText >”“”) ssc.sc。setLocalProperty (BATCH_TIME_PROPERTY_KEY job.time.milliseconds.toString) ssc.sc。setLocalProperty (OUTPUT_OP_ID_PROPERTY_KEY job.outputOpId.toString)//我们需要“eventLoop”分配给一个临时变量。否则,因为//癑obScheduler.stop (false)”可能“eventLoop”设置为null时这个方法运行时,//这是可能的,当“发布”,“eventLoop”零。 var _eventLoop=eventLoop 如果(_eventLoop !=null) { _eventLoop。帖子(JobStarted(工作,clock.getTimeMillis ()))//禁用检查现有流媒体发起的工作输出目录//调度器,因为我们可能需要编写输出到一个现有的目录在检查站//恢复;看到火花- 4835为更多的细节。 PairRDDFunctions.disableOutputSpecValidation.withValue(真正的){ job.run () } _eventLoop=eventLoop 如果(_eventLoop !=null) { _eventLoop。帖子(JobCompleted(工作,clock.getTimeMillis ())) } 其他}{//JobScheduler已经停止了。 } 最后}{ ssc.sc.setLocalProperty (JobScheduler。BATCH_TIME_PROPERTY_KEY零) ssc.sc.setLocalProperty (JobScheduler。OUTPUT_OP_ID_PROPERTY_KEY零) } } } }(版本定制)第16课:火花流源码解读之数据清理内幕彻底解密