HDFSSink组件中,主要由HDFSEventSink,BucketWriter,HDFSWriter几个类构成。
其中HDFSEventSink主要功能呢是判定Sink的配置条件是否合法,并负责从Channel中获取events,通过解析event的header信息决定event对应的BucketWriter。
BucketWriter负责按照rollCount,rollSize等条件在HDFS端生成(roll)文件,通过配置文件配置的文件数据格式以及序列化的方式,在每个BucetWriter同一处理。
HDFSWriter作为接口,其具体实现有HDFSSequenceFile,HDFSDataStream,HDFSCompressedDataStream这三种
HDFSSink功能中关键类类图
走通HDFSEventSink之前,肯定要对其中配置参数有了解
1、configure()方法中,从配置文件中获取filePath,fileName等信息,具体参数含义可以参考
2、start()方法,初始化固定大小线程池callTimeoutPool, 周期执行线程池timedRollerPool,以及sfWriters,并启动sinkCounter
callTimeoutPool
timedRollerPool,周期执行线程池中主要有HDFS文件重命名的线程(根据retryInterval),达到生成文件要求进行roll操作的线程(根据idleTimeout),关闭闲置文件的线程等(rollInterval)
sfWriters sfWriters实际是一个LinkedHashMap的实现类,通过重写removeEldestEntry方法,将最久未使用的writer移除,保证sfWriters中能够维护一个固定大小(maxOpenFiles)的最大打开文件数
sinkCounter sink组件监控指标的计数器
3、process() 方法是HDFSEventSink中最主要的逻辑(部分关键节点通过注释写到代码中),
process()方法中获取到Channel,
并按照batchSize大小循环从Channel中获取event,通过解析event得到event的header等信息,确定该event的HDFS目的路径以及目的文件名
每个event可能对应不同的bucketWriter和hdfswriter,将每个event添加到相应的writer中
当event个数达到batchSize个数后,writer进行flush,同时提交事务
其中bucketWriter负责生成(roll)文件的方式,处理文件格式以及序列化等逻辑
其中hdfsWriter具体实现有"SequenceFile","DataStream","CompressedStream";三种,用户根据hdfsWriter的实现
public Status process() throws EventDeliveryException { Channel Channel =, getChannel();,//调用父类getChannel方法,建立通道与水槽之间的连接 Transaction Transaction =, channel.getTransaction();//每次批提交都建立在一个事务上 transaction.begin (); try { Set【水槽】HDFSSink源码理解