一:与MR对接
1:官方案例
1.1:读取Hbase数据
自己在Hadoop102,103,104三台机器上输入
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*
然后在hbase目录下使用如下命令,这个是Hadoop读取hbase中的表数据,表名为student
/opt/module/hadoop-3.1.3/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter student
1.2:案例二:使用 MapReduce将本地数据导入到 Hbase
新建tsv格式的文件fruit.tsv(每行一条记录,字段之间用Tab分隔)
1001	Apple	Red
1002	Pear	Yellow
1003	Pineapple	Yellow
创建hbase表
Hbase (main):001:0> create 'fruit','info'
创建input_fruit文件夹并上传fruit.tsv文件
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit/
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put fruit.tsv /input_fruit/
执行mapreduce到hbase的fruit表
$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit hdfs://hadoop102:9000/input_fruit
使用命令查看导入后的结果
Hbase (main):001:0> scan 'fruit'
2:自定义案例
2.1:目标:将hdfs里面的数据写入到hbase的表中。
// ---------- mapper: FruitMapper.java ----------
package com.atguigu.mr1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Pass-through mapper: forwards every input line (keyed by the input key it
 * receives) unchanged to the reducer, which does the actual HBase write.
 */
public class FruitMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }
}

// ---------- reducer: FruitReducer.java ----------
package com.atguigu.mr1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * Turns each tab-separated line "rowkey, name, color" into a {@link Put} and
 * writes it to the output table configured by the driver.
 *
 * <p>The column family is taken from the job configuration key {@code cf1};
 * when that key is absent the family {@code info} is used.
 */
public class FruitReducer extends TableReducer<LongWritable, Text, NullWritable> {

    /** Column family name, resolved in {@link #setup}. */
    String cf1 = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration configuration = context.getConfiguration();
        // Fall back to "info" so the job behaves the same when cf1 is not supplied.
        cf1 = configuration.get("cf1", "info");
    }

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        byte[] family = Bytes.toBytes(cf1);

        // 1. Iterate over all lines that share this key.
        for (Text value : values) {
            // 2. Split the line on TAB (the file is TSV).
            String[] fields = value.toString().split("\t");

            // Skip blank or malformed lines instead of failing the whole job;
            // an empty row key is also rejected by Put.
            if (fields.length < 3 || fields[0].isEmpty()) {
                continue;
            }

            // 3. Build the Put using the first field as row key.
            Put put = new Put(Bytes.toBytes(fields[0]));

            // 4. Populate the name and color columns.
            put.addColumn(family, Bytes.toBytes("name"), Bytes.toBytes(fields[1]));
            put.addColumn(family, Bytes.toBytes("color"), Bytes.toBytes(fields[2]));

            // 5. Emit; the output key is not used, so NullWritable is written.
            context.write(NullWritable.get(), put);
        }
    }
}

// ---------- driver: FruitDriver.java ----------
package com.atguigu.mr1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Job driver: reads a TSV file from the input path and loads it into an HBase table.
 *
 * <p>Usage: {@code FruitDriver <input path> <output table>}
 */
public class FruitDriver implements Tool {

    // Configuration injected by ToolRunner through setConf().
    private Configuration configuration = null;

    /**
     * Configures and submits the job.
     *
     * @param args {@code args[0]} = input path, {@code args[1]} = target HBase table
     * @return 0 on success, 1 if the job failed, 2 on bad arguments
     */
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Usage: FruitDriver <input path> <output table>");
            return 2;
        }

        // 1. Create the job.
        Job job = Job.getInstance(configuration);

        // 2. Locate the jar through the driver class.
        job.setJarByClass(FruitDriver.class);

        // 3. Mapper and its output key/value types.
        job.setMapperClass(FruitMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        // 4. Reducer writing to the HBase table named by args[1].
        TableMapReduceUtil.initTableReducerJob(args[1], FruitReducer.class, job);

        // 5. Input path.
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // 6. Submit and wait.
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }

    public void setConf(Configuration conf) {
        configuration = conf;
    }

    public Configuration getConf() {
        return configuration;
    }

    public static void main(String[] args) throws Exception {
        // HBaseConfiguration.create() layers the HBase config resources on top of the Hadoop
        // defaults; a plain new Configuration() would not pick up hbase-site.xml.
        Configuration configuration = HBaseConfiguration.create();
        // Propagate the job result as the process exit code.
        System.exit(ToolRunner.run(configuration, new FruitDriver(), args));
    }
}
执行运行jar包
yarn jar hbase-demo-1.0-SNAPSHOT.jar com.atguigu.mr1.FruitDriver /fruit.tsv fruit1
结果检查
2.2:从hbase读数据然后写入到hbase里面
// ---------- mapper: FruitMapper.java ----------
package com.atguigu.mr2;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * Reads rows from the source table and, for each row, emits a {@link Put}
 * containing only the cells whose qualifier is {@code name}.
 */
