1,需求
统计每一个手机号的总流量(上行流量+下行流量),上行流量,下行流量,并且最后按照总流量进行手机号的排序。* * * *
2,数据输入及输出格式
<代码>源数据比较敏感,这里就不展示出来了代码>
输入格式为:
<代码>时间戳,电话号码,基站的物理地址,访问网址的ip,网站域名,数据包,接包数,上行/传流量,下行/载流量,响应码 分隔符为“t \”代码>
输出格式为:
<代码>手机号码上行流量下行流量总流量 并且根据总流量的大小进行排序代码>
3,思路分析
地图阶段:
切分字段,以手机号为关键,值为一个bean对象,价值保存对应手机号的上下行流量,以及总流量;关键保存手机号,也就是类似的结构:
<代码> & lt; 1234567, & lt;上下行流量,总流量在祝辞代码>
减少阶段:
对于同一个关键的(即同一手机号)的上下行流量进行累加,获取总的上下行流量,总流量。
并且最后需要对总流量进行排序,所以减少输出的关键为整个bean,值为空
4,具体程序
FlowBean.java
<代码类="语言java ">/*用于保存流量数据的自定义可序列化类*/包PhoneData; 进口lombok.Getter; 进口lombok.NoArgsConstructor; 进口lombok.Setter; 进口org.apache.hadoop.io.Writable; 进口org.apache.hadoop.io.WritableComparable; 进口java.io.DataInput; 进口java.io.DataOutput; 进口java.io.IOException; @ getter @ setter @NoArgsConstructor 公共类FlowBean实现WritableComparable{/* *该类是一个可序列化类,且可比较,所以要实现WritableComparable接口 *上,传下载,总流量 */私人int上升气流; 私人int向下流; 私人int sumFlow; 公共FlowBean (int上升气流,int向下流){ 超级(); 这一点。上升气流=上升气流; 这一点。向下流=向下流; 这一点。sumFlow=上升气流+向下流; }/* * *序列化方法 * * @param dataOutput * @throws IOException */@Override 公共空写(DataOutput DataOutput)抛出IOException { dataOutput.writeInt (this.upFlow); dataOutput.writeInt (this.downFlow); dataOutput.writeInt (this.sumFlow); }/* * *反序列化 * @param dataInput * @throws IOException */@Override 公共空readFields (DataInput DataInput)抛出IOException { 这一点。上升气流=dataInput.readInt (); 这一点。向下流=dataInput.readInt (); 这一点。sumFlow=dataInput.readInt (); }/* * *打印字符串方法 * @return */@Override 公共字符串toString () { StringBuilder某人=new StringBuilder (); sb.append (this.upFlow); sb.append (" "); sb.append (this.downFlow); sb.append (" "); sb.append (this.sumFlow); 返回sb.toString (); }/* * *对象的比较方法,用于排序比较 * @param o * @return */@Override 公共int compareTo (FlowBean o) { 在返回this.getSumFlow ();o.getSumFlow () ?1:1; } } 代码>
映射器:
<代码类="语言java ">包PhoneData; 进口org.apache.hadoop.io.LongWritable; 进口org.apache.hadoop.io.Text; 进口org.apache.hadoop.mapreduce.Mapper; 进口java.io.IOException; 公开课PhoneMapper延伸Mapper{ 文本k=新的文本(); FlowBean v=new FlowBean (); @Override 保护空白地图(LongWritable键,文本值,上下文语境)抛出IOException, InterruptedException { 字符串行=value.toString (); String[]字段=line.split (“\ t”);//开始解析切割数据 k.set(领域[1]); int向下流=Integer.parseInt(字段(字段。长度- 2]); int上升气流=Integer.parseInt(字段(字段。长度- 3]); v.setDownFlow(向下流); v.setUpFlow(上升气流); v。setSumFlow(上升气流+向下流); 上下文。写(k、v); } } 代码>
减速机:
<代码类="语言java ">包PhoneData; 进口org.apache.hadoop.io.Text; 进口org.apache.hadoop.mapreduce.Reducer; 进口java.io.IOException; 公共类PhoneReducer Reducer<延伸;文本、FlowBean FlowBean, Text>{ FlowBean v=new FlowBean (); @Override 保护孔隙减少(文本关键,Iterable五,MapReduce普通排序例子,统计手机号流量