介绍
这篇文章主要讲解了“Hadoop的整文件读取方法”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Hadoop的整文件读取方法”吧!
,,,,写Hadoop程序时,有时候需要读取整个文件,而不是分片读取,但默认的为分片读取,所以,只有编写自己的整文件读取类。
需要编写的有:
,,,, WholeInputFormat类,继承自FileInputFormat类
,,,, WholeRecordReader类,继承自RecordReader类
,,,,其中,用于读取的类是WholeRecordReader类。以下代码以文本为关键值类型,BytesWritable为价值的类型,因为大多数格式在Hadoop中都没有相应的类型支持,比如jpg,自卫队,png等等,在Hadoop中都没有相应的类,但是都可以转换为byte[]字节流,然后在转化为BytesWritable类型,最后在地图或者减少再转换成java中的相应类型。
,,,,代码如下,解释见:
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; public class  WholeInputFormat extends  FileInputFormat<文本,BytesWritable> { ,,@Override ,,,public RecordReader<文本,BytesWritable> createRecordReader (InputSplit 分裂,,TaskAttemptContext 上下文), ,,,,throws IOException, InterruptedException ,,,,{ ,,,,,,,return new WholeRecordReader (); ,,,,} ,,@Override ,,,//判断是否分片,假表示不分片,真表示分片只 ,,,//其实这个不写也可以,因为在WholeRecordReader中一次性全部读完 ,,,,protected boolean isSplitable (Path  JobContext 语境;文件) ,,,,{ ,,,,,,,,return 假; ,,,,} }
,,,,下面是WholeRecordReader类:
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class  WholeRecordReader extends  RecordReader<文本,BytesWritable> { ,,,,//Hadoop中处理文件的类 ,,,,private FileSplit 文件分割器; ,,,,private FSDataInputStream 拷贝=,空; , ,,,,private BytesWritable value =,空; ,,,,private Text key =,空; ,,,, ,,,,//用于判断文件是否读取完成 ,,,,//也就是因为这个,所以WholeInputFormat中的isSplitable方法可以不用写 ,,,,private boolean processed =,假; , ,,,@Override ,,,,public void 关闭(),throws IOException ,,,,{ ,,,,,,,//do 什么都没有 ,,,,} ,,,@Override ,,,,public Text getCurrentKey (), throws IOException,, InterruptedException ,,,,{ ,,,,,,,,,return this.key; ,,,,} , ,,,@Override ,,,,public BytesWritable getCurrentValue (), throws IOException, InterruptedException ,,,,{ ,,,,,,,,,return this.value; ,,,,} , ,,,@Override ,,,,public float getProgress (), throws IOException,, InterruptedException ,,,,{ ,,,,,,,,,return processed ?, fileSplit.getLength (),:, 0; ,,,,} ,, ,,,@Override ,,,,public void 初始化(InputSplit 分裂,TaskAttemptContext 上下文)throws IOException,, InterruptedException ,,,,{ ,,,,,,,,,//打开一个文件输入流 null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null nullHadoop的整文件读取方法