- 1. MapReduce entity class
- 2. MapReduce DBWriterMapper class (database write)
- 3. MapReduce DBWriterReduce class (database write)
- 4. MapReduce DBWriterDriver class (driver for the database write)
- 5. MapReduce database table
1. MapReduce entity class
First, create the entity class.
The entity class must implement both the Writable and DBWritable interfaces. Override Writable's write (serialize) and readFields (deserialize) methods, and likewise DBWritable's write and readFields. The overridden methods of the two interfaces share the same names but take different parameters.
Here is the GoodsBean entity class:
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class GoodsBean implements Writable, DBWritable {

    private long goodsId;       // goods ID
    private String goodsSn;     // goods serial number
    private String goodsName;   // goods name
    private double marketPrice; // market price
    private double shopPrice;   // shop price
    private long saleNum;       // total sales volume

    public long getGoodsId() {
        return goodsId;
    }

    public void setGoodsId(long goodsId) {
        this.goodsId = goodsId;
    }

    public String getGoodsSn() {
        return goodsSn;
    }

    public void setGoodsSn(String goodsSn) {
        this.goodsSn = goodsSn;
    }

    public String getGoodsName() {
        return goodsName;
    }

    public void setGoodsName(String goodsName) {
        this.goodsName = goodsName;
    }

    public double getMarketPrice() {
        return marketPrice;
    }

    public void setMarketPrice(double marketPrice) {
        this.marketPrice = marketPrice;
    }

    public double getShopPrice() {
        return shopPrice;
    }

    public void setShopPrice(double shopPrice) {
        this.shopPrice = shopPrice;
    }

    public long getSaleNum() {
        return saleNum;
    }

    public void setSaleNum(long saleNum) {
        this.saleNum = saleNum;
    }

    public GoodsBean() {
    }

    public GoodsBean(long goodsId, String goodsSn, String goodsName,
                     double marketPrice, double shopPrice, long saleNum) {
        this.goodsId = goodsId;
        this.goodsSn = goodsSn;
        this.goodsName = goodsName;
        this.marketPrice = marketPrice;
        this.shopPrice = shopPrice;
        this.saleNum = saleNum;
    }

    public void set(long goodsId, String goodsSn, String goodsName,
                    double marketPrice, double shopPrice, long saleNum) {
        this.goodsId = goodsId;
        this.goodsSn = goodsSn;
        this.goodsName = goodsName;
        this.marketPrice = marketPrice;
        this.shopPrice = shopPrice;
        this.saleNum = saleNum;
    }

    // Writable: serialize the fields in a fixed order
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(goodsId);
        out.writeUTF(goodsSn);
        out.writeUTF(goodsName);
        out.writeDouble(marketPrice);
        out.writeDouble(shopPrice);
        out.writeLong(saleNum);
    }

    // Writable: deserialize in exactly the same order as write
    @Override
    public void readFields(DataInput in) throws IOException {
        this.goodsId = in.readLong();
        this.goodsSn = in.readUTF();
        this.goodsName = in.readUTF();
        this.marketPrice = in.readDouble();
        this.shopPrice = in.readDouble();
        this.saleNum = in.readLong();
    }

    // DBWritable: bind the fields to the INSERT statement's placeholders
    @Override
    public void write(PreparedStatement ps) throws SQLException {
        ps.setLong(1, goodsId);
        ps.setString(2, goodsSn);
        ps.setString(3, goodsName);
        ps.setDouble(4, marketPrice);
        ps.setDouble(5, shopPrice);
        ps.setLong(6, saleNum);
    }

    // DBWritable: populate the fields from a query ResultSet
    @Override
    public void readFields(ResultSet rs) throws SQLException {
        this.goodsId = rs.getLong(1);
        this.goodsSn = rs.getString(2);
        this.goodsName = rs.getString(3);
        this.marketPrice = rs.getDouble(4);
        this.shopPrice = rs.getDouble(5);
        this.saleNum = rs.getLong(6);
    }

    @Override
    public String toString() {
        return goodsId + "\t" + goodsSn + "\t" + goodsName + "\t"
                + marketPrice + "\t" + shopPrice + "\t" + saleNum;
    }
}
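The Writable pair write/readFields is nothing more than symmetric calls on DataOutput/DataInput in a fixed field order. A minimal plain-Java sketch of the same round trip, with no Hadoop dependency (the class and method names here are made up for illustration):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WritableRoundTrip {

    // Mirrors GoodsBean.write(DataOutput): the field order is the wire format.
    static byte[] serialize(long id, String sn, String name,
                            double market, double shop, long sales) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeLong(id);
        out.writeUTF(sn);
        out.writeUTF(name);
        out.writeDouble(market);
        out.writeDouble(shop);
        out.writeLong(sales);
        return buf.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = serialize(1L, "SN001", "apple", 5.5, 4.5, 100L);
        // Mirrors GoodsBean.readFields(DataInput): read back in the same order.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        System.out.println(in.readLong() + " " + in.readUTF() + " " + in.readUTF());
        // prints: 1 SN001 apple
    }
}
```

If write and readFields disagree on field order or types, deserialization silently produces garbage, which is why both methods must be kept in lockstep.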
2. MapReduce DBWriterMapper class (database write)
This class reads the source file line by line; the parsed records are later written to the database.
It extends Mapper with the following type parameters:
KeyIn: the byte offset of the line, LongWritable
ValueIn: the line of text read from the file, Text
KeyOut: NullWritable
ValueOut: GoodsBean, since the file contents are what will be written to the database
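The Mapper below indexes fields 1 through 6 and requires split.length > 6, which fits input lines that carry an extra leading column in field 0 (for example, the key written by the previous job's TextOutputFormat). A quick stand-alone sketch of that assumed line format (the sample line is made up):

```java
public class LineFormatDemo {
    public static void main(String[] args) {
        // Hypothetical input line: field 0 is the previous job's output key;
        // fields 1..6 are goodsId, goodsSn, goodsName, marketPrice, shopPrice, saleNum.
        String line = "0\t100001\tSN1001\tapple\t5.5\t4.5\t120";
        String[] split = line.split("\t");
        System.out.println(split.length);             // 7
        System.out.println(Long.parseLong(split[1])); // 100001
    }
}
```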
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class readDBMapper extends Mapper<LongWritable, Text, NullWritable, GoodsBean> {

    NullWritable outKey = NullWritable.get();
    GoodsBean outValue = new GoodsBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Two counters for statistics; optional, can be omitted if not needed
        Counter sc = context.getCounter("mr_to_sql", "SUCCESS");
        Counter fa = context.getCounter("mr_to_sql", "FAILED");
        String[] split = value.toString().split("\t");
        // Check whether any input field is missing; too few fields means an invalid record
        if (split.length > 6) {
            // Valid record: extract the fields
            sc.increment(1);
            outValue.set(Long.parseLong(split[1]), split[2], split[3],
                    Double.parseDouble(split[4]), Double.parseDouble(split[5]),
                    Long.parseLong(split[6]));
            context.write(outKey, outValue);
        } else {
            // Invalid record
            fa.increment(1);
        }
    }
}

3. MapReduce DBWriterReduce class (database write)
It extends Reducer with the following type parameters:
KeyIn: the map output key, NullWritable
ValueIn: the map output value, GoodsBean
KeyOut: GoodsBean
ValueOut: NullWritable
Why is the output key the entity class and the output value empty? The documentation of DBOutputFormat states that it writes only the reduce output key to the database, so the entity class must go in the key and the value becomes NullWritable.
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class readDBReduce extends Reducer<NullWritable, GoodsBean, GoodsBean, NullWritable> {

    @Override
    protected void reduce(NullWritable key, Iterable<GoodsBean> values, Context context)
            throws IOException, InterruptedException {
        // Swap key and value: DBOutputFormat writes only the key to the database
        for (GoodsBean gb : values) {
            context.write(gb, NullWritable.get());
        }
    }
}

4. MapReduce DBWriterDriver class (driver for the database write)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class readDBDriver {
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf, "com.mysql.cj.jdbc.Driver",
                "jdbc:mysql://localhost/shop?serverTimezone=GMT%2B8&Unicode=true&characterEncoding=utf-8&useSSL=false",
                "root", "123456");
        // Create the job
        Job job = Job.getInstance(conf, readDBDriver.class.getSimpleName());
        // Driver class
        job.setJarByClass(readDBDriver.class);
        // Mapper and Reducer classes
        job.setMapperClass(readDBMapper.class);
        job.setReducerClass(readDBReduce.class);
        // Mapper output types
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(GoodsBean.class);
        // Reducer output types
        job.setOutputKeyClass(GoodsBean.class);
        job.setOutputValueClass(NullWritable.class);
        // Input path (the files here were produced by a previous MapReduce job;
        // pointing at the containing directory is enough, no specific file name needed)
        FileInputFormat.setInputPaths(job, new Path("E:\\mysqlOut"));
        // Output format
        job.setOutputFormatClass(DBOutputFormat.class);
        // Target table and fields for this job's database writes
        DBOutputFormat.setOutput(job, "goods_mr",
                "goodsId", "goodsSn", "goodsName", "marketPrice", "shopPrice", "saleNum");
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
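For reference, DBOutputFormat builds a parameterized INSERT statement from the table and field names passed to setOutput; for the call above the generated SQL is roughly:

```sql
INSERT INTO goods_mr (goodsId, goodsSn, goodsName, marketPrice, shopPrice, saleNum)
VALUES (?, ?, ?, ?, ?, ?);
```

The six ? placeholders are then bound, in order, by GoodsBean.write(PreparedStatement) for each record the reducer emits.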
5. MapReduce database table
The table structure is available from the link below.
The input file can be generated with the code from the previous article.
Next article: reading from the database (link).
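In case the linked table structure is unavailable, here is a minimal MySQL sketch matching the six GoodsBean fields written by the job; the column types and the primary key are assumptions, not taken from the original post:

```sql
CREATE TABLE goods_mr (
    goodsId     BIGINT        NOT NULL,
    goodsSn     VARCHAR(64),
    goodsName   VARCHAR(255),
    marketPrice DECIMAL(10,2),
    shopPrice   DECIMAL(10,2),
    saleNum     BIGINT,
    PRIMARY KEY (goodsId)
);
```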



