栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hadoop期末复习贴-Hbase遇上MapReducer

Hadoop期末复习贴-Hbase遇上MapReducer

若本文对你有帮助,请记得点赞、关注我喔!

2)hbase结合MapReduce

  • hdfs传输文件到hbase
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class FileToHbase extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new FileToHbase(), args);
        System.exit(run);
    }

    
    public static class HDFSToHbaseMapper extends Mapper {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    
    public static class HDFSToHbaseReducer extends TableReducer {
        @Override
        protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            String[] split = key.toString().split(",");
            Put put = new Put(split[0].getBytes()); //确定行号
            put.addColumn("basic_info".getBytes(), "name".getBytes(), split[1].getBytes());
            put.addColumn("basic_info".getBytes(), "gender".getBytes(), split[2].getBytes());
            put.addColumn("basic_info".getBytes(), "age".getBytes(), split[3].getBytes());
            put.addColumn("school_info".getBytes(), "major".getBytes(), split[4].getBytes());
            
            context.write(null, put);

        }
    }


    @Override
    public int run(String[] strings) throws Exception {
        
        Configuration conf = HbaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);
        job.setJarByClass(FileToHbase.class);

        job.setMapperClass(HDFSToHbaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        Path inputPath = new Path("hdfs://localhost:9000/student");
        FileInputFormat.addInputPath(job, inputPath);//设置源文件的地址

        
        TableMapReduceUtil.initTableReducerJob("student", HDFSToHbaseReducer.class, job);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        boolean isDone = job.waitForCompletion(true);
        return isDone ? 0 : 1;
    }
}

2)从HBaes表读取数据并输出到HDFS文件

package HbaseWithMapReducer;


import java.io.IOException;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.FileSystem;

public class HbaseToHdfs extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new HbaseToHdfs(),args);
    }
    public static final byte[] family = "basic_info".getBytes();
    public static final byte[] column = "age".getBytes();

    public static class MyMapper extends TableMapper {

        protected void map(ImmutableBytesWritable key, Result result, Context context) throws IOException, InterruptedException {
            String major = Bytes.toString(result.getValue(Bytes.toBytes("school_info"), Bytes.toBytes("major")));
            byte[] value = result.getValue(family, column); //这一行的age
            int age = Integer.parseInt(Bytes.toString(value));//将这一行age转为Int
            context.write(new Text(major), new IntWritable(age));//把这一行读入到context


        }
    }


    public static class MyReducer extends Reducer {
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            int num = 0;
            for (IntWritable value : values) {
                sum += value.get();
                num++;
            }
            context.write(key, new DoubleWritable(sum * 1.0 / num));
        }
    }
    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = HbaseConfiguration.create();
        Job job = Job.getInstance(conf);
        Scan scan = new Scan();
        scan.setCaching(300);
        scan.setCacheBlocks(false);

        //设置Mapper
        //此时TableMapReduceUtil.initTableMapperJob设置的是Mapper,因为是Mapper对Hbase表操作
        //其中确定了输出的类型
        String tName = "student";
        TableMapReduceUtil.initTableMapperJob(tName, scan, MyMapper.class, Text.class, IntWritable.class, job);

        //设置Reducer
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        String strOutputPath = "hdfs://localhost:9000/output";
        FileOutputFormat.setOutputPath(job,new Path(strOutputPath));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}

3)从Hbase表中读出数据,并输出结果到Hbase表中

import java.io.IOException;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;

public class Task extends Configured implements Tool {
    public static class MyMapper extends TableMapper {        
        
        public static final byte[] family = "bacin_info".getBytes();
        public static final byte[] column = "age".getBytes();
        @Override
        protected void map(ImmutableBytesWritable rowKey, Result result,Context context)
        throws IOException,InterruptedException{
            String major = Bytes.toString(result.getValue("school_info".getBytes(),
            "major".getBytes()));
            byte[] value = result.getValue(family,column);
            String str = Bytes.toString(value);
            int age = Integer.parseInt(Bytes.toString(value));
            context.write(new Text(major),new IntWritable(age));

        }


        
    }
    public static class MyTableReducer extends TableReducer {
        @Override
        
        int sum =0;
        int num = 0 ;
        for(IntWritable value :values){
            sum+=value.get();
            num++;
        }
        double avg =0;
        avg=sum*1.0/num;
        String i,j;
        i=x+"";
        j=avg+"";
        Put put =new Put(i.getBytes());
        x++;
        put.addColumn("average_infos".getBytes(),"age".getBytes(),key.getBytes());
        put.addColumn("average_infos".getBytes(),"major".getBytes(),j.getBytes());
        context.write(null,put);
        


        
    }
    public int run(String[] args) throws Exception {
        //配置Job
        
        Configuration conf = HbaseConfiguration.create();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Task.class);
        Scan scan = new Scan();
        scan.setCaching(300);
        scan.setCacheBlocks(false);
        String strTableName = "student";
        TableMapReduceUtil.initTableMapperJob(strTableName,scan,MyMapper.class,Text.class,IntWritable.class,job);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyTableReducer.class);
        TableMapReduceUtil.initTableReducerJob("t_avg",MyTableReducer.class,job);

        boolean inDone = job.waitForCompletion(true);
        return isDone ? 0 : 1;


        
    }
    public static void main(String[] args) {
        try {
            int r = new Task().run(args);
            if (r != 0) {
                System.exit(r);
            }            
            // 扫描't_avg'表读取map-reduce的结果,以下代码勿动
            Configuration config = new Configuration();
            Connection conn = ConnectionFactory.createConnection(config);
            TableName tableName = TableName.valueOf("t_avg");
            Table table = conn.getTable(tableName);
            Scan scan = new Scan();
            ResultScanner scanner = table.getScanner(scan);
            for(Result result : scanner){
                for (Cell kv : result.rawCells()) {
                    String value = Bytes.toString(CellUtil.clonevalue(kv));
                    System.out.println(value);
                }
            }
            scanner.close();
            conn.close();        
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}

马上就会整理~

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/689998.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号