栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

6.1.8、Hbase

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

6.1.8、Hbase

1、MapReduce可以操作Hbase,通过Java写Mapreduce,打包在Hadoop上运行

每个map对应一个region,不能直接对hdfs切片,部分数据在memstore中,需要全表扫描,使用scan来获取数据 k:row key v:result一条数据的所有信息
不能使用TextInputforamt读取数据,只能使用TableInputFormat连接得到数据
数据写hdfs上使用TextOutputformat,写回hbase就是用Tableoutputformat

2、导包 org.apache.hbase hbase-server 1.4.6 导入插件,不导插件,Java写的Mr无法运行,识别不出来pox.xml里插件文件,直接读取Hbase数据
        
        
            org.apache.maven.plugins
            maven-compiler-plugin
            3.1
            
                1.8
                1.8
            
        


        
        
            maven-assembly-plugin
            
                
                    jar-with-dependencies
                
            
            
                
                    make-assembly
                    package
                    
                        single
                    
                
            
        
    

3、示例

统计各个班级人数,数据写到hdfs
//3、设置map任务,使用TableMapReduceUtil工具类
//因为输入的数据是hbase的,需要配置扫描表,字段等信息
//new Scan 还可以加过滤条件等信息

//求每个班级的人数,结果输出到hdfs上面
public class Demo01MrRead {
    
    public static class MrMap extends TableMapper {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            //参数为:key:row key   value:每一条数据结果
            String row_key = Bytes.toString(key.get());
            String clazz = Bytes.toString(value.getValue("info".getBytes(), "clazz".getBytes()));
            context.write(new Text(clazz), new IntWritable(1));
        }
    }

    
    public static class MrReduce extends Reducer {
        @Override
        protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            //参数与mapreduce一样了,map阶段输入kv,最终输出kv
            //key:map阶段传入的key-calzz value:合并的班级人数
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    
    public static void main(String[] args) throws Exception {
        //也可以直接new Configuration
        Configuration conf = HbaseConfiguration.create();
        //也可以不设置
        //1、创建一个Job
        Job job = Job.getInstance(conf);
        job.setJobName("Hbase_mr1学生班级人数");
        //2、设置Job的Jar
        job.setJarByClass(Demo01MrRead.class);
        //3、设置map任务,使用TableMapReduceUtil工具类
        //因为输入的数据是hbase的,需要配置扫描表,字段等信息
        //new Scan 还可以加过滤条件等信息
        TableMapReduceUtil.initTableMapperJob(TableName.valueOf("students"),
                new Scan(), MrMap.class, Text.class, IntWritable.class, job);
        //4、设置reduce任务
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //5、设置输入输出路径
        FileOutputFormat.setOutputPath(job, new Path("/hbaseMr/clazz_num"));
        //6、执行
        job.waitForCompletion(true);
    }
}

统计各个班级人数,数据写回hbase–建表–使用TablemapreduceUtil指定输出路径为Hbase表

//统计每个班级人数,结果存入hbase表中
public class Demo02MrRead {
    public static class MrMapper extends TableMapper {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            String clazz = Bytes.toString(value.getValue("info".getBytes(), "clazz".getBytes()));
            context.write(new Text(clazz), new IntWritable(1));
        }
    }

    public static class MrReduce extends TableReducer {
        @Override
        protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            //以班级key作为row key
            Put put = new Put(key.getBytes());
            put.addColumn("info".getBytes(), "num".getBytes(), Bytes.toBytes(count));
            context.write(NullWritable.get(), put);
        }
    }

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = HbaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "master,node1,node2:2181");
            Job job = Job.getInstance(conf);
            job.setJarByClass(Demo02MrRead.class);

            TableMapReduceUtil.initTableMapperJob("students",
                    new Scan(), MrMapper.class,
                    Text.class,
                    IntWritable.class,
                    job);


            TableMapReduceUtil.initTableReducerJob(
                    "mr_res",
                    MrReduce.class,
                    job
            );

            job.waitForCompletion(true);
        }
    }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/332636.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号