大数据:地图终结和泄漏文件合并

  

当Mapper没有数据输入,mapper.run中的而循环会调用context.nextKeyValue就返回假,于是便返回到runNewMapper中,在这里程序会关闭输入通道和输出通道,这里关闭输出通道并没有关闭收集器,必须要先冲洗一下。

  

获取更多大数据视频资料请加QQ群:947967114代码结构:

  

Maptask.runNewMapper→NewOutputCollector.close→MapOutputBuffer.flush

  

我们看平帮我们做了什么事情,为什么要冲洗。

  
 <代码>公共空冲()抛出IOException ClassNotFoundException,
  
  InterruptedException {
  
  LOG.info(“地图输出开始冲洗”);
  
  spillLock.lock ();
  
  尝试{
  
  而(spillInProgress) {
  
  reporter.progress ();
  
  spillDone.await();  
  

//这里查看spillInProgress状态,如果有泄漏就等待完成,并且报告状态。

  
 <代码>}
  
  checkSpillException ();
  
  最后一个int kvbend=4 * kvend;  
  

//kvend是元数据块的终点,元数据是向下伸展的。

  

//kvend是以整数计的数组下标,kvbend是以字节计的数组下标

  
 <代码>如果((kvbend + METASIZE) % kvbuffer。长度!=(赤道)-赤道% METASIZE){ 
  

//这个条件说明缓冲区中原来有数据,现在泄漏已经完成,需要释放空间。获取更多大数据视频资料请加QQ群:947967114

  
 <代码>//泄漏完成 
  

//泄漏一次需要调整一些参数,以释放空间,这个工作通过resetSpill完成

  
 <代码> resetSpill();  
  

私人空resetSpill () {

  
 <代码>最后int e=赤道;
  
  bufstart=bufend=e;
  
  最后一个int对齐=e - (e % METASIZE);//开始/结束点设置为第一元记录//把减速器=" +地区+
  
  +我+”(“泄漏=+ indexRecord。startOffset +”、“+
  
  indexRecord。rawLength +”、“+ indexRecord。partLength + ") ");
  
  }
  
  }
  
  int合并因子=job.getInt (JobContext。IO_SORT_FACTOR, 100)、 
  

//做合并操作时同时操作的流数上限

  
 <代码>布尔sortSegments=segmentList.size()比;合并因子;//对段进行排序
  
  @SuppressWarnings(“unchecked”)
  
  RawKeyValueIterator kvIter=合并。合并(rfs工作,
  
  keyClass valClass,编解码器,
  
  segmentList合并因子,
  
  新路径(mapId.toString ()),
  
  sortSegments job.getOutputKeyComparator(),记者,
  
  零,spilledRecordsCounter sortPhase.phase (),
  
  TaskType.MAP);  
  

//合并同分区在一所有泄漏文件中的内容,可能还需要,合并后的结构是一个序列。

  
 <代码>//写合并输出到磁盘
  
  长segmentStart=finalOut.getPos ();
  
  FSDataOutputStream finalPartitionOut=CryptoUtils。wrapIfNecessary(工作,finalOut);
  
  Writer作家=新的Writer(工作,finalPartitionOut, keyClass valClass,编解码器,
  
  spilledRecordsCounter);
  
  如果(combinerRunner==null | | numSpills & lt;minSpillsForCombine) {//minSpillsForCombine在MapOutputBuffer构造函数内被初始化,numSpills为mapTask已经溢写到磁盘泄漏文件数量
  
  合并。writeFile (kvIter、作家、记者、工作); 
  

//将合并后的结果直接写入文件。下面看一下writeFile的源代码;

  

公共静态& lt; K扩展对象,V延伸Object>

  

空白writeFile (RawKeyValueIterator记录,Writer作家,

  
 <代码> Progressable Progressable、配置设计) 
  抛出IOException {

  
 <代码>长progressBar=conf.getLong (JobContext.RECORDS_BEFORE_PROGRESS,
  
  10000);
  
  长recordCtr=0;
  
  而(records.next ()) {
  
  records.getValue writer.append (records.getKey()());  
  

//追加的方式输出到作家中

  
 <代码>如果(((recordCtr + +) % progressBar)==0) {
  
  progressable.progress ();
  
  } 
  

}   

回到主代码:

  其他
 <代码>}{ 
  

//有合路器

  
 <代码> combineCollector.setWriter(作家); 
  

//就插入组合器环节

  
 <代码> combinerRunner.combine (kvIter combineCollector);  
  

//将合并的结果经过组合器后写入文件

  
 <代码>}//关闭
  
  writer.close();//关闭作家通道
  
  sortPhase.startNextPhase ();//记录偏移量
  
  rec.startOffset=segmentStart; 

大数据:地图终结和泄漏文件合并