
MapReduce: Writing to a Database

  • 1. The MapReduce entity class
  • 2. The DBWriterMapper class (writing to the database)
    • The DBWriterReduce class (writing to the database)
    • The DBWriterDriver class (driver for the database write job)
    • The database table


Note: the article body follows; the examples below are provided for reference.

1. The MapReduce entity class

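To write to a database, start by creating an entity class.
The entity class must implement two interfaces, Writable and DBWritable. From Writable it overrides write (serialization) and readFields (deserialization). From DBWritable it overrides write (binds the fields to a PreparedStatement) and readFields (reads the fields from a ResultSet).
The methods of the two interfaces share the same names but take different parameters.
The GoodsBean entity class is shown below: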

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class GoodsBean implements Writable, DBWritable {


    private long goodsId;        // goods ID
    private String goodsSn;      // goods serial number
    private String goodsName;    // goods name
    private double marketPrice;  // market price
    private double shopPrice;    // shop price
    private long saleNum;        // total units sold


    public long getGoodsId() {
        return goodsId;
    }

    public void setGoodsId(long goodsId) {
        this.goodsId = goodsId;
    }

    public String getGoodsSn() {
        return goodsSn;
    }

    public void setGoodsSn(String goodsSn) {
        this.goodsSn = goodsSn;
    }

    public String getGoodsName() {
        return goodsName;
    }

    public void setGoodsName(String goodsName) {
        this.goodsName = goodsName;
    }

    public double getMarketPrice() {
        return marketPrice;
    }

    public void setMarketPrice(double marketPrice) {
        this.marketPrice = marketPrice;
    }

    public double getShopPrice() {
        return shopPrice;
    }

    public void setShopPrice(double shopPrice) {
        this.shopPrice = shopPrice;
    }

    public long getSaleNum() {
        return saleNum;
    }

    public void setSaleNum(long saleNum) {
        this.saleNum = saleNum;
    }

    
    public GoodsBean() {
    }

    
    public GoodsBean(long goodsId, String goodsSn, String goodsName, double marketPrice, double shopPrice, long saleNum) {
        this.goodsId = goodsId;
        this.goodsSn = goodsSn;
        this.goodsName = goodsName;
        this.marketPrice = marketPrice;
        this.shopPrice = shopPrice;
        this.saleNum = saleNum;
    }

    
    public void set(long goodsId, String goodsSn, String goodsName, double marketPrice, double shopPrice, long saleNum) {
        this.goodsId = goodsId;
        this.goodsSn = goodsSn;
        this.goodsName = goodsName;
        this.marketPrice = marketPrice;
        this.shopPrice = shopPrice;
        this.saleNum = saleNum;
    }

    // Getters and setters are defined above.

    // Writable: serialize the fields to the binary stream in a fixed order
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(goodsId);
        out.writeUTF(goodsSn);
        out.writeUTF(goodsName);
        out.writeDouble(marketPrice);
        out.writeDouble(shopPrice);
        out.writeLong(saleNum);
    }

    
    // Writable: deserialize the fields in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        this.goodsId = in.readLong();
        this.goodsSn = in.readUTF();
        this.goodsName = in.readUTF();
        this.marketPrice = in.readDouble();
        this.shopPrice = in.readDouble();
        this.saleNum = in.readLong();
    }

    
    // DBWritable: bind the fields to the INSERT statement's placeholders
    @Override
    public void write(PreparedStatement ps) throws SQLException {
        ps.setLong(1, goodsId);
        ps.setString(2, goodsSn);
        ps.setString(3, goodsName);
        ps.setDouble(4, marketPrice);
        ps.setDouble(5, shopPrice);
        ps.setLong(6, saleNum);
    }

    
    // DBWritable: populate the fields from one row of a query result
    @Override
    public void readFields(ResultSet rs) throws SQLException {
        this.goodsId = rs.getLong(1);
        this.goodsSn = rs.getString(2);
        this.goodsName = rs.getString(3);
        this.marketPrice = rs.getDouble(4);
        this.shopPrice = rs.getDouble(5);
        this.saleNum = rs.getLong(6);
    }

    @Override
    public String toString() {
        return goodsId + "\t" + goodsSn + "\t" + goodsName + "\t" + marketPrice + "\t" + shopPrice + "\t" + saleNum;
    }

}
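
The Writable half of the entity class can be sanity-checked without a cluster, because write and readFields only depend on java.io.DataOutput and java.io.DataInput. The sketch below is a hypothetical, cut-down stand-in for GoodsBean (three fields, no Hadoop imports; the names GoodsRecord and RoundTripDemo are not from the article) that round-trips an object through a byte buffer. The point it illustrates is that readFields must consume the fields in exactly the order write produced them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class RoundTripDemo {

    // Hypothetical stand-in for GoodsBean, reduced to three fields.
    static final class GoodsRecord {
        long goodsId;
        String goodsName;
        double shopPrice;

        // Same signature shape as Writable.write(DataOutput)
        void write(DataOutput out) throws IOException {
            out.writeLong(goodsId);
            out.writeUTF(goodsName);
            out.writeDouble(shopPrice);
        }

        // Same signature shape as Writable.readFields(DataInput);
        // the read order mirrors the write order.
        void readFields(DataInput in) throws IOException {
            goodsId = in.readLong();
            goodsName = in.readUTF();
            shopPrice = in.readDouble();
        }
    }

    // Serializes the record into memory and reads it back into a fresh object.
    static GoodsRecord roundTrip(GoodsRecord original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                original.write(out);
            }
            GoodsRecord copy = new GoodsRecord();
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        GoodsRecord g = new GoodsRecord();
        g.goodsId = 1L;          // made-up sample values
        g.goodsName = "sample";
        g.shopPrice = 9.5;
        GoodsRecord copy = roundTrip(g);
        System.out.println(copy.goodsId + "\t" + copy.goodsName + "\t" + copy.shopPrice);
    }
}
```

Note that writeUTF throws a NullPointerException for a null string, so in a setup like this every String field needs a non-null value before the object is serialized.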

2. The DBWriterMapper class (writing to the database)

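This class reads the data from the input file and emits it so that it can be written to the database. (In the code below the class is named readDBMapper.)

It must extend Mapper and override the map method.

KeyIn: with the default TextInputFormat, the byte offset of the line within the file, LongWritable
ValueIn: the line of text read from the file, Text
KeyOut: the output key, NullWritable
ValueOut: the output value; because the file contents are to be written to the database, this is GoodsBean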

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

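public class readDBMapper extends Mapper<LongWritable, Text, NullWritable, GoodsBean> {

    // Reused across map() calls so that no new object is allocated per record
    private final NullWritable outKey = NullWritable.get();
    private final GoodsBean outValue = new GoodsBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // Two optional counters that record how many lines were accepted or rejected
        Counter sc = context.getCounter("mr_to_sql", "SUCCESS");
        Counter fa = context.getCounter("mr_to_sql", "FAILED");

        // Input lines are tab-separated
        String[] split = value.toString().split("\t");

        // A line with missing fields is treated as invalid.
        // Indexes 1 to 6 are read (index 0 is skipped), so at least 7 fields are required.
        if (split.length > 6) {
            // Valid line: extract the fields
            sc.increment(1);
            outValue.set(Long.parseLong(split[1]), split[2], split[3], Double.parseDouble(split[4]), Double.parseDouble(split[5]), Long.parseLong(split[6]));
            context.write(outKey, outValue);
        } else {
            // Invalid line
            fa.increment(1);
        }
    }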
}
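
The validation step in map can be tried out on its own, without Hadoop. The following is a minimal sketch under stated assumptions: GoodsLineParser and its nested Goods holder are hypothetical names, the sample lines are made up, and, unlike the mapper above, the sketch also treats a non-numeric field as an invalid line instead of letting NumberFormatException propagate.

```java
import java.util.Optional;

final class GoodsLineParser {

    // Hypothetical value holder standing in for GoodsBean.
    static final class Goods {
        final long goodsId;
        final String goodsSn;
        final String goodsName;
        final double marketPrice;
        final double shopPrice;
        final long saleNum;

        Goods(long goodsId, String goodsSn, String goodsName,
              double marketPrice, double shopPrice, long saleNum) {
            this.goodsId = goodsId;
            this.goodsSn = goodsSn;
            this.goodsName = goodsName;
            this.marketPrice = marketPrice;
            this.shopPrice = shopPrice;
            this.saleNum = saleNum;
        }
    }

    // Returns the parsed record, or empty if the line is too short or a number is malformed.
    static Optional<Goods> parse(String line) {
        String[] f = line.split("\t");
        if (f.length < 7) {
            return Optional.empty();   // fields at indexes 1..6 are required
        }
        try {
            return Optional.of(new Goods(
                    Long.parseLong(f[1]), f[2], f[3],
                    Double.parseDouble(f[4]), Double.parseDouble(f[5]),
                    Long.parseLong(f[6])));
        } catch (NumberFormatException e) {
            return Optional.empty();   // assumption: bad numbers count as invalid lines
        }
    }

    public static void main(String[] args) {
        String[] lines = {               // made-up sample input
            "0\t1\tSN001\tpen\t3.5\t3.0\t10",
            "1\t2\tSN002\tink",
            "2\tabc\tSN003\tcup\t8.0\t7.5\t4"
        };
        int success = 0;
        int failed = 0;
        for (String line : lines) {
            if (parse(line).isPresent()) {
                success++;
            } else {
                failed++;
            }
        }
        System.out.println("SUCCESS=" + success + " FAILED=" + failed); // prints SUCCESS=1 FAILED=2
    }
}
```

In a real mapper the two branches would increment the SUCCESS and FAILED counters rather than local variables.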

The DBWriterReduce class (writing to the database)

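KeyIn: the key emitted by the map phase, NullWritable
ValueIn: the value emitted by the map phase, GoodsBean
KeyOut: GoodsBean
ValueOut: NullWritable

Why is the output key the entity class while the output value is empty? The reason is as follows.

The class comment in the DBOutputFormat source states that only the key is written to the database. The entity class therefore has to be emitted as the key, and the value is left as NullWritable.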

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

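public class readDBReduce extends Reducer<NullWritable, GoodsBean, GoodsBean, NullWritable> {

    @Override
    protected void reduce(NullWritable key, Iterable<GoodsBean> values, Context context) throws IOException, InterruptedException {
        for (GoodsBean gb : values) {
            // DBOutputFormat writes only the key, so the bean is emitted as the key
            context.write(gb, NullWritable.get());
        }
    }
}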

The DBWriterDriver class (driver for the database write job)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class readDBDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration();
        // JDBC driver class, connection URL, user name and password
        DBConfiguration.configureDB(conf, "com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost/shop?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf-8&useSSL=false", "root", "123456");

        // Create the job
        Job job = Job.getInstance(conf, readDBDriver.class.getSimpleName());

        // Driver class
        job.setJarByClass(readDBDriver.class);
        // Mapper and reducer classes
        job.setMapperClass(readDBMapper.class);
        job.setReducerClass(readDBReduce.class);
        // Mapper output types
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(GoodsBean.class);
        // Reducer output types
        job.setOutputKeyClass(GoodsBean.class);
        job.setOutputValueClass(NullWritable.class);

        // Input path. The input here is a file generated by an earlier MapReduce job,
        // so pointing at the containing directory is enough; no file name is needed.
        FileInputFormat.setInputPaths(job, new Path("E:\\mysqlOut"));
        // Output format class
        job.setOutputFormatClass(DBOutputFormat.class);
        // Target table and columns; the column order must match GoodsBean.write(PreparedStatement)
        DBOutputFormat.setOutput(job, "goods_mr", "goodsId", "goodsSn", "goodsName", "marketPrice", "shopPrice", "saleNum");

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);

    }
}
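
The table name and column names passed to DBOutputFormat.setOutput end up in a parameterized INSERT statement, and each `?` placeholder is then filled by the matching `ps.setXxx(index, ...)` call in GoodsBean.write(PreparedStatement). The sketch below shows roughly what such a statement looks like for this job. It is an illustration only, not the Hadoop source: InsertStatementSketch and buildInsert are hypothetical names, and the exact text Hadoop generates may differ in details.

```java
import java.util.Collections;

final class InsertStatementSketch {

    // Builds "INSERT INTO table (c1,c2,...) VALUES (?,?,...)" with one placeholder per column.
    static String buildInsert(String table, String... fieldNames) {
        if (fieldNames.length == 0) {
            throw new IllegalArgumentException("at least one field name is required");
        }
        String columns = String.join(",", fieldNames);
        String placeholders = String.join(",", Collections.nCopies(fieldNames.length, "?"));
        return "INSERT INTO " + table + " (" + columns + ") VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        // Same table and column names as the setOutput call in the driver
        System.out.println(buildInsert("goods_mr",
                "goodsId", "goodsSn", "goodsName", "marketPrice", "shopPrice", "saleNum"));
        // prints: INSERT INTO goods_mr (goodsId,goodsSn,goodsName,marketPrice,shopPrice,saleNum) VALUES (?,?,?,?,?,?)
    }
}
```

Because placeholders are bound by position, reordering the column names in setOutput without reordering the setXxx calls in the entity class would write values into the wrong columns.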

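The database table

The database table structure can be obtained from the link below.
The input file can be the one generated by the code in the previous article.

Next article: link to reading from a database.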

When reposting, please credit the source: www.mshxw.com
Article URL: https://www.mshxw.com/it/342343.html