火花读卡夫卡取数据流

火花读卡夫卡取数据流提供了两种方式createDstream和createDirectStream。

两者区别如下:

1, KafkaUtils。createDstream

构造函数为KafkaUtils。createDstream (ssc, [zk],[消费者组id],[每个主题,分区]),
使用了接收器来接收数据,利用的是卡夫卡高层次的消费者api,对于所有的接收器接收到的数据将会保存在Spark 执行器中,然后通过火花流启动工作来处理这些数据,默认会丢失,可启用细胞膜日志,该日志存储在HDFS上,
,创建一个接收机来对卡夫卡进行定时拉取数据,ssc的抽样分区和卡夫卡的主题分区不是一个概念,故如果增加特定主体分区数仅仅是增加一个接收机中消费主题的线程数,并不增加火花的并行处理数据数量,
B,对于不同的集团和主题可以使用多个接收器创建不同的DStream 
C,如果启用了细胞膜,需要设置存储级别,即KafkaUtils.createStream (…StorageLevel.MEMORY_AND_DISK_SER)

2. kafkautils。createDirectStream

区别接收器接收数据,这种方式定期地从卡夫卡的主题+分区中查询最新的偏移量,再根据偏移量范围在每个批次里面处理数据,使用的是卡夫卡的简单消费者api 
优点:,
,简化并行,不需要多个卡夫卡输入流,该方法将会创建和卡夫卡分区一样的抽样个数,而且会从卡夫卡并行读取只
B,高效,这种方式并不需要细胞膜,细胞膜模式需要对数据复制两次,第一次是被卡夫卡复制,另一次是写到细胞膜中,
C,恰好一次语义(Exactly-once-semantics),传统的读卡夫卡取数据是通过卡夫卡高层次api把偏移量写入动物园管理员中,存在数据丢失的可能性是动物园管理员中和ssc的偏移量不一致.EOS通过实现卡夫卡低层次api,偏移量仅仅被ssc保存在关卡中,消除了zk和ssc偏移量不一致的问题。缺点是无法使用基于管理员的卡夫卡监控工具


公共空间adclick () {

SparkConf相依=new SparkConf ()

.setAppName (" ")

.setMaster (" "),

JavaStreamingContext jssc=new JavaStreamingContext(参看,Durations.seconds (10)),

jssc.checkpoint (" "),

Map<字符串,String>kafkaParams=new HashMap<字符串,String> ();

kafkaParams.put (“metadata.broker。名单”,ConfigurationManager.getProperty (“metadata.broker.list”));

字符串kafkaTopics=ConfigurationManager.getProperty (“kafkaTopics”);

String [] kafkaTopicsSplits=kafkaTopics.split (", ");

Set上衣=new HashSet ();

(字符串xx: kafkaTopicsSplits) {

tops.add (xx);

}

JavaPairInputDStream<字符串,String>adRealTimeDStream=KafkaUtils。

createDirectStream (

jssc,,

String.class,,

String.class,,

StringDecoder.class,,

StringDecoder.class,,

kafkaParams,,

顶部);

jssc.start ();

jssc.awaitTermination ();

jssc.close ();

}


火花读卡夫卡取数据流