胡本期内容:
,,1、卡夫卡解密
<强>背景:强>,
目前没有接收器在企业中使用的越来越多,没有接收器具有更强的控制度,语义一致性。接收器是我们操作数据来源自然方式,操作数据来源使用一个封装器,且是抽样类型的。
所以火花流就产生了自定义抽样→KafkaRDD。
源码分析:
1, KafkaRDD源码
<>以前私人(卡夫卡) class KafkaRDD [ 凯西:,ClassTag, V: ClassTag, U & lt;:解码器[_]:,ClassTag, T & lt;:解码器[_]:,ClassTag, R: ClassTag],私人(火花),( ,,,sc:, SparkContext, kafkaParams:地图(字符串,字符串), val offsetRanges:数组(OffsetRange),,//指定数据范围 领导:地图(TopicAndPartition,(字符串,Int)], messageHandler:, MessageAndMetadata (K, V),=在R ),extends 抽样[R] (sc,, Nil), with Logging with HasOffsetRanges { override def  getPartitions:数组(分区),=,{ ,,,offsetRanges.zipWithIndex.map {, case (我)啊,,,=比; val (主机,端口),=,领导人(TopicAndPartition (o.topic, o.partition)) new KafkaRDDPartition (o.topic,我还以为,o.partition, o.fromOffset,, o.untilOffset,,主机,端口) ,,}.toArray 以前,,}>
2, HasOffsetRanges
/* * ,* Represents any  object that has a collection of [[OffsetRange]]年代只却;能够还要be used 用access ,* offset ranges 拷贝RDDs generated by 从而direct Kafka DStream (见 ,* [[KafkaUtils.createDirectStream ()]])。 ,* {{{ *,,,KafkaUtils.createDirectStream (…) .foreachRDD {, rdd =比; ,*,,,,,val offsetRanges =, rdd.asInstanceOf HasOffsetRanges .offsetRanges ,*,,,,,… ,*,,} ,*}}} */trait HasOffsetRanges  { def offsetRanges:数组(OffsetRange) }
3, KafkaRDD中的计算
override def 计算(thePart:分区,,背景:,TaskContext):,迭代器[R],=, { val part =, thePart.asInstanceOf [KafkaRDDPartition] 断言(part.fromOffset & lt;=, part.untilOffset,, errBeginAfterEnd(部分) if (part.fromOffset ==, part.untilOffset), { ,,,log.info (s" Beginning  offset $ {part.fromOffset}, is 从而same as ending offset “, + s" skipping $ {part.topic}, {part.partition}“美元) Iterator.empty },{else new KafkaRDDIterator(部分,上下文) ,,} }
SparkStreaming一般使用KafkaUtils的createDirectStream读取数据
def createDirectStream [ 凯西:,ClassTag, V: ClassTag, KD & lt;:解码器[K]:, ClassTag, VD & lt;:解码器[V]:, ClassTag], ( ,,,ssc:, StreamingContext, kafkaParams:地图(字符串,字符串), 主题:设置(字符串) ):,InputDStream [(K, V)],=, { val messageHandler =,(多:MessageAndMetadata [K, V]),=祝辞,(mmd.key, mmd.message) val kc =, new KafkaCluster (kafkaParams) val fromOffsets =, getFromOffsets (kc, kafkaParams,,主题) new DirectKafkaInputDStream [V, K,还以为,KD, VD,, (K, V)] ( ,,,,ssc, kafkaParams, messageHandler fromOffsets,,) }
4,通过getFromOffsets的方法获取主题的fromOffset值
(卡夫卡),( ,,,kc:, KafkaClusterkafkaParams:,[]主题:,[] ,,):[TopicAndPartition],=, { 时间=reset kafkaParams.get () . map (_.toLowerCase) result =, { ,,,topicPartitions & lt;作用;kc.getPartitions(主题)对不对 ,,,leaderOffsets & lt;作用;((reset ==, ()), { ,,,,,kc.getEarliestLeaderOffsets (topicPartitions) ,,,},{ ,,,,,kc.getLatestLeaderOffsets (topicPartitions) ,,})对不对 },{才能 ,,,leaderOffsets.map {, (tplo)=比; ,,,,,,,(tplo.offset) ,,,} ,,} ,KafkaCluster。(结果) }
createDirectStream其实生成的是DirectKafkaInputDStream对象,通过计算方法会产生KafkaRDD
(validTime:时间):,选择[KafkaRDD []],=, { untilOffsets =,夹(latestLeaderOffsets ()) 时间=rdd [] ( ,,,context.sparkContextkafkaParamsuntilOffsetsmessageHandler) 时间=offsetRanges .map {, (tpfo)=比; null null null null null null null null null null null null null null null null null(版本定制)第15课:火花流源码解读之没有接收器彻底思考