栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

MapReduce 读取MySQL数据到文件

MapReduce 读取MySQL数据到文件

MapReduce 对mysql进行操作 了解实现方式 1.创建方法类实现DBWritable,Writable接口

DBWritable接口中实现两个方法

//DBWritable主要是实现对数据库读写操作 所以输出格式是PreparedStatement
//PreparedStatement接口继承并扩展了Statement接口,用来执行动态的SQL语句,即包含参数的
public void write(PreparedStatement stmt ) 	thorws SQLException{
    
}
//DBWritable主要是实现对数据库读写操作 所以输入格式是ResultSet 
//ResultSet接口类似于一张数据表,用来暂时存放从数据库查询操作所获得的结果集
public void readFields(ResultSet result) thorws SQLException{
    
}

Writable接口实现两个方法

//Writable接口是对数据流进行操作的,所以输出是DataOutput类对象 
public void write(DataOutput out) throws IOException {
    
}
//Writable接口是对数据流进行操作的,所以输入是DataInput类对象
public void readFields(DataInput in) throws IOException {
    
}

建立关系数据库连接

//提供数据库配置和创建连接的接口 
//DBConfiguration类中提供了一个静态方法创建数据库连接:
//configureDB((Job job,String driverClass,String dbUrl,String userName,String Password))
//job为当前准备执行的作业,driverClasss为数据库厂商提供的访问其数据库的驱动程序,dbUrl为运行数据库的主机的地址,userName和password分别为数据库提供访问地用户名和相应的访问密码。
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://10.30.59.136:3306/world", "root", "123456");

相应的从关系数据库查询和读取数据的接口

  • DBInputFormat:提供从数据库读取数据的格式。
  • DBRecordReader:提供读取数据记录的接口。
 DBInputFormat.setInput(job, Stu.class, "stu", null, "student_id", fileds1);

相应的向关系数据库直接输出结果的编程接口

  • DBOutputFormat:提供向数据库输出数据的格式。
  • DBRecordWrite:提供数据库写入数据记录的接口。

数据库连接完成后,即可完成从MapReduce程序向关系数据库写入数据的操作。为了告知数据库将写入哪个表中的哪些字段,DBOutputFormat中提供了一个静态方法来指定需要写入的数据表和字段:

		setOutput(Job job,String tableName,String ... fieldName)
    //tableName指定即将写入的数据表,后续参数将指定哪些字段数据将写入该表。
2.创建MySQL数据表

1.创建数据库

create database world;

2.创建数据表并导入数据

CREATE TABLE stu  (
  `id` int(11) NOT NULL,
  `student_id` int(11) NULL DEFAULT NULL,
  `name` varchar(255) ,
  `age` int(11) ,
  `sex` varchar(255) ,
  `birthday` varchar(255) ,
) 
INSERT INTO `stu` VALUES (1, 1, 'zs', 12, '1', '2019-07-17');
INSERT INTO `stu` VALUES (2, 2, 'goudan', 13, '1', '2019-07-17');
INSERT INTO `stu` VALUES (3, 3, 'gg', 12, '1', '2019-07-17');
INSERT INTO `stu` VALUES (4, 4, 'hh', 12, '1', '2019-07-17');
INSERT INTO `stu` VALUES (5, 5, 'mm', 13, '1', '2019-07-17');
INSERT INTO `stu` VALUES (6, 6, 'tt', 12, '1', '2019-07-17');

任务需求:统计每个年龄的有多少人 代码片段
public class MySQLTest {
    public static class MyMapper extends Mapper {
        @Override
        protected void map(LongWritable key, Stu value, Context context) throws IOException, InterruptedException {
            context.write(new IntWritable(value.getAge()), new IntWritable(1));
        }

    }

    public static class MyReducer extends Reducer {
        @Override
        protected void reduce(IntWritable key, Iterable values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable iw : values) {
                count += iw.get();
            }
            Text text = new Text();
            text.set(key.get() + "岁的有" + count + "人");
            context.write(text, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        BasicConfigurator.configure();
        Configuration conf = new Configuration();

        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://10.30.59.136:3306/world", "root", "123456");
        Job job = Job.getInstance(conf, "mr");
        //设置驱动类
        job.setJarByClass(MySQLTest.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        //设置输入输出类型
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        //设置数据输入输出配置
        String[] fileds1 = {"id", "student_id", "name", "age", "sex", "birthday"};
        DBInputFormat.setInput(job, Stu.class, "stu", null, "student_id", fileds1);
        Path outpath = new Path("D:\MapReduce_test\from_Mysql_To_HDFS\outPut\");
        FileSystem fileSystem = outpath.getFileSystem(conf);
        if (fileSystem.exists(outpath)) {
            fileSystem.delete(outpath, true);
        }
        FileOutputFormat.setOutputPath(job, outpath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

public class Stu implements DBWritable, Writable {
    private int id;
    private int student;
    private String name;
    private int age;
    private String sex;
    private String birthday;
    //Writable接口是对数据流进行操作的,所以输出是DataOutput类对象
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        out.writeInt(this.student);
        out.writeUTF(this.name);
        out.writeInt(this.age);
        out.writeUTF(this.sex);
        out.writeUTF(this.birthday);
    }
    //Writable接口是对数据流进行操作的,所以输入是DataInput类对象
    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.student = in.readInt();
        this.name = in.readUTF();
        this.age = in.readInt();
        this.sex = in.readUTF();
        this.birthday = in.readUTF();
    }
    
    @Override
    public void write(PreparedStatement statement) throws SQLException {
		//因为我们不需要写入到操作,里面填空就好
       //如果需要一个类即能从数据库读又能写入数据库,则需要同时实现write和readFields两个方法。
    }
    
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.id = resultSet.getInt(1);
        this.student = resultSet.getInt(2);
        this.name = resultSet.getString(3);
        this.age = resultSet.getInt(4);
        this.sex = resultSet.getString(5);
        this.birthday = resultSet.getString(6);
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getStudent() {
        return student;
    }

    public void setStudent(int student) {
        this.student = student;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public String getBirthday() {
        return birthday;
    }

    public void setBirthday(String birthday) {
        this.birthday = birthday;
    }

    public int getStudent_id() {
        return student;
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }

    public String getSex() {
        return sex;
    }
}

清洗结果

12岁的有4人
13岁的有2人
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/304583.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号