(版本定制)第15课:火花流源码解读之没有接收器彻底思考

  

胡本期内容:

,,1、卡夫卡解密


<强>背景:,
目前没有接收器在企业中使用的越来越多,没有接收器具有更强的控制度,语义一致性。接收器是我们操作数据来源自然方式,操作数据来源使用一个封装器,且是抽样类型的。

所以火花流就产生了自定义抽样→KafkaRDD。


源码分析:

1, KafkaRDD源码

<>以前私人(卡夫卡)   class  KafkaRDD [   凯西:,ClassTag,   V: ClassTag,   U  & lt;:解码器[_]:,ClassTag,   T  & lt;:解码器[_]:,ClassTag,   R: ClassTag],私人(火花),(   ,,,sc:, SparkContext,   kafkaParams:地图(字符串,字符串),   val  offsetRanges:数组(OffsetRange),,//指定数据范围   领导:地图(TopicAndPartition,(字符串,Int)],   messageHandler:, MessageAndMetadata (K, V),=在R   ),extends 抽样[R] (sc,, Nil), with  Logging  with  HasOffsetRanges  {   override  def  getPartitions:数组(分区),=,{   ,,,offsetRanges.zipWithIndex.map  {, case (我)啊,,,=比;   val (主机,端口),=,领导人(TopicAndPartition (o.topic, o.partition))   new  KafkaRDDPartition (o.topic,我还以为,o.partition, o.fromOffset,, o.untilOffset,,主机,端口)   ,,}.toArray   以前,,}

2, HasOffsetRanges


 
/* *
  ,* Represents  any  object  that  has  a  collection  of  [[OffsetRange]]年代只却;能够还要be  used 用access 
  ,* offset  ranges 拷贝RDDs  generated  by 从而direct  Kafka  DStream (见
  ,* [[KafkaUtils.createDirectStream ()]])。
  ,* {{{
  *,,,KafkaUtils.createDirectStream (…) .foreachRDD  {, rdd =比;
  ,*,,,,,val  offsetRanges =, rdd.asInstanceOf HasOffsetRanges .offsetRanges
  ,*,,,,,…
  ,*,,}
  ,*}}}
  */trait  HasOffsetRanges  {
  def  offsetRanges:数组(OffsetRange)
  }

3, KafkaRDD中的计算


 override  def 计算(thePart:分区,,背景:,TaskContext):,迭代器[R],=, {
  val  part =, thePart.asInstanceOf [KafkaRDDPartition]
  断言(part.fromOffset  & lt;=, part.untilOffset,, errBeginAfterEnd(部分)
  if  (part.fromOffset ==, part.untilOffset), {
  ,,,log.info (s" Beginning  offset  $ {part.fromOffset}, is 从而same  as  ending  offset “, +
  s" skipping  $ {part.topic}, {part.partition}“美元)
  Iterator.empty
  },{else 
  new  KafkaRDDIterator(部分,上下文)
  ,,}
  }

SparkStreaming一般使用KafkaUtils的createDirectStream读取数据


 def  createDirectStream [
  凯西:,ClassTag,
  V: ClassTag,
  KD  & lt;:解码器[K]:, ClassTag,
  VD  & lt;:解码器[V]:, ClassTag], (
  ,,,ssc:, StreamingContext,
  kafkaParams:地图(字符串,字符串),
  主题:设置(字符串)
  ):,InputDStream [(K, V)],=, {
  val  messageHandler =,(多:MessageAndMetadata [K, V]),=祝辞,(mmd.key, mmd.message)
  val  kc =, new  KafkaCluster (kafkaParams)
  val  fromOffsets =, getFromOffsets (kc, kafkaParams,,主题)
  new  DirectKafkaInputDStream [V, K,还以为,KD, VD,, (K, V)] (
  ,,,,ssc, kafkaParams, messageHandler fromOffsets,,)
  }

4,通过getFromOffsets的方法获取主题的fromOffset值


(卡夫卡),(
  ,,,kc:, KafkaClusterkafkaParams:,[]主题:,[]
  ,,):[TopicAndPartition],=, {
  时间=reset  kafkaParams.get () . map (_.toLowerCase)
  result =, {
  ,,,topicPartitions  & lt;作用;kc.getPartitions(主题)对不对
  ,,,leaderOffsets  & lt;作用;((reset ==, ()), {
  ,,,,,kc.getEarliestLeaderOffsets (topicPartitions)
  ,,,},{
  ,,,,,kc.getLatestLeaderOffsets (topicPartitions)
  ,,})对不对
  },{才能
  ,,,leaderOffsets.map  {, (tplo)=比;
  ,,,,,,,(tplo.offset)
  ,,,}
  ,,}
  ,KafkaCluster。(结果)
  }

createDirectStream其实生成的是DirectKafkaInputDStream对象,通过计算方法会产生KafkaRDD


 (validTime:时间):,选择[KafkaRDD []],=, {
  untilOffsets =,夹(latestLeaderOffsets ())
  时间=rdd  [] (
  ,,,context.sparkContextkafkaParamsuntilOffsetsmessageHandler)
  
  时间=offsetRanges  .map  {, (tpfo)=比;
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null
  null

(版本定制)第15课:火花流源码解读之没有接收器彻底思考