Learn Hadoop with Me, Step by Step (7) -- Connecting Hadoop to a MySQL Database to Read and Write Data
Published: 2019-06-26


    To let MapReduce access relational databases (MySQL, Oracle) directly, Hadoop provides two classes: DBInputFormat and DBOutputFormat. DBInputFormat reads database table data into HDFS, and DBOutputFormat writes the result set produced by MapReduce back into a database table.

    If the MapReduce job fails with java.io.IOException: com.mysql.jdbc.Driver, the program usually cannot find the MySQL driver jar. The fix is to make sure every tasktracker can find the driver jar when it runs the MapReduce program.

There are two ways to add the jar:

(1) Copy the jar into ${HADOOP_HOME}/lib on every node and restart the cluster. This works, but it is the more primitive approach.

(2) a) Upload the jar to the cluster: hadoop fs -put mysql-connector-java-5.1.0-bin.jar /hdfsPath/

       b) Before the MR program submits the job, add the statement: DistributedCache.addFileToClassPath(new Path("/hdfsPath/mysql-connector-java-5.1.0-bin.jar"), conf); (see the sketch below).
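A minimal sketch of how option (2) might look in the job driver, assuming the jar has already been uploaded to /hdfsPath/ as in step a) and using the DBAccess driver class defined later in this post:

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

// Sketch only: the jar must already exist at /hdfsPath/ on HDFS (step a above).
JobConf conf = new JobConf(DBAccess.class);

// Put the MySQL driver on the classpath of every map/reduce task.
DistributedCache.addFileToClassPath(
        new Path("/hdfsPath/mysql-connector-java-5.1.0-bin.jar"), conf);

// ...configure the rest of the job, then submit it with JobClient.runJob(conf).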

Storing MySQL data in Hadoop HDFS

MySQL table creation and data initialization

DROP TABLE IF EXISTS `wu_testhadoop`;

CREATE TABLE `wu_testhadoop` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `title` varchar(255) DEFAULT NULL,
  `content` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of wu_testhadoop
-- ----------------------------
INSERT INTO `wu_testhadoop` VALUES ('1', '123', '122312');
INSERT INTO `wu_testhadoop` VALUES ('2', '123', '123456');

Defining Hadoop data access

Once the MySQL table is created, we need to define how Hadoop accesses MySQL.

Hadoop provides the org.apache.hadoop.io.Writable interface for a simple, efficient serialization protocol, built on DataInput and DataOutput.

For database access, Hadoop also provides the org.apache.hadoop.mapred.lib.db.DBWritable interface, where the write method sets values on a PreparedStatement and the readFields method binds column values from a ResultSet read from the database.

The two interfaces are used as follows (taken from the source documentation):

Writable

public class MyWritable implements Writable {
    // Some data
    private int counter;
    private long timestamp;

    public void write(DataOutput out) throws IOException {
        out.writeInt(counter);
        out.writeLong(timestamp);
    }

    public void readFields(DataInput in) throws IOException {
        counter = in.readInt();
        timestamp = in.readLong();
    }

    public static MyWritable read(DataInput in) throws IOException {
        MyWritable w = new MyWritable();
        w.readFields(in);
        return w;
    }
}

DBWritable

public class MyWritable implements Writable, DBWritable {
    // Some data
    private int counter;
    private long timestamp;

    // Writable#write() implementation
    public void write(DataOutput out) throws IOException {
        out.writeInt(counter);
        out.writeLong(timestamp);
    }

    // Writable#readFields() implementation
    public void readFields(DataInput in) throws IOException {
        counter = in.readInt();
        timestamp = in.readLong();
    }

    public void write(PreparedStatement statement) throws SQLException {
        statement.setInt(1, counter);
        statement.setLong(2, timestamp);
    }

    public void readFields(ResultSet resultSet) throws SQLException {
        counter = resultSet.getInt(1);
        timestamp = resultSet.getLong(2);
    }
}

The bean corresponding to the database table

package com.wyg.hadoop.mysql.bean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class DBRecord implements Writable, DBWritable {
    private int id;
    private String title;
    private String content;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public void readFields(ResultSet set) throws SQLException {
        this.id = set.getInt("id");
        this.title = set.getString("title");
        this.content = set.getString("content");
    }

    @Override
    public void write(PreparedStatement pst) throws SQLException {
        pst.setInt(1, id);
        pst.setString(2, title);
        pst.setString(3, content);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.title = Text.readString(in);
        this.content = Text.readString(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        Text.writeString(out, this.title);
        Text.writeString(out, this.content);
    }

    @Override
    public String toString() {
        return this.id + " " + this.title + " " + this.content;
    }
}

Implementing Map/Reduce

package com.wyg.hadoop.mysql.mapper;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import com.wyg.hadoop.mysql.bean.DBRecord;

@SuppressWarnings("deprecation")
public class DBRecordMapper extends MapReduceBase
        implements Mapper<LongWritable, DBRecord, LongWritable, Text> {

    @Override
    public void map(LongWritable key, DBRecord value,
            OutputCollector<LongWritable, Text> collector, Reporter reporter)
            throws IOException {
        collector.collect(new LongWritable(value.getId()), new Text(value.toString()));
    }
}

Testing the Hadoop-to-MySQL connection and storing the data in HDFS

package com.wyg.hadoop.mysql.db;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;

import com.wyg.hadoop.mysql.bean.DBRecord;
import com.wyg.hadoop.mysql.mapper.DBRecordMapper;

public class DBAccess {

    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(DBAccess.class);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);
        conf.setInputFormat(DBInputFormat.class);

        Path path = new Path("hdfs://192.168.44.129:9000/user/root/dbout");
        FileOutputFormat.setOutputPath(conf, path);

        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://your_db_ip:3306/your_db_name", "username", "password");

        String[] fields = {"id", "title", "content"};
        DBInputFormat.setInput(conf, DBRecord.class, "wu_testhadoop",
                null, "id", fields);

        conf.setMapperClass(DBRecordMapper.class);
        conf.setReducerClass(IdentityReducer.class);

        JobClient.runJob(conf);
    }
}
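Besides the table/fields overload used above, the same DBInputFormat class also has a setInput overload that takes a full input query plus a row-count query. A minimal sketch against the same wu_testhadoop table (the queries below are illustrative assumptions, not from the original post):

// Query-based variant of DBInputFormat.setInput (sketch only).
// The count query must return the number of rows the input query produces,
// because DBInputFormat uses it to compute the input splits for the map tasks.
String inputQuery = "SELECT id, title, content FROM wu_testhadoop ORDER BY id";
String inputCountQuery = "SELECT COUNT(id) FROM wu_testhadoop";
DBInputFormat.setInput(conf, DBRecord.class, inputQuery, inputCountQuery);

This form is convenient when the input should be filtered or joined rather than read from a single table verbatim.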

Run the program; the output is as follows:

15/08/11 16:46:18 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/08/11 16:46:18 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/08/11 16:46:18 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
15/08/11 16:46:19 INFO mapred.JobClient: Running job: job_local_0001
15/08/11 16:46:19 INFO mapred.MapTask: numReduceTasks: 1
15/08/11 16:46:19 INFO mapred.MapTask: io.sort.mb = 100
15/08/11 16:46:19 INFO mapred.MapTask: data buffer = 79691776/99614720
15/08/11 16:46:19 INFO mapred.MapTask: record buffer = 262144/327680
15/08/11 16:46:19 INFO mapred.MapTask: Starting flush of map output
15/08/11 16:46:19 INFO mapred.MapTask: Finished spill 0
15/08/11 16:46:19 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
15/08/11 16:46:19 INFO mapred.LocalJobRunner:
15/08/11 16:46:19 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
15/08/11 16:46:19 INFO mapred.LocalJobRunner:
15/08/11 16:46:19 INFO mapred.Merger: Merging 1 sorted segments
15/08/11 16:46:19 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 48 bytes
15/08/11 16:46:19 INFO mapred.LocalJobRunner:
15/08/11 16:46:19 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
15/08/11 16:46:19 INFO mapred.LocalJobRunner:
15/08/11 16:46:19 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
15/08/11 16:46:19 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.44.129:9000/user/root/dbout
15/08/11 16:46:19 INFO mapred.LocalJobRunner: reduce > reduce
15/08/11 16:46:19 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
15/08/11 16:46:20 INFO mapred.JobClient:  map 100% reduce 100%
15/08/11 16:46:20 INFO mapred.JobClient: Job complete: job_local_0001
15/08/11 16:46:20 INFO mapred.JobClient: Counters: 14
15/08/11 16:46:20 INFO mapred.JobClient:   FileSystemCounters
15/08/11 16:46:20 INFO mapred.JobClient:     FILE_BYTES_READ=34606
15/08/11 16:46:20 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=69844
15/08/11 16:46:20 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=30
15/08/11 16:46:20 INFO mapred.JobClient:   Map-Reduce Framework
15/08/11 16:46:20 INFO mapred.JobClient:     Reduce input groups=2
15/08/11 16:46:20 INFO mapred.JobClient:     Combine output records=0
15/08/11 16:46:20 INFO mapred.JobClient:     Map input records=2
15/08/11 16:46:20 INFO mapred.JobClient:     Reduce shuffle bytes=0
15/08/11 16:46:20 INFO mapred.JobClient:     Reduce output records=2
15/08/11 16:46:20 INFO mapred.JobClient:     Spilled Records=4
15/08/11 16:46:20 INFO mapred.JobClient:     Map output bytes=42
15/08/11 16:46:20 INFO mapred.JobClient:     Map input bytes=2
15/08/11 16:46:20 INFO mapred.JobClient:     Combine input records=0
15/08/11 16:46:20 INFO mapred.JobClient:     Map output records=2
15/08/11 16:46:20 INFO mapred.JobClient:     Reduce input records=2

At the same time, a new dbout directory appears in HDFS; its files hold the data from the database table, with the following content:

1	1 123 122312
2	2 123 123456

Importing HDFS data into MySQL

    Writing HDFS files into MySQL also relies on the DBRecord class above, because all database operations go through DBInputFormat and DBOutputFormat.

    First we define the map and reduce implementations (the map parses the HDFS files, and the reduce takes the map output and emits the records to be written).

package com.wyg.hadoop.mysql.mapper;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

import com.wyg.hadoop.mysql.bean.DBRecord;

public class WriteDB {

    // Map phase: parse each line of the HDFS file. A line looks like
    // "1\t1 123 122312" -- the key written by the previous job, a tab,
    // then "id title content".
    public static class Map extends MapReduceBase
            implements Mapper<Object, Text, Text, DBRecord> {

        private final static DBRecord one = new DBRecord();
        private Text word = new Text();

        @Override
        public void map(Object key, Text value,
                OutputCollector<Text, DBRecord> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            String[] infos = line.split(" ");
            String id = infos[0].split("\t")[1];
            one.setId(new Integer(id));
            one.setTitle(infos[1]);
            one.setContent(infos[2]);
            word.set(id);
            output.collect(word, one);
        }
    }

    // Reduce phase: emit one DBRecord per key; DBOutputFormat inserts it into MySQL.
    public static class Reduce extends MapReduceBase
            implements Reducer<Text, DBRecord, DBRecord, Text> {

        @Override
        public void reduce(Text key, Iterator<DBRecord> values,
                OutputCollector<DBRecord, Text> collector, Reporter reporter)
                throws IOException {
            DBRecord record = values.next();
            collector.collect(record, new Text());
        }
    }
}

Testing the import from HDFS into the database

package com.wyg.hadoop.mysql.db;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;

import com.wyg.hadoop.mysql.bean.DBRecord;
import com.wyg.hadoop.mysql.mapper.WriteDB;

public class DBInsert {

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WriteDB.class);

        // Set the input and output formats
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(DBOutputFormat.class);

        // Without these lines the job does not run, although the examples
        // found online omit them.
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(DBRecord.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(DBRecord.class);

        // Set the Map and Reduce classes
        conf.setMapperClass(WriteDB.Map.class);
        conf.setReducerClass(WriteDB.Reduce.class);

        // Set the input directory
        FileInputFormat.setInputPaths(conf,
                new Path("hdfs://192.168.44.129:9000/user/root/dbout"));

        // Configure the database connection
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://your_db_ip:3306/your_db_name", "username", "password");

        String[] fields = {"id", "title", "content"};
        DBOutputFormat.setOutput(conf, "wu_testhadoop", fields);

        JobClient.runJob(conf);
    }
}

The test output is as follows:

15/08/11 18:10:15 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/08/11 18:10:15 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/08/11 18:10:15 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
15/08/11 18:10:15 INFO mapred.FileInputFormat: Total input paths to process : 1
15/08/11 18:10:15 INFO mapred.JobClient: Running job: job_local_0001
15/08/11 18:10:15 INFO mapred.FileInputFormat: Total input paths to process : 1
15/08/11 18:10:15 INFO mapred.MapTask: numReduceTasks: 1
15/08/11 18:10:15 INFO mapred.MapTask: io.sort.mb = 100
15/08/11 18:10:15 INFO mapred.MapTask: data buffer = 79691776/99614720
15/08/11 18:10:15 INFO mapred.MapTask: record buffer = 262144/327680
15/08/11 18:10:15 INFO mapred.MapTask: Starting flush of map output
15/08/11 18:10:16 INFO mapred.MapTask: Finished spill 0
15/08/11 18:10:16 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
15/08/11 18:10:16 INFO mapred.LocalJobRunner: hdfs://192.168.44.129:9000/user/root/dbout/part-00000:0+30
15/08/11 18:10:16 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
15/08/11 18:10:16 INFO mapred.LocalJobRunner:
15/08/11 18:10:16 INFO mapred.Merger: Merging 1 sorted segments
15/08/11 18:10:16 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 40 bytes
15/08/11 18:10:16 INFO mapred.LocalJobRunner:
15/08/11 18:10:16 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
15/08/11 18:10:16 INFO mapred.LocalJobRunner: reduce > reduce
15/08/11 18:10:16 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
15/08/11 18:10:16 INFO mapred.JobClient:  map 100% reduce 100%
15/08/11 18:10:16 INFO mapred.JobClient: Job complete: job_local_0001
15/08/11 18:10:16 INFO mapred.JobClient: Counters: 14
15/08/11 18:10:16 INFO mapred.JobClient:   FileSystemCounters
15/08/11 18:10:16 INFO mapred.JobClient:     FILE_BYTES_READ=34932
15/08/11 18:10:16 INFO mapred.JobClient:     HDFS_BYTES_READ=60
15/08/11 18:10:16 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=70694
15/08/11 18:10:16 INFO mapred.JobClient:   Map-Reduce Framework
15/08/11 18:10:16 INFO mapred.JobClient:     Reduce input groups=2
15/08/11 18:10:16 INFO mapred.JobClient:     Combine output records=0
15/08/11 18:10:16 INFO mapred.JobClient:     Map input records=2
15/08/11 18:10:16 INFO mapred.JobClient:     Reduce shuffle bytes=0
15/08/11 18:10:16 INFO mapred.JobClient:     Reduce output records=2
15/08/11 18:10:16 INFO mapred.JobClient:     Spilled Records=4
15/08/11 18:10:16 INFO mapred.JobClient:     Map output bytes=34
15/08/11 18:10:16 INFO mapred.JobClient:     Map input bytes=30
15/08/11 18:10:16 INFO mapred.JobClient:     Combine input records=0
15/08/11 18:10:16 INFO mapred.JobClient:     Map output records=2
15/08/11 18:10:16 INFO mapred.JobClient:     Reduce input records=2

I truncated the table before the test; after the run the two rows appear in the database again.

Running the job a second time fails with an error, which is expected: we assign the id explicitly when importing, so repeated runs collide on the primary key. If the id is omitted, rows can be inserted any number of times; one possible adjustment is sketched below.
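A hypothetical variant, not part of the original code: let MySQL assign the AUTO_INCREMENT id by leaving id out of the output field list and binding only title and content in DBRecord.write(PreparedStatement).

// In DBInsert.main, omit "id" from the output fields so MySQL generates it:
String[] fields = {"title", "content"};
DBOutputFormat.setOutput(conf, "wu_testhadoop", fields);

// DBRecord.write(PreparedStatement) must then bind only those two columns,
// in the same order as the field list above:
@Override
public void write(PreparedStatement pst) throws SQLException {
    pst.setString(1, title);
    pst.setString(2, content);
}

With this change the job can be rerun without clearing the table first, at the cost of no longer preserving the original ids.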

Source code download

The source code has been uploaded; the download link is

Reposted from: http://lkxyl.baihongyu.com/
