流与卡夫卡updateStateBykey ()

  
对象H延伸应用{
  val相依=new SparkConf () .setMaster .setAppName(“地方[2]”)(“hello”)
  val党卫军=new StreamingContext(参看,秒(5))
  val kafkaParams=Map [String, String] (“metadata.broker.list”->“myhadoop1:9092”)
  ss.checkpoint (“hdfs://myhadoop1:8020/数据”)
  val主题=[String] (“wordcount1”)//卡夫卡
  val行=KafkaUtils.createDirectStream[字符串,字符串,StringDecoder, StringDecoder] (ss、kafkaParams主题)
  lines.flatMap (_._2.split (" ")) . map ((_, - 1)) .updateStateByKey ((Seq: Seq [Int],选择:选项(Int))=> {
  var oldValue=https://www.yisu.com/zixun/option.getOrElse (0)
  (seq <-seqs) {
  oldValue +=seq
  }
  选项(Int) (oldValue)
  }).print ()
  ss.start ()
  ss.awaitTermination ()
  }


流与卡夫卡updateStateBykey ()