FLINK神功插件学习笔记

  

神功是一款功能强大的CEP引擎,具有自己的DSL,丰富的模式匹配功能和可扩展性,感谢陈浩同学提供了神功和FLINK的整合功能https://github.com/haoch/flink-siddhi本文主要介绍了这个插件的一些实现思路

  
      <李> <强>将FLINK转流化为神功流定义   
  

? ?用法:SiddhiCEP。registerStream (streamName FlinkDataStream,字段名)

  

? ?通过FlinkDataStream。方法获得流对象的类型定义。registerStream方法会构造一个SiddhiStreamSchema对象,根据流对象的类型定义,自动获取每个领域对应的数据类型存在内部的fieldTypes数组中。

  

? ?SiddhiStreamSchema内部会创建一个神功StreamDefinition对象,StreamDefinition的属性的定义根据字段名+ fieldTypes来添加.SiddhiTypeFactory。getAttributeType负责将Flink的数据类型映射为神功的数据类型,并可自动生成一段定义流的定义(见SiddhiStreamSchema。getStreamDefinitionExpression方法)定义流[streamName] ([fieldName 1] [fieldType 1],……(字段名n] [fieldType n])

  

? ?SiddhiStreamSchema包括一个StreamSerializer:用于将数据流中的对象转化为神功流的输入(对象数组):
? ? ? ?如果流对象是一个简单类型原子类型直接将流对象放到数组中
? ? ? ?如果流对象是Tuple类型,直接将元组中前N个值放到数组中
? ? ? ?如果流对象是Pojo或者CaseClass类型,直接根据每个字段名获取类对应的属性放到数组中

     <李> <>强串联FLINK流和神功流      

? ?SiddhiStream:抽象的流基类

  

? ?convertDataStream将原始的FLINK流转化为Tuple类型的流,元组的第一个元素为StreamId,第二个元素为原来流中的数据,支持普通流和KeyedStream

  

? ?ExecutionSiddhiStream:构建SiddhiOperatorContext并调用SiddhiStreamFactory。createDataStream创建了集成神功的数据流。数据流的类型为元组的子类.SiddhiTypeFactory。getTupleTypeInformation:其核心思路是通过神功输出流的StreamDefinition获得其属性的定义,再通过TypeInfoParser。解析构造Flink Tuple类型的定义

  

? ?ExecutableStream根据神功查询创建ExecutionSiddhiStream对象
? ?SingleSiddhiStream UnionSiddhiStream: ExecutableStream的子类,支持流畅风格的链式调用。UnionSiddhiStream调用了数据流。联盟方法

  

? ? SiddhiStreamFactory。createDataStream通过FLINK DataStream数据的变换方法使用了自定义的StreamOperator: SiddhiStreamOperator。在AbstractSiddhiOperator的设置方法中创建SiddhiManager和SiddhiAppRuntime并注册了InputHandler和OutputCallback (StreamOutputHandler)

  

? ? SiddhiStreamOperator。processElement需要处理两种场景:
? ? ? ?Flink TimeCharacteristic=ProcessingTime:先调用StreamSerializer将数据转化为对象数组,再直接调用InputHandler。发送将数据发送给神功处理
? ? ? ?Flink TimeCharacteristic=EventTime:缓存接收到的StreamRecord到内部的priorityQueue中,直到收到水印,将priorityQueue中小于水印的StreamRecord一次发送给神功处理

  

  &ensp StreamOutputHandler:根据输出的TypeInfo将神功事件转化为Flink StreamRecord。再转发到SiddhiStreamOperator的输出

     <李> <>强检查点      

? ? SiddhiStreamOperator中保留了两种状态信息,一种是priorityQueue中保存的由于水印未发送给神功的消息。另一种是神功本身的状态,通过SiddhiAppRuntime.snapshot()获得

FLINK神功插件学习笔记