MapReduce在Hbase

  


org.apache.hadoop.hbase。mapreduce


TableMapper TableReducer


一个地区对应一个map

import  java.io.IOException;      import  org.apache.hadoop.conf.Configuration;   import  org.apache.hadoop.hbase.HBaseConfiguration;   import  org.apache.hadoop.hbase.client.Mutation;   import  org.apache.hadoop.hbase.client.Put;   import  org.apache.hadoop.hbase.client.Result;   import  org.apache.hadoop.hbase.client.Scan;   import  org.apache.hadoop.hbase.io.ImmutableBytesWritable;   import  org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;   import  org.apache.hadoop.hbase.mapreduce.TableMapper;   import  org.apache.hadoop.hbase.mapreduce.TableReducer;   import  org.apache.hadoop.hbase.util.Bytes;   import  org.apache.hadoop.io.Text;   import  org.apache.hadoop.mapreduce.Job;   import  org.apache.hadoop.mapreduce.Reducer;      public  class  HbaseMR  {      ,,,public  class  MyMapper  extends  TableMapper<文本,Text>, {      ,,,,,,@Override   ,,,,,,,protected  void 地图(ImmutableBytesWritable 关键,Result 价值,   ,,,,,,,,,,,,,,,Context 上下文),throws  IOException, InterruptedException  {   ,,,,,,,,,,,//,关键代表rowkey   ,,,,,,,,,,,Text  k =, new 文本(Bytes.toString (key.get ()));   ,,,,,,,,,,,Text  v =, new 文本(Bytes.toString (value.getValue (   ,,,,,,,,,,,,,,,,,,,”basicinfo .getBytes(),“年龄”.getBytes ())));      ,,,,,,,,,,,context.write (v, k);      ,,,,,,,}      ,,,}      ,,,public  class  MyReducer  extends  TableReducer<文本,文本,Text>, {      ,,,,,,@Override   ,,,,,,,protected  void 减少(Text 关键,Iterable,价值观,Context 上下文)   ,,,,,,,,,,,,,,,throws  IOException, InterruptedException  {   ,,,,,,,,,,,Put  Put =, new 把(Bytes.toBytes (key.toString ()));   ,,,,,,,,,,,for  (Text  value :值),{   ,,,,,,,,,,,,,,,put.add (Bytes.toBytes (f1), Bytes.toBytes (value.toString ()),   ,,,,,,,,,,,,,,,,,,,,,,,Bytes.toBytes (value.toString ()));   ,,,,,,,,,,,}   ,,,,,,,,,,,context.write(零,,);   ,,,,,,,}      ,,,}      ,,,public  static  void  main (String [], args), {   ,,,,,,,Configuration 相依=,,,,HBaseConfiguration.create ();   ,,,,,,,try  {   ,,,,,,,,,,,Job 工作=new 工作(参看,“mapreduce 提醒hbase”);   ,,,,,,,,,,,job.setJarByClass (HbaseMR.class);   ,,,,,,,,,,,Scan 扫描=new 扫描();   ,,,,,,,,,,,scan.setCaching (1000);//,,,,,,,,,,,TableMapReduceUtil.initTableMapperJob(“学生”,扫描,,MyMapper.class,, Text.class,, Text.class,,工作);   ,,,,,,,,,,,TableMapReduceUtil.initTableReducerJob (MyReducer.class,“学生时代”,,,,工作);   ,,,,,,,,,,,job.waitForCompletion(真正的);   ,,,,,,,},catch  (Exception  e), {   ,,,,,,,,,,,   ,,,,,,,,,,,e.printStackTrace ();   ,,,,,,,}   null   null   null

MapReduce在Hbase