DBWritable接口中实现两个方法
//DBWritable主要是实现对数据库读写操作 所以输出格式是PreparedStatement
//PreparedStatement接口继承并扩展了Statement接口,用来执行动态的SQL语句,即包含参数的
public void write(PreparedStatement stmt ) thorws SQLException{
}
//DBWritable主要是实现对数据库读写操作 所以输入格式是ResultSet
//ResultSet接口类似于一张数据表,用来暂时存放从数据库查询操作所获得的结果集
public void readFields(ResultSet result) thorws SQLException{
}
Writable接口实现两个方法
// The Writable interface operates on data streams, so the output target is a DataOutput object.
public void write(DataOutput out) throws IOException {
}
// The Writable interface operates on data streams, so the input source is a DataInput object.
public void readFields(DataInput in) throws IOException {
}
建立关系数据库连接
//提供数据库配置和创建连接的接口 //DBConfiguration类中提供了一个静态方法创建数据库连接: //configureDB((Job job,String driverClass,String dbUrl,String userName,String Password)) //job为当前准备执行的作业,driverClasss为数据库厂商提供的访问其数据库的驱动程序,dbUrl为运行数据库的主机的地址,userName和password分别为数据库提供访问地用户名和相应的访问密码。 DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://10.30.59.136:3306/world", "root", "123456");
相应的从关系数据库查询和读取数据的接口
- DBInputFormat:提供从数据库读取数据的格式。
- DBRecordReader:提供读取数据记录的接口。
// Configures the job to read table "stu" into Stu records. Argument roles as documented for
// DBInputFormat.setInput: conditions = null (no filter), orderBy = "student_id",
// and fileds1 lists the columns to select.
DBInputFormat.setInput(job, Stu.class, "stu", null, "student_id", fileds1);
相应的向关系数据库直接输出结果的编程接口
- DBOutputFormat:提供向数据库输出数据的格式。
- DBRecordWriter:提供数据库写入数据记录的接口。
数据库连接完成后,即可完成从MapReduce程序向关系数据库写入数据的操作。为了告知数据库将写入哪个表中的哪些字段,DBOutputFormat中提供了一个静态方法来指定需要写入的数据表和字段:
setOutput(Job job,String tableName,String ... fieldName)
//tableName指定即将写入的数据表,后续参数将指定哪些字段数据将写入该表。
2.创建MySQL数据表
1.创建数据库
create database world;
2.创建数据表并导入数据
-- Student table read by the MapReduce job; the column order must match Stu.readFields(ResultSet).
CREATE TABLE stu (
  `id` int(11) NOT NULL,
  `student_id` int(11) NULL DEFAULT NULL,
  `name` varchar(255),
  `age` int(11),
  `sex` varchar(255),
  `birthday` varchar(255)
);
-- Sample rows: four students aged 12 and two aged 13.
INSERT INTO `stu` VALUES (1, 1, 'zs', 12, '1', '2019-07-17');
INSERT INTO `stu` VALUES (2, 2, 'goudan', 13, '1', '2019-07-17');
INSERT INTO `stu` VALUES (3, 3, 'gg', 12, '1', '2019-07-17');
INSERT INTO `stu` VALUES (4, 4, 'hh', 12, '1', '2019-07-17');
INSERT INTO `stu` VALUES (5, 5, 'mm', 13, '1', '2019-07-17');
INSERT INTO `stu` VALUES (6, 6, 'tt', 12, '1', '2019-07-17');
任务需求:统计每个年龄的有多少人
代码片段
public class MySQLTest {
public static class MyMapper extends Mapper {
@Override
protected void map(LongWritable key, Stu value, Context context) throws IOException, InterruptedException {
context.write(new IntWritable(value.getAge()), new IntWritable(1));
}
}
public static class MyReducer extends Reducer {
@Override
protected void reduce(IntWritable key, Iterable values, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable iw : values) {
count += iw.get();
}
Text text = new Text();
text.set(key.get() + "岁的有" + count + "人");
context.write(text, NullWritable.get());
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
BasicConfigurator.configure();
Configuration conf = new Configuration();
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://10.30.59.136:3306/world", "root", "123456");
Job job = Job.getInstance(conf, "mr");
//设置驱动类
job.setJarByClass(MySQLTest.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
//设置输入输出类型
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//设置数据输入输出配置
String[] fileds1 = {"id", "student_id", "name", "age", "sex", "birthday"};
DBInputFormat.setInput(job, Stu.class, "stu", null, "student_id", fileds1);
Path outpath = new Path("D:\MapReduce_test\from_Mysql_To_HDFS\outPut\");
FileSystem fileSystem = outpath.getFileSystem(conf);
if (fileSystem.exists(outpath)) {
fileSystem.delete(outpath, true);
}
FileOutputFormat.setOutputPath(job, outpath);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
/**
 * One row of the {@code stu} table.
 *
 * <p>Implements {@code DBWritable} so it can be populated from a JDBC {@link ResultSet}, and
 * {@code Writable} so it can be serialized to and from Hadoop data streams. The field order used
 * by every read/write method is: id, student_id, name, age, sex, birthday.
 */
public class Stu implements DBWritable, Writable {
    private int id;
    /** Value of the {@code student_id} column. */
    private int student;
    private String name;
    private int age;
    private String sex;
    private String birthday;

    /**
     * Serializes this record to a data stream.
     *
     * <p>String columns are nullable in the table, and {@link DataOutput#writeUTF(String)} throws
     * {@link NullPointerException} on {@code null}, so {@code null} strings are written as empty strings.
     *
     * @param out stream to write to
     * @throws IOException if the stream cannot be written
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        out.writeInt(this.student);
        out.writeUTF(nullToEmpty(this.name));
        out.writeInt(this.age);
        out.writeUTF(nullToEmpty(this.sex));
        out.writeUTF(nullToEmpty(this.birthday));
    }

    /**
     * Restores this record from a data stream, reading the fields in the order
     * {@link #write(DataOutput)} wrote them.
     *
     * @param in stream to read from
     * @throws IOException if the stream cannot be read
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.student = in.readInt();
        this.name = in.readUTF();
        this.age = in.readInt();
        this.sex = in.readUTF();
        this.birthday = in.readUTF();
    }

    /**
     * Intentionally a no-op: this job never writes records back to the database.
     * A class that must both read from and write to the database has to implement
     * this method as well as {@link #readFields(ResultSet)}.
     *
     * @param statement statement that would receive the column values
     * @throws SQLException never thrown by this implementation
     */
    @Override
    public void write(PreparedStatement statement) throws SQLException {
    }

    /**
     * Populates this record from the current row of a query result, by column position
     * (1 = id, 2 = student_id, 3 = name, 4 = age, 5 = sex, 6 = birthday).
     *
     * @param resultSet result set positioned on the row to read
     * @throws SQLException if a column cannot be read
     */
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.id = resultSet.getInt(1);
        this.student = resultSet.getInt(2);
        this.name = resultSet.getString(3);
        this.age = resultSet.getInt(4);
        this.sex = resultSet.getString(5);
        this.birthday = resultSet.getString(6);
    }

    /** Returns {@code value}, or the empty string when {@code value} is {@code null}. */
    private static String nullToEmpty(String value) {
        return value == null ? "" : value;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getStudent() {
        return student;
    }

    public void setStudent(int student) {
        this.student = student;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public String getBirthday() {
        return birthday;
    }

    public void setBirthday(String birthday) {
        this.birthday = birthday;
    }

    /** Alias of {@link #getStudent()} named after the {@code student_id} column. */
    public int getStudent_id() {
        return student;
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }

    public String getSex() {
        return sex;
    }
}
清洗结果
12岁的有4人
13岁的有2人



