一.依赖
org.apache.hadoop:hadoop-mapreduce-client-core:2.7.2、org.apache.hadoop:hadoop-common:2.7.2、org.apache.hadoop:hadoop-mapreduce-client-common:2.7.2、org.apache.hadoop:hadoop-hdfs:2.7.2、log4j:log4j:1.2.17、mysql:mysql-connector-java:8.0.25
二.对象
在MapReduce中调用MySQL的数据需要实现已经封装好的接口DBWritable,篇幅问题就不把大多数编译器可以自动生成的代码写出来了.
需要注意的是:MySQL中的数据,从excel,csv,或者其他地方导入MySQL时,是以字符串的形式存入的否则会报错(可以参考:Python(pymysql)中Excel表存入MySQL_no_donodie的博客-CSDN博客),所以在定义字段时无论类型全是String,全是String,全是String,重要的事情说三遍.
package com.Area;

import org.apache.hadoop.io.Writable;
// NOTE(fix): must be the new-API interface (mapreduce.lib.db), because the driver
// calls org.apache.hadoop.mapreduce.lib.db.DBInputFormat.setInput, whose input
// class is required to implement this DBWritable — not the old mapred one.
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Bean mapped to one row of the MySQL source table.
 *
 * <p>All five columns are declared as {@code String} because the data was
 * imported into MySQL as text (see the note in the article above); keeping
 * every field a String avoids type-conversion errors on read.
 *
 * <p>Field order is significant: {@link #write(DataOutput)} /
 * {@link #readFields(DataInput)} and {@link #write(PreparedStatement)} /
 * {@link #readFields(ResultSet)} must all use the same order
 * (id, location, money, area, state), matching the {@code fields} array
 * configured in the driver.
 */
public class Area implements Writable, DBWritable {
    private String id;
    private String location;
    private String money;
    private String area;
    private String state;

    /** No-arg constructor — required; Hadoop instantiates this class reflectively. */
    public Area() {
    }

    /** All-args constructor, column order matching the table. */
    public Area(String id, String location, String money, String area, String state) {
        this.id = id;
        this.location = location;
        this.money = money;
        this.area = area;
        this.state = state;
    }

    // Getters and setters. getLocation()/getArea() are called by areaMapper.
    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getLocation() {
        return location;
    }

    public void setLocation(String location) {
        this.location = location;
    }

    public String getMoney() {
        return money;
    }

    public void setMoney(String money) {
        this.money = money;
    }

    public String getArea() {
        return area;
    }

    public void setArea(String area) {
        this.area = area;
    }

    public String getState() {
        return state;
    }

    public void setState(String state) {
        this.state = state;
    }

    @Override
    public String toString() {
        return "Area{id='" + id + "', location='" + location + "', money='" + money
                + "', area='" + area + "', state='" + state + "'}";
    }

    /** Hadoop serialization: write fields in the canonical column order. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.id);
        dataOutput.writeUTF(this.location);
        dataOutput.writeUTF(this.money);
        dataOutput.writeUTF(this.area);
        dataOutput.writeUTF(this.state);
    }

    /** Hadoop deserialization: read order mirrors {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readUTF();
        this.location = dataInput.readUTF();
        this.money = dataInput.readUTF();
        this.area = dataInput.readUTF();
        this.state = dataInput.readUTF();
    }

    /** Bind fields to a parameterized INSERT/UPDATE; 1-based JDBC indexes. */
    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, this.id);
        preparedStatement.setString(2, this.location);
        preparedStatement.setString(3, this.money);
        preparedStatement.setString(4, this.area);
        preparedStatement.setString(5, this.state);
    }

    /** Populate fields from a query result row; 1-based JDBC indexes. */
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.id = resultSet.getString(1);
        this.location = resultSet.getString(2);
        this.money = resultSet.getString(3);
        this.area = resultSet.getString(4);
        this.state = resultSet.getString(5);
    }
}
三.Mapper
需要注意的是,读取MySQL中的数据不需要像csv、excel那样先切片:每条记录已经封装成Area对象,直接用get方法取出字段,组装成键值对写出即可.
package com.Area;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Emits one (location, area) pair per database row.
 *
 * <p>Input key/value come from DBInputFormat: the key is the record offset
 * ({@link LongWritable}) and the value is the {@link Area} bean populated by
 * {@code Area.readFields(ResultSet)} — no text splitting is needed, fields are
 * read via getters.
 *
 * <p>NOTE(fix): the original listing used the raw {@code Mapper} type (the
 * generic parameters were stripped during extraction); they are restored here
 * as {@code Mapper<LongWritable, Area, Text, Text>} to match the driver's
 * setMapOutputKeyClass/setMapOutputValueClass of Text/Text.
 */
public class areaMapper extends Mapper<LongWritable, Area, Text, Text> {

    @Override
    protected void map(LongWritable key, Area value, Context context)
            throws IOException, InterruptedException {
        // Key: location; value: area — both taken straight from the bean.
        Text outKey = new Text();
        outKey.set(value.getLocation());
        Text outValue = new Text();
        outValue.set(value.getArea());
        context.write(outKey, outValue);
    }
}
四. Driver
Partitioner(分区)与Reduce是不受影响的,正常写就行,这里就不再赘述,直接上Driver.
package com.Area;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// Only needed when reading csv or other text input instead of a database.
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * Job driver: reads rows from a MySQL table via DBInputFormat and writes the
 * reduced result to the filesystem path given as args[0].
 *
 * <p>Replace the placeholder strings below (database name, password, table
 * name) with real values before running.
 */
public class areaDriver {
    public static String driverClass = "com.mysql.cj.jdbc.Driver";
    public static String dbUrl = "jdbc:mysql://localhost:3306/数据库的名称";
    public static String userName = "root";
    public static String passWord = "MySQL的密码";
    public static String tableName = "表名";
    // Column names, in the same order Area reads/writes them.
    public static String[] fields = {"id", "location", "money", "area", "state"};

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // Must configure the JDBC connection BEFORE Job.getInstance copies conf.
        DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passWord);
        Job job = Job.getInstance(conf, areaDriver.class.getSimpleName());
        job.setMapperClass(areaMapper.class);
        job.setReducerClass(areaReduce.class);
        job.setJarByClass(areaDriver.class);
        job.setPartitionerClass(areaPart.class);
        // One reduce task per partition.
        job.setNumReduceTasks(4);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // null query/conditions: read all rows of tableName, ordered by "id".
        DBInputFormat.setInput(job, Area.class, tableName, null, "id", fields);
        // Output path; in IDEA it can be set as a program argument (top right).
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        boolean b = job.waitForCompletion(true);
        // FIX: waitForCompletion returns true on success, and a successful
        // process must exit with status 0 — the original had this inverted.
        System.exit(b ? 0 : 1);
    }
}
设置输出路径:



