【水槽】HDFSSink源码理解

  

HDFSSink组件中,主要由HDFSEventSink,BucketWriter,HDFSWriter几个类构成。

其中HDFSEventSink主要功能呢是判定Sink的配置条件是否合法,并负责从Channel中获取events,通过解析event的header信息决定event对应的BucketWriter。

BucketWriter负责按照rollCount,rollSize等条件在HDFS端生成(roll)文件,通过配置文件配置的文件数据格式以及序列化的方式,在每个BucetWriter同一处理。

HDFSWriter作为接口,其具体实现有HDFSSequenceFile,HDFSDataStream,HDFSCompressedDataStream这三种




HDFSSink功能中关键类类图

走通HDFSEventSink之前,肯定要对其中配置参数有了解

1、configure()方法中,从配置文件中获取filePath,fileName等信息,具体参数含义可以参考

2、start()方法,初始化固定大小线程池callTimeoutPool, 周期执行线程池timedRollerPool,以及sfWriters,并启动sinkCounter

  1. callTimeoutPool

  2. timedRollerPool,周期执行线程池中主要有HDFS文件重命名的线程(根据retryInterval),达到生成文件要求进行roll操作的线程(根据idleTimeout),关闭闲置文件的线程等(rollInterval)

  3. sfWriters  sfWriters实际是一个LinkedHashMap的实现类,通过重写removeEldestEntry方法,将最久未使用的writer移除,保证sfWriters中能够维护一个固定大小(maxOpenFiles)的最大打开文件数

  4. sinkCounter sink组件监控指标的计数器


3、process() 方法是HDFSEventSink中最主要的逻辑(部分关键节点通过注释写到代码中),

  1. process()方法中获取到Channel,

  2. 并按照batchSize大小循环从Channel中获取event,通过解析event得到event的header等信息,确定该event的HDFS目的路径以及目的文件名

  3. 每个event可能对应不同的bucketWriter和hdfswriter,将每个event添加到相应的writer中

  4. 当event个数达到batchSize个数后,writer进行flush,同时提交事务

其中bucketWriter负责生成(roll)文件的方式,处理文件格式以及序列化等逻辑

其中hdfsWriter具体实现有"SequenceFile","DataStream","CompressedStream";三种,用户根据hdfsWriter的实现

public Status process() throws EventDeliveryException {
  Channel  Channel =, getChannel();,//调用父类getChannel方法,建立通道与水槽之间的连接
  Transaction  Transaction =, channel.getTransaction();//每次批提交都建立在一个事务上
  transaction.begin ();
  try  {
  Set【水槽】HDFSSink源码理解