介绍
这篇文章主要介绍了hadoop中如何实现DBInputFormat,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获。下面让小编带着大家一起了解一下。
代码未做测试,先做记录
package com.test; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.net.URI; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;/* * ,*要运行本示例 ,* 1,把mysql的jdbc驱动放到taskTracker的自由目录下,重启集群 ,* ,*/public class  WordCountDB extends Configured  implements Tool { , ,private String OUT_PATH =,“hdfs://grid131:9000/output"; , ,public static  class Map  extends Mapper, { public 才能;void 地图(LongWritable 关键,MyUser 价值,Context 上下文),throws IOException, InterruptedException { ,,context.write(关键,new 文本(value.toString ())); ,,} ,} , ,public int 运行(String [], args), throws Exception { Configuration 才能;conf =, this.getConf (); DBConfiguration.configureDB才能(参看,“com.mysql.jdbc.Driver",,“jdbc: mysql://grid131:3306/test",,“root",,“admin"); ,,//输才能出路径如果存在,则删除 FileSystem 才能;fs =, FileSystem.get (new URI (OUT_PATH),配置); fs.delete才能(new 路径(OUT_PATH),真的); ,, Job 才能;Job =, new 工作(参看,WordCountDB.class.getSimpleName ()); job.setJarByClass才能(WordCountDB.class); ,, FileOutputFormat.setOutputPath才能(工作,,new 路径(args [1])); ,,//指才能定不需要减少,直接把地图输出写入到hdfs中 job.setNumReduceTasks才能(0); job.setInputFormatClass才能(DBInputFormat.class); ,,//指才能定表,字段//DBInputFormat.setInput才能(inputClass,工作,,,,条件下,,orderBy,,字段名) 
DBInputFormat.setInput才能(MyUser.class,工作,,“myuser",,空,,空,,“id",,“name"); job.setMapperClass才能(Map.class); ,,//当才能减少输出类型与地图输出类型一致时,地图的输出类型可以不设置 job.setMapOutputKeyClass才能(LongWritable.class); job.setMapOutputValueClass才能(Text.class); ,, job.waitForCompletion才能(真正的); ,, return 才能;job.isSuccessful () ? 0:1; ,} , ,public static  void main (String [], args), throws Exception { int 才能;exit =, ToolRunner.run (new WordCount (),, args); ,,system . exit(退出); ,} } class MyUser  implements 可写,DBWritable { ,private Long id; ,private String 名称; , ,public Long  getId (), { return 才能;id; ,} ,public void  setId (Long id), { 时间=this.id 才能;id; ,} ,public String  getName (), { return 才能,名字; ,} ,public void  setName (String 名称),{ this.name 才能=,名称; ,} , ,@Override ,public void 写(DataOutput ), throws IOException { out.writeLong才能(this.id); Text.writeString才能(,,this.name); ,} , ,@Override ,public void  readFields (DataInput 在),throws IOException { 时间=this.id 才能;in.readLong (); 时间=this.name 才能;Text.readString(的); ,} , ,@Override ,public void 写(PreparedStatement 语句),throws SQLException { statement.setLong才能(1,this.id); statement.setString才能(2,this.name); ,} , ,@Override ,public void  readFields (ResultSet 结果集),throws SQLException { 时间=this.id 才能;resultSet.getLong (1); 时间=this.name 才能;resultSet.getString (2); ,} , null null null null null hadoop中如何实现DBInputFormat