
HBase Day 04: Integration with MapReduce and Hive

I: Integration with MapReduce

1: Official examples

        1.1: Reading HBase data

On each of the three machines hadoop102, hadoop103, and hadoop104, add the HBase jars to the Hadoop classpath:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*
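
If the classpath change needs to survive new shell sessions, the same export can be appended to Hadoop's environment file on each node (assuming the standard /opt/module/hadoop-3.1.3 layout used below):

$ echo 'export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*' >> /opt/module/hadoop-3.1.3/etc/hadoop/hadoop-env.sh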

Then, from the hbase directory, run the following command; it uses Hadoop to count the rows of the HBase table named student:

/opt/module/hadoop-3.1.3/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter student
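
If the job succeeds, its counter output should include a ROWS counter holding the row count of student; the group name below follows the HBase 1.x RowCounter source and may vary by version:

org.apache.hadoop.hbase.mapreduce.RowCounter$RowCounterMapper$Counters
        ROWS=<number of rows in student>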

        1.2: Example 2: Importing local data into HBase with MapReduce

Create a file in TSV format, fruit.tsv (tab-separated):

1001    Apple    Red
1002    Pear    Yellow
1003    Pineapple    Yellow

Create the HBase table:

hbase(main):001:0> create 'fruit','info'

Create the /input_fruit directory in HDFS and upload fruit.tsv:

$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit/
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put fruit.tsv /input_fruit/

Run the importtsv MapReduce job against the HBase fruit table:

$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit hdfs://hadoop102:9000/input_fruit

Check the imported data:

hbase(main):001:0> scan 'fruit'
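
With the three rows from fruit.tsv imported, the scan should print something close to the following (timestamps will differ):

ROW                COLUMN+CELL
 1001              column=info:color, timestamp=..., value=Red
 1001              column=info:name, timestamp=..., value=Apple
 1002              column=info:color, timestamp=..., value=Yellow
 1002              column=info:name, timestamp=..., value=Pear
 1003              column=info:color, timestamp=..., value=Yellow
 1003              column=info:name, timestamp=..., value=Pineapple
3 row(s)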

2: Custom examples

        2.1: Goal: write data from HDFS into an HBase table.

mapper

package com.atguigu.mr1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FruitMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Pass each input line through unchanged; parsing happens in the reducer
        context.write(key, value);
    }
}


reducer

package com.atguigu.mr1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class FruitReducer extends TableReducer<LongWritable, Text, NullWritable> {

    // Column family name, optionally read from the job configuration under "cf1"
    String cf1 = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration configuration = context.getConfiguration();

        cf1 = configuration.get("cf1");
    }

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        //1. Iterate over the values
        for (Text value : values) {
            //2. Split each line on the tab delimiter
            String[] fields = value.toString().split("\t");

            //3. Build the Put object, using the first field as the row key
            Put put = new Put(Bytes.toBytes(fields[0]));

            //4. Populate the Put with the name and color columns
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(fields[1]));

            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(fields[2]));

            //5. Write out
            context.write(NullWritable.get(), put);
        }
    }
}


driver

package com.atguigu.mr1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FruitDriver implements Tool {

    // Hadoop configuration, injected by ToolRunner via setConf()
    private Configuration configuration = null;

    public int run(String[] args) throws Exception {

        //1. Get the Job instance
        Job job = Job.getInstance(configuration);

        //2. Set the driver class
        job.setJarByClass(FruitDriver.class);

        //3. Set the mapper class and its output kv types
        job.setMapperClass(FruitMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        //4. Set the reducer, writing into the HBase table named by args[1]
        TableMapReduceUtil.initTableReducerJob(args[1], FruitReducer.class, job);

        //5. Set the input path (args[0])
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        //6. Submit the job
        boolean result = job.waitForCompletion(true);

        return result ? 0 : 1;
    }

    public void setConf(Configuration conf) {
        configuration = conf;
    }

    public Configuration getConf() {
        return configuration;
    }

    public static void main(String[] args) throws Exception {

        Configuration configuration = new Configuration();

        ToolRunner.run(configuration,new FruitDriver(),args);

    }
}
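
TableOutputFormat writes into an existing table but does not create it, so the target table (fruit1 in the run command below) must be created first, with the info column family the reducer writes to:

hbase(main):001:0> create 'fruit1','info'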

Package the code and run the jar:

yarn jar hbase-demo-1.0-SNAPSHOT.jar com.atguigu.mr1.FruitDriver /fruit.tsv fruit1

Check the result by scanning the fruit1 table in the HBase shell.

        2.2: Read data from HBase and write it back into HBase

mapper

package com.atguigu.mr2;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class FruitMapper extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {

        // Build a Put object keyed on the current row key
        Put put = new Put(key.get());

        //1. Walk the cells of the row
        for (Cell cell : value.rawCells()) {

            //2. Keep only cells whose column qualifier is "name"
            if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {

                //3. Copy the cell into the Put
                put.add(cell);
            }
        }

        //4. Write out (note: a row with no "name" cell produces an empty Put)
        context.write(key, put);

    }

}


reducer

package com.atguigu.mr2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

public class FruitReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {

        // Write out every Put received for this row key
        for (Put value : values) {
            context.write(NullWritable.get(), value);
        }

    }
}


driver

package com.atguigu.mr2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FruitDriver2 implements Tool {

    // Hadoop/HBase configuration, injected by ToolRunner via setConf()
    private Configuration configuration = null;

    public int run(String[] args) throws Exception {

        //1. Get the Job instance
        Job job = Job.getInstance(configuration);
        //2. Set the driver class
        job.setJarByClass(FruitDriver2.class);
        //3. Set the mapper, scanning the HBase table named by args[0], and its output kv types
        TableMapReduceUtil.initTableMapperJob(args[0], new Scan(), FruitMapper.class, ImmutableBytesWritable.class, Put.class, job);
        //4. Set the reducer, writing into the HBase table named by args[1]
        TableMapReduceUtil.initTableReducerJob(args[1], FruitReducer.class, job);
        //5. Submit the job
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }

    public void setConf(Configuration conf) {
        configuration = conf;
    }

    public Configuration getConf() {
        return configuration;
    }

    public static void main(String[] args) throws Exception {
        // Use HBaseConfiguration.create() rather than new Configuration() so that
        // the hbase-site.xml settings (ZooKeeper quorum, etc.) are picked up
        Configuration configuration = HBaseConfiguration.create();
        ToolRunner.run(configuration, new FruitDriver2(), args);
    }
}
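
The original post does not show a run command for this second job; assuming the same jar as before, reading from fruit and writing into a pre-created target table (fruit2 is an illustrative name), it would look like:

hbase(main):001:0> create 'fruit2','info'

$ yarn jar hbase-demo-1.0-SNAPSHOT.jar com.atguigu.mr2.FruitDriver2 fruit fruit2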

II: Integration with Hive

1: Hive
        1.1 Data warehouse
        In essence, Hive keeps a mapping in MySQL (the metastore) onto files already stored in HDFS, so that HQL can be used to manage and query them.
        1.2 Used for data analysis and cleansing
        Hive is suited to offline data analysis and cleansing, with relatively high latency.
        1.3 Based on HDFS and MapReduce
        The data Hive stores still lives on DataNodes, and HQL statements are ultimately translated into MapReduce jobs for execution.
2: HBase
        2.1 Database
        A non-relational database built around column-family storage.
        2.2 Stores structured and unstructured data
        Suited to storing non-relational data in single tables; not suited to relational operations such as JOINs.
        2.3 Based on HDFS
        Data is persisted as HFiles on DataNodes, managed by RegionServers in units of regions.
        2.4 Low latency, usable for online services
        Faced with large volumes of enterprise data, HBase can store very large single tables while providing fast data access.

3: Create a Hive table linked to an HBase table, so that inserts into the Hive table also affect the HBase table

CREATE TABLE hive_hbase_emp_table(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
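
Because the table is managed (not EXTERNAL), the HBaseStorageHandler also creates hbase_emp_table on the HBase side; listing tables in the HBase shell should now show it:

hbase(main):001:0> list

The output should include hbase_emp_table.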

4: Create a temporary staging table in Hive for loading the file data; data cannot be loaded directly into the Hive table that is backed by HBase.

CREATE TABLE emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
row format delimited fields terminated by '\t';
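
The contents of emp.txt are not shown in the post; given the column list and the tab delimiter, each line should look roughly like this classic emp-style row (values here are purely illustrative):

7369	SMITH	CLERK	7902	1980-12-17	800.00	300.00	20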

5: Load data into the Hive staging table

hive> load data local inpath '/home/admin/softwares/data/emp.txt' into table emp;

6: Use an insert statement to copy the staging table's data into the HBase-backed Hive table

hive> insert into table hive_hbase_emp_table select * from emp;

7: Verify that the data was synchronized into both the Hive table and the linked HBase table

In Hive:

hive> select * from hive_hbase_emp_table;

In HBase:

hbase(main):001:0> scan 'hbase_emp_table'