public class FruitMapper extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Build a Put for the same row key as the scanned row.
        Put put = new Put(key.get());

        // 1. Walk every cell of the row.
        for (Cell cell : value.rawCells()) {
            // 2. Keep only cells whose qualifier is "name".
            if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                // 3. Copy the cell into the Put.
                put.add(cell);
            }
        }

        // 4. Emit only when something was copied: a Put with no cells cannot
        //    be written to the target table.
        if (!put.isEmpty()) {
            context.write(key, put);
        }
    }
}

// ---------- reducer: FruitReducer.java ----------
package com.atguigu.mr2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

/**
 * Writes every {@link Put} received from the mapper to the output table
 * configured by the driver.
 */
public class FruitReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // Forward each Put as-is; the output key is not used.
        for (Put value : values) {
            context.write(NullWritable.get(), value);
        }
    }
}

// ---------- driver: FruirDriver2.java ----------
package com.atguigu.mr2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Job driver: copies the {@code name} column of one HBase table into another.
 *
 * <p>Usage: {@code FruirDriver2 <source table> <target table>}
 */
public class FruirDriver2 implements Tool {

    // Configuration injected by ToolRunner through setConf().
    private Configuration configuration = null;

    /**
     * Configures and submits the job.
     *
     * @param args {@code args[0]} = source table, {@code args[1]} = target table
     * @return 0 on success, 1 if the job failed, 2 on bad arguments
     */
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Usage: FruirDriver2 <source table> <target table>");
            return 2;
        }

        // 1. Create the job.
        Job job = Job.getInstance(configuration);

        // 2. Locate the jar through the driver class.
        job.setJarByClass(FruirDriver2.class);

        // 3. Mapper: scan of the source table, with the mapper output key/value types.
        TableMapReduceUtil.initTableMapperJob(
                args[0],
                new Scan(),
                FruitMapper.class,
                ImmutableBytesWritable.class,
                Put.class,
                job);

        // 4. Reducer: writes into the target table.
        TableMapReduceUtil.initTableReducerJob(args[1], FruitReducer.class, job);

        // 5. Submit and wait.
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }

    public void setConf(Configuration conf) {
        configuration = conf;
    }

    public Configuration getConf() {
        return configuration;
    }

    public static void main(String[] args) throws Exception {
        // HBaseConfiguration.create() is used instead of new Configuration()
        // so that the HBase configuration resources are loaded.
        Configuration configuration = HBaseConfiguration.create();
        // Propagate the job result as the process exit code.
        System.exit(ToolRunner.run(configuration, new FruirDriver2(), args));
    }
}
二:与hive对接
1:Hive
1.1数据仓库
Hive的本质其实就相当于将HDFS中已经存储的文件在Mysql中做了一个双射关系,以方便使用 HQL去管理查询。
1.2用于数据分析、清洗
Hive适用于离线的数据分析和清洗,延迟较高。
1.3基于 HDFS、 MapReduce
Hive存储的数据依旧在 DataNode上,编写的 HQL语句终将是转换为 MapReduce代码执行。
2:Hbase
2.1数据库
是一种面向列族存储的非关系型数据库。
2.2用于存储结构化和非结构化的数据
适用于单表非关系型数据的存储,不适合做关联查询,类似JOIN等操作。
2.3基于 HDFS
数据持久化存储的体现形式是HFile,存放于DataNode中,被RegionServer以 region的形式进行管理。
2.4延迟较低,接入在线业务使用
面对大量的企业数据,Hbase可以支持单表大量数据的存储,同时提供了高效的数据访问速度。
3:建立hive表,关联hbase表,插入数据到hive表的同时能影响hbase表
CREATE TABLE hive_hbase_emp_table(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
4:在hive中创建临时中间表,用于load文件中的数据,不能将数据直接load进hive所关联的hbase那张表中。
CREATE TABLE emp( empno int, ename string, job string, mgr int, hiredate string, sal double, comm double, deptno int) row format delimited fields terminated by '\t';
5:向hive中间表中load数据
hive> load data local inpath '/home/admin/softwares/data/emp.txt' into table emp;
6:通过insert命令将中间表中的数据导入到hive关联hbase的那张表中
hive> insert into table hive_hbase_emp_table select * from emp;
7:查看hive以及关联的hbase表中是否已经成功的同步插入了数据
hive
hive> select * from hive_hbase_emp_table;
hbase
hbase> scan 'hbase_emp_table'



