十二,MapReduce mapjoin和reducejoin

  

一、地图加入

  

1,适用场景:
一张表很大,一张表很小

  

2,解决方案:
在地图端缓存多张表,提前处理业务逻辑,这样增加地图端业务,减少减少端的数据压力,尽可能减少数据倾斜。

  

3,具体方法:采用分布式缓存
(1)在映射器的设置阶段,将文件读取到缓存集合中
(2)在驱动程序中加载缓存,工作。addCacheFile(新URI(“文件:/e:/mapjoincache/pd.txt"));//缓存普通文件到任务运行节点。

  

4,实例

  
 <代码>//order.txt
  订单id商品id商品数量
  1001 01
  1002 02年2
  1003 03年3
  1004年01 4
  1005 02年5
  1006 03年6//pd.txt
  商品id商品名
  01小米
  02华为
  03格力 
  

要将订单中的商品id替换为商品名称,缓存pd。txt这个小表

  

映射器:   

 <代码类="语言java ">包MapJoin;
  
  进口org.apache.commons.lang.StringUtils;
  进口org.apache.hadoop.io.LongWritable;
  进口org.apache.hadoop.io.NullWritable;
  进口org.apache.hadoop.io.Text;
  进口org.apache.hadoop.mapreduce.Mapper;
  
  进口. io . *;
  进口java.util.HashMap;
  进口java.util.Map;
  
  公开课MapJoinMapper延伸Mapper{
  String> Map<字符串;productMap=new HashMap<字符串,String> ();
  文本k=新的文本();/* *
  *
  *将pd.txt加载到hashmap中,只加载一次
  * @param上下文
  * @throws IOException
  * @throws InterruptedException
  */@Override
  保护空白设置(上下文语境)抛出IOException InterruptedException {
  BufferedReader productReader=new BufferedReader(新InputStreamReader(新FileInputStream(新文件(“G: \ \测试\ \ \ \ mapjoin \ \ pd.txt "))));
  
  字符串行;
  而(stringutil的。isNotEmpty (=productReader.readLine行())){
  String[]字段=line.split (“\ t”);
  productMap。[0]put(字段,字段[1]);
  }
  
  productReader.close ();
  
  }
  
  @Override
  保护空白地图(LongWritable键,文本值,上下文语境)抛出IOException, InterruptedException {
  字符串行=value.toString ();
  String[]字段=line.split (“\ t”);
  
  字符串productName=productMap.get(领域[1]);
  
  k。集(字段[0]+“t \”+ productName +“\ t”+字段[2]);
  
  上下文。写(k, NullWritable.get ());
  
  }
  
  @Override
  保护无效清理(上下文语境)抛出IOException InterruptedException {
  super.cleanup(上下文);
  }
  } 
  司机:

  
 <代码类="语言java ">包MapJoin;
  
  进口org.apache.hadoop.conf.Configuration;
  进口org.apache.hadoop.fs.Path;
  进口org.apache.hadoop.io.NullWritable;
  进口org.apache.hadoop.io.Text;
  进口org.apache.hadoop.mapreduce.Job;
  进口org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  进口org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  
  进口java.io.IOException;
  进口java.net.URI;
  进口java.net.URISyntaxException;
  
  公开课MapJoinDriver {
  公共静态void main (String [] args)抛出IOException, URISyntaxException ClassNotFoundException, InterruptedException {
  args=new String [] {“G: \ \测试\ \ \ \ mapjoin \ \秩序。txt”、“G: \ \测试\ \ \ \ mapjoin \ \ join2 \ \ "};
  
  配置配置=new配置();
  
  工作工作=Job.getInstance(设计);
  
  job.setJarByClass (MapJoinDriver.class);
  job.setMapperClass (MapJoinMapper.class);
  
  job.setOutputKeyClass (Text.class);
  job.setOutputValueClass (NullWritable.class);//将重复使用的小文件加载到缓存中
  的工作。addCacheFile(新URI(“文件:///G:/测试//mapjoin pd.txt "));
  
  job.setNumReduceTasks (0);
  
  FileInputFormat。setInputPaths(工作、新路径(args [0]));
  FileOutputFormat。setOutputPath(工作、新路径(args [1]));
  
  job.waitForCompletion(真正的);
  
  }
  }
   
  

二,减少加入

  

1,分析思路
通过将关联条件作为地图的输出的关键,也就是使用商品ID来作为关键,将两表满足加入条件的数据并携带数据所来源的文件信息,发往同一个减少任务,在减少中进行数据的串联

  

输入的数据和上面的地图加入一样,输出的结果也和上面的类似
十二,MapReduce——mapjoin和reducejoin

  

豆:   

十二,MapReduce mapjoin和reducejoin