flink怎么使用Event_time处理实时数据

  介绍

本篇内容主要讲解“flink怎么使用Event_time处理实时数据”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“flink怎么使用Event_time处理实时数据”吧!

//flink中关于时间的三个概念//event 时间:数据产生的时间//processing 时间:处理数据的时间,即操作数据的之间,比如一个flink在scala中地图的函数处理数据时//ingest 时间:摄取数据时间,在一个流程序中,一个时间段收集数据的时间//而evet 时间在处理实时数据时是比较有用的,例如在由于网络的繁忙的原因,某些数据未能按时到达,假设迟到了30分钟,//而我们定义的最大延迟不能超过十分钟,那么一些数据包含了超时的数据那么这些数据是不会在这次操作中处理的而是会//丢弃掉以前 <>前//卡夫卡生产者代码
  package  kafka.partition.test;
  import  java.util.HashMap;
  import  java.util.Map;
  import  org.apache.kafka.clients.producer.KafkaProducer;
  import  org.apache.kafka.clients.producer.ProducerRecord;
  public  class  PartitionProducer  {
  public  static  void  main (String [], args), {
  
  字符串,Object> Map, producer =, new  KafkaProducer<祝辞(道具);
  (int 小姐:=,0,,,小姐:& lt;=, 20,我+ +),{//flink的watermarkassginer里面定义的超时时间是5000毫秒
  long  mills =, System.currentTimeMillis ();
  如果(我% 3==0),{//数据的event 时间放在字符串的开头,以空格分割//kafka  event_time 主题的0分区超时4000毫秒
  String  line =,(米尔斯- 4000)+“,“+“分区0——却;能够is  a  big  +“, +我;
  ProducerRecord<,字符串,String>, record =, new  ProducerRecord<字符串,String>(话题,,new 整数(0),空,,i +“,,,行);
  producer.send(记录);
  }else 如果(我% 3==1),{//kafka  event_time 主题的1分区超时5000毫秒
  String  line =,(米尔斯- 5000)+“,“+“分区1——却;能够is  a  big  +“, +我;
  ProducerRecord<,字符串,String>, record =, new  ProducerRecord<字符串,String>(话题,,new 整数(1),空,,i +“,,,行);
  producer.send(记录);
  }else 如果(我% 3==2),{//kafka  event_time 主题的2分区超时8000毫秒
  String  line =,(米尔斯- 8000)+“,“+“分区2——却;能够is  a  big  +“, +我;
  ProducerRecord<,字符串,String>, record =, new  ProducerRecord<字符串,String>(话题,,new 整数(2),空,,i +“,,,行);
  producer.send(记录);
  }
  }
  
  producer.close ();
  }
  }
//自定义的TimestampsAndWatermarks
  package  flink.streaming
  import  org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
  import  org.apache.flink.streaming.api.watermark.Watermark
  class  CustomWaterMarks  extends  AssignerWithPeriodicWatermarks [String] {
  ,,,//超时时间
  val 才能;maxOutOrderness =5000 l//flink才能过一段时间便会调一次该函数获取水印
  def 才能getCurrentWatermark (): Watermark ={
  ,,,val , sysMilssecons =,, System.currentTimeMillis ()
  ,,,,new 水印(sysMilssecons-maxOutOrderness),
  ,,,,
  ,,}//才能每条记录多会调用,来获得even  time 在生产的数据中,even_time放在字符串的第一个字段,用空格分割
  def 才能;extractTimestamp(元素:字符串,previousElementTimestamp:长):,Long =, {
  ,,((element.split (“,“)) .head) .toLong
  ,,}
  }
<>以前package  flink.streaming   import  java.util.Properties   import  org.apache.flink.streaming.api.scala._   import  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer   import  org.apache.flink.api.common.serialization.SimpleStringSchema   import  org.apache.flink.streaming.api.windowing.time.Time   import  org.apache.flink.streaming.api.TimeCharacteristic   import  org.apache.flink.streaming.api.functions.sink.RichSinkFunction   import  org.apache.flink.streaming.api.CheckpointingMode   import  org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks   object  StreamWithEventTimeAndWaterMarks  {   ,,   def 才能;主要(args:数组[String]):, Unit =, {   ,,,val  kafkaProps =, new 属性()   ,,,//卡夫卡的一些属性   ,,,kafkaProps.setProperty (“bootstrap.servers",,“bigdata01:9092")   ,,,//所在的消费组   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null   null

flink怎么使用Event_time处理实时数据