介绍
这篇文章给大家分享的是有关Spark2.x中如何实现SparkStreaming消费卡夫卡实例的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
软件软件:
,,,,,,,火花版本是apache spark2.2.0
,,,,,,卡夫卡版本是kafka0.10.0
,,,,采用直接的方法的方式来融合引发流和卡夫卡。没有采用Receiver-Based的方式。后续我会专门整理一篇文章分析两种融合方式不同。
1.卡夫卡数据准备:
创建卡夫卡的话题命令:
/usr/黄芪丹参滴丸/2.6.3.0-235/卡夫卡/bin/kafka-topics.sh ——zookeeper salver32.hadoop.unicom salver31.hadoop.unicom salver158.hadoop.unicom: 2181: 2181: 2181,主题kafkawordcount 复制因子2分区2——创造
发送数据命令:
/usr/黄芪丹参滴丸/2.6.3.0-235/卡夫卡/bin/kafka-console-producer。sh——zookeeper salver32.hadoop.unicom salver31.hadoop.unicom salver158.hadoop.unicom: 2181: 2181: 2181,主题kafkawordcount
2。代码实例:
<代码>包com.unicom.ljs.spark220.study.streaming; 代码> <代码>
代码> <代码>进口org.apache.kafka.clients.consumer.ConsumerRecord; 代码> <代码>进口org.apache.kafka.common.TopicPartition; 代码> <代码>进口org.apache.spark.SparkConf; 代码> <代码>进口org.apache.spark.streaming.Durations; 代码> <代码>进口org.apache.spark.streaming.api.java.JavaInputDStream; 代码> <代码>进口org.apache.spark.streaming.api.java.JavaPairDStream; 代码> <代码>进口org.apache.spark.streaming.api.java.JavaStreamingContext; 代码> <代码>进口org.apache.spark.streaming.kafka010.ConsumerStrategies; 代码> <代码>进口org.apache.spark.streaming.kafka010.KafkaUtils; 代码> <代码>进口org.apache.spark.streaming.kafka010.LocationStrategies; 代码> <代码>进口scala.Tuple2; 代码> <代码>
代码> <代码>进口java.util。*; 代码> <代码>
代码> <代码>/* * 代码> <代码> * @author:由lujisen 代码> <代码> * @company联通软件济南代码> <代码> * @date: 2020-01-31 20:30 代码> <代码> * @version: v1.0 代码> <代码> * @description: com.unicom.ljs.spark220.study。流代码> <代码> */代码> <代码>公共类KafkaStreamingWordCount{代码> <代码>,,公共静态void main (String [] args)抛出InterruptedException{代码> <代码>
代码> <代码>,,,,SparkConf SparkConf=new SparkConf () .setMaster(“本地[*]“).setAppName (“KafkaStreamingWordCount"); 代码> <代码>
代码> <代码>,,,,JavaStreamingContext ssc=new JavaStreamingContext (sparkConf Durations.seconds(5)); 代码> <代码>
代码> <代码>,,,字符串大敌;主题=発afkawordcount" 代码> <代码>
代码> <代码>,,,,Collection主题=new HashSet<在();代码> <代码>,,,,topics.add(主题),代码> <代码>
代码> <代码>,,,,//卡夫卡相关参数,其他参数可自行百度代码> <代码>,,,,字符串brokerList=?0.124.165.31:6667 10.124.165.32:6667"; 代码> <代码>,,,字符串,Map< Object>,道具=new HashMap<的在();代码> <代码>,,,,props.put (“bootstrap.servers" brokerList); 代码> <代码>,,,,props.put (“group.id",“groupLjs1"); 代码> <代码>,,,,props.put (“auto.offset.reset",“earliest"); 代码> <代码>,,,,props.put (“key.serializer",“org.apache.kafka.common.serialization.StringSerializer"); 代码> <代码>,,,,props.put (“key.deserializer",“org.apache.kafka.common.serialization.StringDeserializer"); 代码> <代码>,,,,props.put (“value.deserializer",“org.apache.kafka.common.serialization.StringDeserializer"); 代码> <代码>
代码> <代码>,,,,/*指定卡夫卡中主题的消费分区*/代码> <代码>,,,,Map,抵消=new HashMap<的在();代码> <代码>,,,偏移量。把(新TopicPartition(主题,0),0 l); 代码> <代码>,,,偏移量。把(新TopicPartition(主题,1),0 l); 代码> <代码>
代码> <代码>,,,,//通过KafkaUtils。createDirectStream指定卡夫卡数据源代码> <代码>,,,,//三个参数,1 sparkcontext locationstrategies 2.。PreferConsistent,如上所示。这将在可用执行程序之间均匀分配分区,3,订阅卡夫卡的配置代码> <代码>
代码> <代码>,,,,JavaInputDStream比;行=KafkaUtils。createDirectStream(<代码>/<代码>,,,,,,,代码,ssc, > <代码>,,,,,,,null null Spark2.x中如何实现SparkStreaming消费卡夫卡实例