可以通过一个简单的例子来说明MapReduce到底是什么:
我们要统计一个大文件中的各个单词出现的次数。由于文件太大。我们把这个文件切分成如果小文件,然后安排多个人去统计。这个过程就是“地图”,然后把每个人统计的数字合并起来,这个就是“减少”。
上面的例子如果在MapReduce去做呢,就需要创建一个任务工作,由工作把文件切分成若干独立的数据块,并分布在不同的机器节点中。然后通过分散在不同节点中地图的任务以完全并行的方式进行处理.MapReduce会对地图的输出地行收集,再将结果输出送给减少进行下一步的处理。
对于一个任务的具体执行过程,会有一个名为“JobTracker”的进程负责协调MapReduce执行过程中的所有任务。若干条TaskTracker进程用来运行单独的地图任务,并随时将任务的执行情况汇报给JobTracker。如果一个TaskTracker汇报任务失败或者长时间未对本身任务进行汇报,JobTracker会启动另外一个TaskTracker重新执行单独的地图任务。
下面的具体的代码实现:
1。编写wordcount的相关工作
(1) eclipse下创建相关maven项目,依赖jar包如下(也可参照hadoop源码包下的hadoop-mapreduce-examples项目的pom配置)
注意:要配置一个maven插件maven-jar-plugin,并指定mainClass
& lt; dependencies> & lt; dependency> & lt; groupId> junit & lt; artifactId> junit & lt; version> 4.11 & lt;/version> & lt;/dependency> & lt; dependency> & lt; groupId> org.apache.hadoop & lt; artifactId> hadoop-mapreduce-client-core & lt; version> 2.5.2 & lt;/dependency> & lt; dependency> & lt; groupId> org.apache.hadoop & lt; artifactId> hadoop-common & lt; version> 2.5.2 & lt;/dependency> & lt;/dependencies> & lt; build> & lt; plugins> & lt; plugin> & lt; groupId> org.apache.maven.plugins & lt; artifactId> maven-jar-plugin & lt; configuration> & lt; archive> & lt; manifest> & lt; mainClass> com.xxx.demo.hadoop.wordcount.WordCount & lt;/manifest> & lt;/archive> & lt;/configuration> & lt;/plugin> & lt;/plugins> & lt;/build>
(2)根据MapReduce的运行机制,一个工作至少要编写三个类分别用来完成地图逻辑,减少逻辑,作业调度这三件事。
地图的代码可继承org.apache.hadoop.mapreduce.Mapper类
公共静态类TokenizerMapper 扩展Mapper<对象、文本、文本IntWritable> { 私人最终静态IntWritable> 公开课IntSumReducer 扩展Reducer<文本,IntWritable,文本,IntWritable>{ 私人IntWritable结果=new IntWritable (); 公共空间减少(文本关键,Iterable值, 上下文语境 )抛出IOException InterruptedException { int和=0; (IntWritable val:值){ 和+=val.get (); } result.set(总和); 上下文。写(键,结果); } }
编写主方法进行作业调度
公共静态void main (String [] args){抛出异常 配置配置=new配置(); 工作工作=工作。getInstance(参看“字数”); job.setJarByClass (WordCount.class); job.setMapperClass (TokenizerMapper.class); job.setCombinerClass (IntSumReducer.class); job.setReducerClass (IntSumReducer.class); job.setOutputKeyClass (Text.class); job.setOutputValueClass (IntWritable.class); FileInputFormat。addInputPath(工作、新路径(args [0])); FileOutputFormat。setOutputPath(工作、新路径(args [1])); job.waitForCompletion(真正的);//system . exit (job.waitForCompletion(真正的)& # 63;0: 1); }
2。上传数据文件到hadoop集群环境
执行mvn install把项目打成jar文件然后上传到linux集群环境,使用hdfs dfs mkdir命令在hdfs文件系统中创建相应的命令,使用hdfs dfs——把需要处理的数据文件上传到hdfs系统中,示例:hdfs dfs——$ {linux_path/数据文件}$ {hdfs_path}
3。执行工作
在集群环境中执行命令:hadoop jar $ {linux_path}/wordcount。jar $ {hdfs_input_path} $ {hdfs_output_path}
4。查看统计结果
hdfs dfs猫$ {hdfs_output_path}/输出文件名
以上的方式在未启动hadoop集群环境时,是当地以模式运行,此时HDFS和纱都不起作用下。面是在伪分布式模式下执行mapreduce工作时需要做的工作,先把官网上列的步骤摘录出来: