一、地图加入
1,适用场景:
一张表很大,一张表很小
2,解决方案:
在地图端缓存多张表,提前处理业务逻辑,这样增加地图端业务,减少减少端的数据压力,尽可能减少数据倾斜。
3,具体方法:采用分布式缓存
(1)在映射器的设置阶段,将文件读取到缓存集合中
(2)在驱动程序中加载缓存,工作。addCacheFile(新URI(“文件:/e:/mapjoincache/pd.txt"));//缓存普通文件到任务运行节点。
4,实例
<代码>//order.txt 订单id商品id商品数量 1001 01 1002 02年2 1003 03年3 1004年01 4 1005 02年5 1006 03年6//pd.txt 商品id商品名 01小米 02华为 03格力代码>
要将订单中的商品id替换为商品名称,缓存pd。txt这个小表
映射器:
<代码类="语言java ">包MapJoin; 进口org.apache.commons.lang.StringUtils; 进口org.apache.hadoop.io.LongWritable; 进口org.apache.hadoop.io.NullWritable; 进口org.apache.hadoop.io.Text; 进口org.apache.hadoop.mapreduce.Mapper; 进口. io . *; 进口java.util.HashMap; 进口java.util.Map; 公开课MapJoinMapper延伸Mapper司机:{ String> Map<字符串;productMap=new HashMap<字符串,String> (); 文本k=新的文本();/* * * *将pd.txt加载到hashmap中,只加载一次 * @param上下文 * @throws IOException * @throws InterruptedException */@Override 保护空白设置(上下文语境)抛出IOException InterruptedException { BufferedReader productReader=new BufferedReader(新InputStreamReader(新FileInputStream(新文件(“G: \ \测试\ \ \ \ mapjoin \ \ pd.txt ")))); 字符串行; 而(stringutil的。isNotEmpty (=productReader.readLine行())){ String[]字段=line.split (“\ t”); productMap。[0]put(字段,字段[1]); } productReader.close (); } @Override 保护空白地图(LongWritable键,文本值,上下文语境)抛出IOException, InterruptedException { 字符串行=value.toString (); String[]字段=line.split (“\ t”); 字符串productName=productMap.get(领域[1]); k。集(字段[0]+“t \”+ productName +“\ t”+字段[2]); 上下文。写(k, NullWritable.get ()); } @Override 保护无效清理(上下文语境)抛出IOException InterruptedException { super.cleanup(上下文); } }代码>
<代码类="语言java ">包MapJoin; 进口org.apache.hadoop.conf.Configuration; 进口org.apache.hadoop.fs.Path; 进口org.apache.hadoop.io.NullWritable; 进口org.apache.hadoop.io.Text; 进口org.apache.hadoop.mapreduce.Job; 进口org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 进口org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 进口java.io.IOException; 进口java.net.URI; 进口java.net.URISyntaxException; 公开课MapJoinDriver { 公共静态void main (String [] args)抛出IOException, URISyntaxException ClassNotFoundException, InterruptedException { args=new String [] {“G: \ \测试\ \ \ \ mapjoin \ \秩序。txt”、“G: \ \测试\ \ \ \ mapjoin \ \ join2 \ \ "}; 配置配置=new配置(); 工作工作=Job.getInstance(设计); job.setJarByClass (MapJoinDriver.class); job.setMapperClass (MapJoinMapper.class); job.setOutputKeyClass (Text.class); job.setOutputValueClass (NullWritable.class);//将重复使用的小文件加载到缓存中 的工作。addCacheFile(新URI(“文件:///G:/测试//mapjoin pd.txt ")); job.setNumReduceTasks (0); FileInputFormat。setInputPaths(工作、新路径(args [0])); FileOutputFormat。setOutputPath(工作、新路径(args [1])); job.waitForCompletion(真正的); } } 代码>
二,减少加入
1,分析思路
通过将关联条件作为地图的输出的关键,也就是使用商品ID来作为关键,将两表满足加入条件的数据并携带数据所来源的文件信息,发往同一个减少任务,在减少中进行数据的串联
输入的数据和上面的地图加入一样,输出的结果也和上面的类似
豆: