介绍
这篇文章给大家分享的是有关火花结构化流处理机制之容错机制的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
<强>容错机制强>
端到端的有且仅有一次保证,是结构化流设计的关键目标之一。
结构化流设计了,结构化流源、汇等等,来跟踪确切的处理进度,并让其重启或重运行来处理任何故障
流源是类似卡夫卡的偏移量(补偿)来跟踪流的读取位置。执行引擎使用检查点(关卡)和预写日志(提前写日志)来记录每个执行其的偏移范围值
流水槽是设计用来保证处理的幂等性
这样,依靠可回放的数据源(流源)和处理幂等(流水槽),结构流来做到任何故障下的端到端的有且仅有一次保证
val lines =spark.readStream .format才能(“socket") .option才能(“host",,“localhost") .option才能(“port",, 9999) .load才能()//,Split 从而,lines into 单词 val words =, lines.as [String] .flatMap (_.split (“,“))//,Generate running  word 计数 val wordCounts =, words.groupBy (“value") .count ()
其中,火花是SparkSession,线是DataFrame, DataFrame就是数据集(行)。
<强>数据集强>
看看数据集的触发因子的代码实现,比如foreach操作:
def foreach (f: T =祝辞,单位):,Unit =, withNewRDDExecutionId { ,,,rdd.foreach (f) ,,} ,private def  withNewRDDExecutionId (U)(身体:=祝辞,U):, U =, { ,,,SQLExecution.withNewExecutionId (sparkSession, rddQueryExecution), { ,,,,,rddQueryExecution.executedPlan.foreach {, plan =比; ,,,,,,,plan.resetMetrics () ,,,,,} ,,,,,的身体 ,,,} 以前,,}>接着看:
<>之前,def withNewExecutionId [T] ( ,,,,,sparkSession:, sparkSession, ,,,,,queryExecution:, queryExecution, ,,,,,名字:,选择[String],=, None)(身体:=祝辞,T):, T =, { ,,,val sc =sparkSession.sparkContext ,,,val oldExecutionId =, sc.getLocalProperty (EXECUTION_ID_KEY) ,,,val executionId =SQLExecution.nextExecutionId ,,,sc.setLocalProperty (EXECUTION_ID_KEY, executionId.toString) ,,,executionIdToQueryExecution.put (executionId, queryExecution) ,,,try {,,,, ,,,,,withSQLConfPropagated (sparkSession), {,,,,,, ,,,,,,,try {,,,,,,,, ,,,,,,,,,的身体 ,,,,,,,},catch {,,,,,,,, ,,,,,,,},finally {,,,,,,,, ,,,,,,,} ,,,,,} ,,,},{finally ,,,,,executionIdToQueryExecution.remove (executionId) ,,,,,sc.setLocalProperty (EXECUTION_ID_KEY, oldExecutionId) ,,,} 以前,,}>执行的真正代码就是queryExecution: queryExecution只
@transient private lazy val rddQueryExecution:, QueryExecution =, { ,,,val deserialized =, CatalystSerde.deserialize [T] (logicalPlan) ,,,sparkSession.sessionState.executePlan(反序列化) 以前,,}>看到了看到的了,是sessionState.executePlan执行logicalPlan而得到了QueryExecution
这里的sessionState.executePlan其实就是创建了一个QueryExecution对象,然后执行QueryExecution的executedPlan方法得到SparkPlan这个物理计划。怎么生成的呢?
lazy val SparkPlan:, SparkPlan =, tracker.measurePhase (QueryPlanningTracker.PLANNING), { ,,,SparkSession.setActiveSession (sparkSession),,, ,,,planner.plan (ReturnAnswer (optimizedPlan.clone ())) . next () 以前,,}>通过planner.plan方法生成。
规划师是SparkPlanner。在BaseSessionStateBuilder类中定义。
protected def 规划师:,SparkPlanner =, { ,,,new SparkPlanner (session.sparkContext,相依,,experimentalMethods), { ,,,,,override def extraPlanningStrategies:, Seq[策略],=,,,,,,,super.extraPlanningStrategies + + customPlanningStrategies ,,,} null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null火花结构化流处理机制之容错机制的示例分析