hadoop中如何实现DBInputFormat

  介绍

这篇文章主要介绍了hadoop中如何实现DBInputFormat,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获、下面让小编带着大家一起了解一下。

代码未做测试,先做记录

 package  com.test;
  import  java.io.DataInput;
  import  java.io.DataOutput;
  import  java.io.IOException;
  import  java.net.URI;
  import  java.sql.PreparedStatement;
  import  java.sql.ResultSet;
  import  java.sql.SQLException;
  import  org.apache.hadoop.conf.Configuration;
  import  org.apache.hadoop.conf.Configured;
  import  org.apache.hadoop.fs.FileSystem;
  import  org.apache.hadoop.fs.Path;
  import  org.apache.hadoop.io.LongWritable;
  import  org.apache.hadoop.io.Text;
  import  org.apache.hadoop.io.Writable;
  import  org.apache.hadoop.mapreduce.Job;
  import  org.apache.hadoop.mapreduce.Mapper;
  import  org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
  import  org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
  import  org.apache.hadoop.mapreduce.lib.db.DBWritable;
  import  org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  import  org.apache.hadoop.util.Tool;
  import  org.apache.hadoop.util.ToolRunner;/* *
  ,*要运行本示例
  ,* 1,把mysql的jdbc驱动放到taskTracker的自由目录下,重启集群
  ,*
  ,*/public  class  WordCountDB  extends  Configured  implements  Tool  {
  ,
  ,private  String  OUT_PATH =,“hdfs://grid131:9000/output";
  ,
  ,public  static  class  Map  extends  Mapper, {
  public 才能;void 地图(LongWritable 关键,MyUser 价值,Context 上下文),throws  IOException, InterruptedException  {
  ,,context.write(关键,new 文本(value.toString ()));
  ,,}
  ,}
  ,
  ,public  int 运行(String [], args), throws  Exception  {
  Configuration 才能;conf =, this.getConf ();
  DBConfiguration.configureDB才能(参看,“com.mysql.jdbc.Driver",,“jdbc: mysql://grid131:3306/test",,“root",,“admin");
  ,,//输才能出路径如果存在,则删除
  FileSystem 才能;fs =, FileSystem.get (new  URI (OUT_PATH),配置);
  fs.delete才能(new 路径(OUT_PATH),真的);
  ,,
  Job 才能;Job =, new 工作(参看,WordCountDB.class.getSimpleName ());
  job.setJarByClass才能(WordCountDB.class);
  ,,
  FileOutputFormat.setOutputPath才能(工作,,new 路径(args [1]));
  ,,//指才能定不需要减少,直接把地图输出写入到hdfs中
  job.setNumReduceTasks才能(0);
  job.setInputFormatClass才能(DBInputFormat.class);
  ,,//指才能定表,字段//DBInputFormat.setInput才能(inputClass,工作,,,,条件下,,orderBy,,字段名)
  DBInputFormat.setInput才能(MyUser.class,工作,,“myuser",,空,,空,,“id",,“name");
  job.setMapperClass才能(Map.class);
  ,,//当才能减少输出类型与地图输出类型一致时,地图的输出类型可以不设置
  job.setMapOutputKeyClass才能(LongWritable.class);
  job.setMapOutputValueClass才能(Text.class);
  ,,
  job.waitForCompletion才能(真正的);
  ,,
  return 才能;job.isSuccessful () ? 0:1;
  ,}
  ,
  ,public  static  void  main (String [], args), throws  Exception  {
  int 才能;exit =, ToolRunner.run (new  WordCount (),, args);
  ,,system . exit(退出);
  ,}
  }
  class  MyUser  implements 可写,DBWritable  {
  ,private  Long  id;
  ,private  String 名称;
  ,
  ,public  Long  getId (), {
  return 才能;id;
  ,}
  ,public  void  setId (Long  id), {
  时间=this.id 才能;id;
  ,}
  ,public  String  getName (), {
  return 才能,名字;
  ,}
  ,public  void  setName (String 名称),{
  this.name 才能=,名称;
  ,}
  ,
  ,@Override
  ,public  void 写(DataOutput ), throws  IOException  {
  out.writeLong才能(this.id);
  Text.writeString才能(,,this.name);
  ,}
  ,
  ,@Override
  ,public  void  readFields (DataInput 在),throws  IOException  {
  时间=this.id 才能;in.readLong ();
  时间=this.name 才能;Text.readString(的);
  ,}
  ,
  ,@Override
  ,public  void 写(PreparedStatement 语句),throws  SQLException  {
  statement.setLong才能(1,this.id);
  statement.setString才能(2,this.name);
  ,}
  ,
  ,@Override
  ,public  void  readFields (ResultSet 结果集),throws  SQLException  {
  时间=this.id 才能;resultSet.getLong (1);
  时间=this.name 才能;resultSet.getString (2);
  ,}
  ,
  null
  null
  null
  null
  null

hadoop中如何实现DBInputFormat