需求:有一张学生信息表 student,统计每个年龄的人数,并将统计结果输出到 statistics 表
前置工作
两张表创建完成后,创建一个 Maven 工程并导入相关的依赖:
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.22</version>
    </dependency>
</dependencies>
序列化实体类
package com.gzhu.put;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
// 1. Implements both DBWritable (JDBC read/write) and Writable (Hadoop serialization).
// Acts as the map input value (filled from the student table) and as the reduce
// output key (written into the statistics table).
public class MyDBWritable implements DBWritable, Writable {
    // Input columns, populated by readFields(ResultSet) from the student table.
    private Long id;
    private String name;
    private Long age;
    // Output columns, bound by write(PreparedStatement) into the statistics table.
    private Long ag;
    private Long count;

    // 2. No-arg constructor: required so Hadoop can instantiate the class
    // reflectively before calling readFields().
    public MyDBWritable() {}

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Long getAge() {
        return age;
    }

    public void setAge(Long age) {
        this.age = age;
    }

    public Long getAg() {
        return ag;
    }

    public void setAg(Long ag) {
        this.ag = ag;
    }

    public Long getCount() {
        return count;
    }

    public void setCount(Long count) {
        this.count = count;
    }

    // 3. Hadoop serialization.
    // FIX: previously only id/name/age were written, so ag/count would be silently
    // lost if this object ever crossed the shuffle — a Writable must round-trip all
    // of its state. Null boxed fields are written as 0 / "" to avoid NPEs (so a
    // round-trip maps null to the default value; acceptable for this job, where
    // the Writable path is not exercised).
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(id == null ? 0L : id);
        dataOutput.writeUTF(name == null ? "" : name);
        dataOutput.writeLong(age == null ? 0L : age);
        dataOutput.writeLong(ag == null ? 0L : ag);
        dataOutput.writeLong(count == null ? 0L : count);
    }

    // 4. Hadoop deserialization — must mirror write() field-for-field, in order.
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readLong();
        this.name = dataInput.readUTF();
        this.age = dataInput.readLong();
        this.ag = dataInput.readLong();
        this.count = dataInput.readLong();
    }

    // 5. Read one row produced by "select id,name,age from student".
    // JDBC column indexes are 1-based.
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        id = resultSet.getLong(1);
        name = resultSet.getString(2);
        age = resultSet.getLong(3);
    }

    // 6. Bind the output values; DBOutputFormat generates
    // "insert into statistics (ag, count) values (?, ?)".
    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setLong(1, ag);
        preparedStatement.setLong(2, count);
    }
}
Mapper
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class JDBCMapper extends Mapper{ private LongWritable longWritable = new LongWritable(); private LongWritable outWritable = new LongWritable(1); @Override protected void map(LongWritable key, MyDBWritable value, Mapper .Context context) throws IOException, InterruptedException { Long age = value.getAge(); // 读取每一行的年龄作为K longWritable.set(age); context.write(longWritable, outWritable); } }
Reduce
public class JDBCReduce extends Reducer{ @Override protected void reduce(LongWritable key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { long count = 0L; for (LongWritable value : values) { // 比如K : 21 则此时values是3个1的集合,所以要循环累加 count += value.get(); } MyDBWritable myDBWritable = new MyDBWritable(); myDBWritable.setAg(key.get()); myDBWritable.setCount(count); context.write(myDBWritable,NullWritable.get()); } }
Driver
package com.gzhu.put;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

import java.io.IOException;

/**
 * Driver: reads rows from the MySQL "student" table, counts students per age,
 * and writes (ag, count) rows into the "statistics" table.
 *
 * FIX: the original imported the old-API classes
 * {@code org.apache.hadoop.mapred.lib.db.DBConfiguration} / {@code DBInputFormat},
 * whose {@code configureDB}/{@code setInput} methods take a {@code JobConf} —
 * the calls below pass a {@code Configuration}/{@code Job} and therefore did not
 * compile. All DB classes now come from the new {@code mapreduce.lib.db} API.
 */
public class JDBCDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Create the job.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Jar to ship to the cluster.
        job.setJarByClass(JDBCDriver.class);

        // 3. Read input from a database instead of HDFS.
        job.setInputFormatClass(DBInputFormat.class);

        // 4. JDBC connection settings.
        // NOTE(review): credentials are hard-coded for this demo; externalize them
        // (e.g. via configuration) before real use.
        String driverClass = "com.mysql.cj.jdbc.Driver";
        String url = "jdbc:mysql://localhost:3306/hadoop?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8";
        String userName = "root";
        String passWord = "zks123456";
        DBConfiguration.configureDB(job.getConfiguration(), driverClass, url,
                userName, passWord);

        // 5. Input query, plus a count query used to compute input splits.
        DBInputFormat.setInput(job, MyDBWritable.class,
                "select id,name,age from student",
                "select count(*) from student");

        // 6. Output table and its columns (this also sets DBOutputFormat as the
        // job's output format).
        DBOutputFormat.setOutput(job, "statistics", "ag", "count");

        // 7. Wire up mapper and reducer.
        job.setMapperClass(JDBCMapper.class);
        job.setReducerClass(JDBCReduce.class);

        // 8. Map output key/value types.
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 9. Final output key/value types.
        job.setOutputKeyClass(MyDBWritable.class);
        job.setOutputValueClass(NullWritable.class);

        // 10. Submit and block until completion.
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}



