介绍
本篇内容主要讲解“flink怎么使用Event_time处理实时数据”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“flink怎么使用Event_time处理实时数据”吧!
//flink中关于时间的三个概念//event 时间:数据产生的时间//processing 时间:处理数据的时间,即操作数据的之间,比如一个flink在scala中地图的函数处理数据时//ingest 时间:摄取数据时间,在一个流程序中,一个时间段收集数据的时间//而evet 时间在处理实时数据时是比较有用的,例如在由于网络的繁忙的原因,某些数据未能按时到达,假设迟到了30分钟,//而我们定义的最大延迟不能超过十分钟,那么一些数据包含了超时的数据那么这些数据是不会在这次操作中处理的而是会//丢弃掉以前> <>前//卡夫卡生产者代码 package kafka.partition.test; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; public class  PartitionProducer { public static  void main (String [], args), { 字符串,Object> Map, producer =, new KafkaProducer<祝辞(道具); (int 小姐:=,0,,,小姐:& lt;=, 20,我+ +),{//flink的watermarkassginer里面定义的超时时间是5000毫秒 long mills =, System.currentTimeMillis (); 如果(我% 3==0),{//数据的event 时间放在字符串的开头,以空格分割//kafka event_time 主题的0分区超时4000毫秒 String line =,(米尔斯- 4000)+“,“+“分区0——却;能够is a big +“, +我; ProducerRecord<,字符串,String>, record =, new ProducerRecord<字符串,String>(话题,,new 整数(0),空,,i +“,,,行); producer.send(记录); }else 如果(我% 3==1),{//kafka event_time 主题的1分区超时5000毫秒 String line =,(米尔斯- 5000)+“,“+“分区1——却;能够is a big +“, +我; ProducerRecord<,字符串,String>, record =, new ProducerRecord<字符串,String>(话题,,new 整数(1),空,,i +“,,,行); producer.send(记录); }else 如果(我% 3==2),{//kafka event_time 主题的2分区超时8000毫秒 String line =,(米尔斯- 8000)+“,“+“分区2——却;能够is a big +“, +我; ProducerRecord<,字符串,String>, record =, new ProducerRecord<字符串,String>(话题,,new 整数(2),空,,i +“,,,行); producer.send(记录); } } producer.close (); } }
//自定义的TimestampsAndWatermarks package flink.streaming import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.watermark.Watermark class CustomWaterMarks  extends AssignerWithPeriodicWatermarks [String] { ,,,//超时时间 val 才能;maxOutOrderness =5000 l//flink才能过一段时间便会调一次该函数获取水印 def 才能getCurrentWatermark (): Watermark ={ ,,,val , sysMilssecons =,, System.currentTimeMillis () ,,,,new 水印(sysMilssecons-maxOutOrderness), ,,,, ,,}//才能每条记录多会调用,来获得even time 在生产的数据中,even_time放在字符串的第一个字段,用空格分割 def 才能;extractTimestamp(元素:字符串,previousElementTimestamp:长):,Long =, { ,,((element.split (“,“)) .head) .toLong ,,} }<>以前package flink.streaming import java.util.Properties import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.sink.RichSinkFunction import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks object StreamWithEventTimeAndWaterMarks  { ,, def 才能;主要(args:数组[String]):, Unit =, { ,,,val kafkaProps =, new 属性() ,,,//卡夫卡的一些属性 ,,,kafkaProps.setProperty (“bootstrap.servers",,“bigdata01:9092") ,,,//所在的消费组 null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null