// NOTE(review): this file was corrupted by machine translation (identifiers and
// string quotes were translated/garbled, and a web-page URL was pasted into an
// expression). Reconstructed below is the clearly intended program: a Spark
// Streaming stateful word count reading from a Kafka direct stream.
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object H extends App {
  // Original had setMaster/setAppName arguments swapped: master must be the
  // "local[2]" spec and the application name the free-form string.
  val conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName("hello")

  // 5-second micro-batches.
  val ssc = new StreamingContext(conf, Seconds(5))

  val kafkaParams = Map[String, String]("metadata.broker.list" -> "myhadoop1:9092")

  // Checkpointing is required by updateStateByKey.
  // NOTE(review): original path text was translated to "/数据"; restored to
  // "/data" — confirm against the actual HDFS layout.
  ssc.checkpoint("hdfs://myhadoop1:8020/data")

  // Kafka topics to consume; createDirectStream requires a Set, not a bare value.
  val topics = Set[String]("wordcount1")

  val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics)

  lines
    .flatMap(_._2.split(" ")) // value of each (key, value) record, tokenized on spaces
    .map((_, 1))              // original's "(_, -1)" was a garbling of the classic count of 1
    .updateStateByKey((counts: Seq[Int], state: Option[Int]) =>
      // New running total = previous total (0 if absent) + this batch's counts.
      Some(state.getOrElse(0) + counts.sum))
    .print()

  ssc.start()
  ssc.awaitTermination()
}