当Mapper没有数据输入,mapper.run中的而循环会调用context.nextKeyValue就返回假,于是便返回到runNewMapper中,在这里程序会关闭输入通道和输出通道,这里关闭输出通道并没有关闭收集器,必须要先冲洗一下。
获取更多大数据视频资料请加QQ群:947967114代码结构:
Maptask.runNewMapper→NewOutputCollector.close→MapOutputBuffer.flush
我们看平帮我们做了什么事情,为什么要冲洗。
<代码>公共空冲()抛出IOException ClassNotFoundException, InterruptedException { LOG.info(“地图输出开始冲洗”); spillLock.lock (); 尝试{ 而(spillInProgress) { reporter.progress (); spillDone.await(); 代码>
//这里查看spillInProgress状态,如果有泄漏就等待完成,并且报告状态。
<代码>} checkSpillException (); 最后一个int kvbend=4 * kvend; 代码>
//kvend是元数据块的终点,元数据是向下伸展的。
//kvend是以整数计的数组下标,kvbend是以字节计的数组下标
<代码>如果((kvbend + METASIZE) % kvbuffer。长度!=(赤道)-赤道% METASIZE){代码>
//这个条件说明缓冲区中原来有数据,现在泄漏已经完成,需要释放空间。获取更多大数据视频资料请加QQ群:947967114
<代码>//泄漏完成代码>
//泄漏一次需要调整一些参数,以释放空间,这个工作通过resetSpill完成
<代码> resetSpill(); 代码>
私人空resetSpill () {
<代码>最后int e=赤道; bufstart=bufend=e; 最后一个int对齐=e - (e % METASIZE);//开始/结束点设置为第一元记录//把减速器=" +地区+ +我+”(“泄漏=+ indexRecord。startOffset +”、“+ indexRecord。rawLength +”、“+ indexRecord。partLength + ") "); } } int合并因子=job.getInt (JobContext。IO_SORT_FACTOR, 100)、代码>
//做合并操作时同时操作的流数上限
<代码>布尔sortSegments=segmentList.size()比;合并因子;//对段进行排序 @SuppressWarnings(“unchecked”) RawKeyValueIterator kvIter=合并。合并(rfs工作, keyClass valClass,编解码器, segmentList合并因子, 新路径(mapId.toString ()), sortSegments job.getOutputKeyComparator(),记者, 零,spilledRecordsCounter sortPhase.phase (), TaskType.MAP); 代码>
//合并同分区在一所有泄漏文件中的内容,可能还需要,合并后的结构是一个序列。
<代码>//写合并输出到磁盘 长segmentStart=finalOut.getPos (); FSDataOutputStream finalPartitionOut=CryptoUtils。wrapIfNecessary(工作,finalOut); Writer作家=新的Writer (工作,finalPartitionOut, keyClass valClass,编解码器, spilledRecordsCounter); 如果(combinerRunner==null | | numSpills & lt;minSpillsForCombine) {//minSpillsForCombine在MapOutputBuffer构造函数内被初始化,numSpills为mapTask已经溢写到磁盘泄漏文件数量 合并。writeFile (kvIter、作家、记者、工作);代码>
//将合并后的结果直接写入文件。下面看一下writeFile的源代码;
公共静态& lt; K扩展对象,V延伸Object>
空白writeFile (RawKeyValueIterator记录,Writer
<代码> Progressable Progressable、配置设计)代码>抛出IOException {
<代码>长progressBar=conf.getLong (JobContext.RECORDS_BEFORE_PROGRESS, 10000); 长recordCtr=0; 而(records.next ()) { records.getValue writer.append (records.getKey()()); 代码>
//追加的方式输出到作家中
<代码>如果(((recordCtr + +) % progressBar)==0) { progressable.progress (); }代码>
}
回到主代码:
其他<代码>}{代码>
//有合路器
<代码> combineCollector.setWriter(作家);代码>
//就插入组合器环节
<代码> combinerRunner.combine (kvIter combineCollector); 代码>
//将合并的结果经过组合器后写入文件
<代码>}//关闭 writer.close();//关闭作家通道 sortPhase.startNextPhase ();//记录偏移量 rec.startOffset=segmentStart;大数据:地图终结和泄漏文件合并