Spark2.x中如何实现SparkStreaming消费卡夫卡实例

  介绍

这篇文章给大家分享的是有关Spark2.x中如何实现SparkStreaming消费卡夫卡实例的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

软件软件:

,,,,,,,火花版本是apache spark2.2.0

,,,,,,卡夫卡版本是kafka0.10.0

,,,,采用直接的方法的方式来融合引发流和卡夫卡。没有采用Receiver-Based的方式。后续我会专门整理一篇文章分析两种融合方式不同。

1.卡夫卡数据准备:

创建卡夫卡的话题命令:

/usr/黄芪丹参滴丸/2.6.3.0-235/卡夫卡/bin/kafka-topics.sh ——zookeeper salver32.hadoop.unicom salver31.hadoop.unicom salver158.hadoop.unicom: 2181: 2181: 2181,主题kafkawordcount 复制因子2分区2——创造

 Spark2.x中如何实现SparkStreaming消费卡夫卡实例

发送数据命令:

/usr/黄芪丹参滴丸/2.6.3.0-235/卡夫卡/bin/kafka-console-producer。sh——zookeeper salver32.hadoop.unicom salver31.hadoop.unicom salver158.hadoop.unicom: 2181: 2181: 2181,主题kafkawordcount

 Spark2.x中如何实现SparkStreaming消费卡夫卡实例

2。代码实例:

<节>
 <代码>包com.unicom.ljs.spark220.study.streaming;  <代码> 
<代码>进口org.apache.kafka.clients.consumer.ConsumerRecord; <代码>进口org.apache.kafka.common.TopicPartition; <代码>进口org.apache.spark.SparkConf; <代码>进口org.apache.spark.streaming.Durations; <代码>进口org.apache.spark.streaming.api.java.JavaInputDStream; <代码>进口org.apache.spark.streaming.api.java.JavaPairDStream; <代码>进口org.apache.spark.streaming.api.java.JavaStreamingContext; <代码>进口org.apache.spark.streaming.kafka010.ConsumerStrategies; <代码>进口org.apache.spark.streaming.kafka010.KafkaUtils; <代码>进口org.apache.spark.streaming.kafka010.LocationStrategies; <代码>进口scala.Tuple2; <代码>
<代码>进口java.util。*; <代码>
<代码>/* * <代码> * @author:由lujisen <代码> * @company联通软件济南 <代码> * @date: 2020-01-31 20:30 <代码> * @version: v1.0 <代码> * @description: com.unicom.ljs.spark220.study。流 <代码> */ <代码>公共类KafkaStreamingWordCount{ <代码>,,公共静态void main (String [] args)抛出InterruptedException{ <代码>
<代码>,,,,SparkConf SparkConf=new SparkConf () .setMaster(“本地[*]“).setAppName (“KafkaStreamingWordCount"); <代码>
<代码>,,,,JavaStreamingContext ssc=new JavaStreamingContext (sparkConf Durations.seconds(5)); <代码>
<代码>,,,字符串大敌;主题=発afkawordcount" <代码>
<代码>,,,,Collection主题=new HashSet<在(); <代码>,,,,topics.add(主题), <代码>
<代码>,,,,//卡夫卡相关参数,其他参数可自行百度 <代码>,,,,字符串brokerList=?0.124.165.31:6667 10.124.165.32:6667"; <代码>,,,字符串,Map< Object>,道具=new HashMap<的在(); <代码>,,,,props.put (“bootstrap.servers" brokerList); <代码>,,,,props.put (“group.id",“groupLjs1"); <代码>,,,,props.put (“auto.offset.reset",“earliest"); <代码>,,,,props.put (“key.serializer",“org.apache.kafka.common.serialization.StringSerializer"); <代码>,,,,props.put (“key.deserializer",“org.apache.kafka.common.serialization.StringDeserializer"); <代码>,,,,props.put (“value.deserializer",“org.apache.kafka.common.serialization.StringDeserializer"); <代码>
<代码>,,,,/*指定卡夫卡中主题的消费分区*/ <代码>,,,,Map,抵消=new HashMap<的在(); <代码>,,,偏移量。把(新TopicPartition(主题,0),0 l); <代码>,,,偏移量。把(新TopicPartition(主题,1),0 l); <代码>
<代码>,,,,//通过KafkaUtils。createDirectStream指定卡夫卡数据源 <代码>,,,,//三个参数,1 sparkcontext locationstrategies 2.。PreferConsistent,如上所示。这将在可用执行程序之间均匀分配分区,3,订阅卡夫卡的配置 <代码>
<代码>,,,,JavaInputDStream比;行=KafkaUtils。createDirectStream(<代码>/<代码>,,,,,,,代码,ssc, <代码>,,,,,,,null   null

Spark2.x中如何实现SparkStreaming消费卡夫卡实例