火花结构化流处理机制之容错机制的示例分析

  介绍

这篇文章给大家分享的是有关火花结构化流处理机制之容错机制的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

<强>容错机制

端到端的有且仅有一次保证,是结构化流设计的关键目标之一。

结构化流设计了,结构化流源、汇等等,来跟踪确切的处理进度,并让其重启或重运行来处理任何故障

流源是类似卡夫卡的偏移量(补偿)来跟踪流的读取位置。执行引擎使用检查点(关卡)和预写日志(提前写日志)来记录每个执行其的偏移范围值

流水槽是设计用来保证处理的幂等性

这样,依靠可回放的数据源(流源)和处理幂等(流水槽),结构流来做到任何故障下的端到端的有且仅有一次保证

 val  lines =spark.readStream
  .format才能(“socket")
  .option才能(“host",,“localhost")
  .option才能(“port",, 9999)
  .load才能()//,Split 从而,lines  into 单词
  val  words =, lines.as [String] .flatMap (_.split (“,“))//,Generate  running  word 计数
  val  wordCounts =, words.groupBy (“value") .count () 

其中,火花是SparkSession,线是DataFrame, DataFrame就是数据集(行)。

<强>数据集

看看数据集的触发因子的代码实现,比如foreach操作:

 def  foreach (f: T =祝辞,单位):,Unit =, withNewRDDExecutionId  {
  
  ,,,rdd.foreach (f)
  
  ,,}
  
  
  
  ,private  def  withNewRDDExecutionId (U)(身体:=祝辞,U):, U =, {
  
  ,,,SQLExecution.withNewExecutionId (sparkSession, rddQueryExecution), {
  
  ,,,,,rddQueryExecution.executedPlan.foreach  {, plan =比;
  
  ,,,,,,,plan.resetMetrics ()
  
  ,,,,,}
  
  ,,,,,的身体
  
  ,,,}
  
  以前,,} 

接着看:

<>之前,def  withNewExecutionId [T] (      ,,,,,sparkSession:, sparkSession,      ,,,,,queryExecution:, queryExecution,      ,,,,,名字:,选择[String],=, None)(身体:=祝辞,T):, T =, {      ,,,val  sc =sparkSession.sparkContext      ,,,val  oldExecutionId =, sc.getLocalProperty (EXECUTION_ID_KEY)      ,,,val  executionId =SQLExecution.nextExecutionId      ,,,sc.setLocalProperty (EXECUTION_ID_KEY, executionId.toString)      ,,,executionIdToQueryExecution.put (executionId, queryExecution)      ,,,try  {,,,,      ,,,,,withSQLConfPropagated (sparkSession), {,,,,,,      ,,,,,,,try  {,,,,,,,,      ,,,,,,,,,的身体      ,,,,,,,},catch  {,,,,,,,,      ,,,,,,,},finally  {,,,,,,,,      ,,,,,,,}      ,,,,,}      ,,,},{finally       ,,,,,executionIdToQueryExecution.remove (executionId)      ,,,,,sc.setLocalProperty (EXECUTION_ID_KEY, oldExecutionId)      ,,,}      以前,,}

执行的真正代码就是queryExecution: queryExecution只

 @transient  private  lazy  val  rddQueryExecution:, QueryExecution =, {
  
  ,,,val  deserialized =, CatalystSerde.deserialize [T] (logicalPlan)
  
  ,,,sparkSession.sessionState.executePlan(反序列化)
  
  以前,,} 

看到了看到的了,是sessionState.executePlan执行logicalPlan而得到了QueryExecution

这里的sessionState.executePlan其实就是创建了一个QueryExecution对象,然后执行QueryExecution的executedPlan方法得到SparkPlan这个物理计划。怎么生成的呢?

 lazy  val  SparkPlan:, SparkPlan =, tracker.measurePhase (QueryPlanningTracker.PLANNING), {
  
  ,,,SparkSession.setActiveSession (sparkSession),,,
  
  ,,,planner.plan (ReturnAnswer (optimizedPlan.clone ())) . next ()
  
  以前,,} 

通过planner.plan方法生成。

规划师是SparkPlanner。在BaseSessionStateBuilder类中定义。

 protected  def 规划师:,SparkPlanner =, {
  
  ,,,new  SparkPlanner (session.sparkContext,相依,,experimentalMethods), {
  
  ,,,,,override  def  extraPlanningStrategies:, Seq[策略],=,,,,,,,super.extraPlanningStrategies  + + customPlanningStrategies
  
  ,,,}
  
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null

火花结构化流处理机制之容错机制的示例分析