五,MapReduce普通排序例子,统计手机号流量

  

1,需求

  

统计每一个手机号的总流量(上行流量+下行流量),上行流量,下行流量,并且最后按照总流量进行手机号的排序。* * * *

  

2,数据输入及输出格式

  
 <代码>源数据比较敏感,这里就不展示出来了 
  

输入格式为:

  
 <代码>时间戳,电话号码,基站的物理地址,访问网址的ip,网站域名,数据包,接包数,上行/传流量,下行/载流量,响应码
  
  分隔符为“t \” 
  

输出格式为:

  
 <代码>手机号码上行流量下行流量总流量
  
  并且根据总流量的大小进行排序 
  

3,思路分析

  

地图阶段:
切分字段,以手机号为关键,值为一个bean对象,价值保存对应手机号的上下行流量,以及总流量;关键保存手机号,也就是类似的结构:

  
 <代码> & lt; 1234567, & lt;上下行流量,总流量在祝辞 
  

减少阶段:
对于同一个关键的(即同一手机号)的上下行流量进行累加,获取总的上下行流量,总流量。
并且最后需要对总流量进行排序,所以减少输出的关键为整个bean,值为空

  

4,具体程序

  

FlowBean.java   

 <代码类="语言java ">/*用于保存流量数据的自定义可序列化类*/包PhoneData;
  
  进口lombok.Getter;
  进口lombok.NoArgsConstructor;
  进口lombok.Setter;
  进口org.apache.hadoop.io.Writable;
  进口org.apache.hadoop.io.WritableComparable;
  
  进口java.io.DataInput;
  进口java.io.DataOutput;
  进口java.io.IOException;
  
  @ getter
  @ setter
  @NoArgsConstructor
  公共类FlowBean实现WritableComparable{/* *该类是一个可序列化类,且可比较,所以要实现WritableComparable接口
  *上,传下载,总流量
  */私人int上升气流;
  私人int向下流;
  私人int sumFlow;
  
  公共FlowBean (int上升气流,int向下流){
  超级();
  这一点。上升气流=上升气流;
  这一点。向下流=向下流;
  这一点。sumFlow=上升气流+向下流;
  }/* *
  *序列化方法
  *
  * @param dataOutput
  * @throws IOException
  */@Override
  公共空写(DataOutput DataOutput)抛出IOException {
  dataOutput.writeInt (this.upFlow);
  dataOutput.writeInt (this.downFlow);
  dataOutput.writeInt (this.sumFlow);
  }/* *
  *反序列化
  * @param dataInput
  * @throws IOException
  */@Override
  公共空readFields (DataInput DataInput)抛出IOException {
  这一点。上升气流=dataInput.readInt ();
  这一点。向下流=dataInput.readInt ();
  这一点。sumFlow=dataInput.readInt ();
  }/* *
  *打印字符串方法
  * @return
  */@Override
  公共字符串toString () {
  StringBuilder某人=new StringBuilder ();
  sb.append (this.upFlow);
  sb.append (" ");
  sb.append (this.downFlow);
  sb.append (" ");
  sb.append (this.sumFlow);
  返回sb.toString ();
  }/* *
  *对象的比较方法,用于排序比较
  * @param o
  * @return
  */@Override
  公共int compareTo (FlowBean o) {
  在返回this.getSumFlow ();o.getSumFlow () ?1:1;
  }
  }
   
  

映射器:   

 <代码类="语言java ">包PhoneData;
  
  进口org.apache.hadoop.io.LongWritable;
  进口org.apache.hadoop.io.Text;
  进口org.apache.hadoop.mapreduce.Mapper;
  
  进口java.io.IOException;
  
  公开课PhoneMapper延伸Mapper{
  文本k=新的文本();
  FlowBean v=new FlowBean ();
  
  @Override
  保护空白地图(LongWritable键,文本值,上下文语境)抛出IOException, InterruptedException {
  字符串行=value.toString ();
  String[]字段=line.split (“\ t”);//开始解析切割数据
  k.set(领域[1]);
  int向下流=Integer.parseInt(字段(字段。长度- 2]);
  int上升气流=Integer.parseInt(字段(字段。长度- 3]);
  v.setDownFlow(向下流);
  v.setUpFlow(上升气流);
  v。setSumFlow(上升气流+向下流);
  
  上下文。写(k、v);
  }
  }
   
  

减速机:   

 <代码类="语言java ">包PhoneData;
  
  进口org.apache.hadoop.io.Text;
  进口org.apache.hadoop.mapreduce.Reducer;
  
  进口java.io.IOException;
  
  公共类PhoneReducer Reducer<延伸;文本、FlowBean FlowBean, Text>{
  
  FlowBean v=new FlowBean ();
  
  @Override
  保护孔隙减少(文本关键,Iterable

五,MapReduce普通排序例子,统计手机号流量