栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hadoop期末复习贴

Hadoop期末复习贴

从头开始梳理 Hadoop 程序。

1)WordCount

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MyMapReducer {
    
    public static class mapper extends Mapper {
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(" ");
            for (String word : words) {
                context.write(new Text(word), new IntWritable(1));
            }

        }

    }

    
    public static class reducer extends Reducer {
        protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();//转化为java类型int
            }
            context.write(key, new Text(count + ""));//加一个""转化为string类型

        }

    }

    

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        
        job.setMapperClass(mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        
        job.setReducerClass(reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //指定原始数据位置
        FileInputFormat.setInputPaths(job,new Path("hdfs://localhost:9000/test"));

        //指定处理的结果数据位置
        FileOutputFormat.setOutputPath(job,new Path("hdfs://localhost:9000/result"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

2)HBase 结合 MapReduce

  • 将 HDFS 上的文件导入 HBase
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * MapReduce job that loads comma-separated student records from HDFS into an
 * HBase table.
 *
 * <p>Each input line is expected to look like
 * {@code rowKey,name,gender,age,major}. The first field becomes the row key;
 * the remaining fields are stored in the {@code basic_info} and
 * {@code school_info} column families.
 *
 * <p>Usage: {@code FileToHbase [genericOptions] [inputPath [tableName]]}.
 */
public class FileToHbase extends Configured implements Tool {

    /** Input location used when no path is given on the command line. */
    private static final String DEFAULT_INPUT = "hdfs://localhost:9000/student";

    /** Target table used when no table name is given on the command line. */
    private static final String DEFAULT_TABLE = "student";

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new FileToHbase(), args);
        System.exit(run);
    }

    /**
     * Forwards every input line unchanged as the map output key.
     *
     * <p>Using the whole line as the key means identical lines are grouped
     * into a single reduce call.
     */
    public static class HDFSToHbaseMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        /**
         * @param key     byte offset of the line within the input split (unused)
         * @param value   one line of input text
         * @param context sink for the {@code (line, null)} pair
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    /**
     * Turns one CSV line into an HBase {@link Put}.
     */
    public static class HDFSToHbaseReducer extends TableReducer<Text, NullWritable, NullWritable> {

        /** Number of comma-separated fields a valid record must have. */
        private static final int FIELD_COUNT = 5;

        // Column coordinates encoded once, with an explicit charset so the
        // bytes do not depend on the JVM's platform default.
        private static final byte[] BASIC_INFO = "basic_info".getBytes(StandardCharsets.UTF_8);
        private static final byte[] SCHOOL_INFO = "school_info".getBytes(StandardCharsets.UTF_8);
        private static final byte[] NAME = "name".getBytes(StandardCharsets.UTF_8);
        private static final byte[] GENDER = "gender".getBytes(StandardCharsets.UTF_8);
        private static final byte[] AGE = "age".getBytes(StandardCharsets.UTF_8);
        private static final byte[] MAJOR = "major".getBytes(StandardCharsets.UTF_8);

        /**
         * @param key     one full input line ({@code rowKey,name,gender,age,major})
         * @param values  placeholders, one per occurrence of the line (unused)
         * @param context sink for the generated {@link Put}
         */
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            String[] split = key.toString().split(",");
            // A short line or an empty row key would otherwise throw and fail
            // the whole job; skip it and make the skip visible via a counter.
            if (split.length < FIELD_COUNT || split[0].isEmpty()) {
                context.getCounter("FileToHbase", "MALFORMED_LINES").increment(1);
                return;
            }

            Put put = new Put(toBytes(split[0])); // first field is the row key
            put.addColumn(BASIC_INFO, NAME, toBytes(split[1]));
            put.addColumn(BASIC_INFO, GENDER, toBytes(split[2]));
            put.addColumn(BASIC_INFO, AGE, toBytes(split[3]));
            put.addColumn(SCHOOL_INFO, MAJOR, toBytes(split[4]));

            context.write(NullWritable.get(), put);
        }

        /** Encodes a field as UTF-8 bytes. */
        private static byte[] toBytes(String field) {
            return field.getBytes(StandardCharsets.UTF_8);
        }
    }

    /**
     * Configures and runs the import job.
     *
     * @param strings optional {@code [inputPath [tableName]]}; missing values
     *                fall back to {@link #DEFAULT_INPUT} and {@link #DEFAULT_TABLE}
     * @return 0 when the job succeeds, 1 otherwise
     */
    @Override
    public int run(String[] strings) throws Exception {
        // Start from the configuration ToolRunner populated (so -D options are
        // honored) and layer the HBase resources on top of it.
        Configuration base = getConf();
        Configuration conf = (base == null)
                ? HBaseConfiguration.create()
                : HBaseConfiguration.create(base);

        String input = (strings != null && strings.length > 0) ? strings[0] : DEFAULT_INPUT;
        String table = (strings != null && strings.length > 1) ? strings[1] : DEFAULT_TABLE;

        Job job = Job.getInstance(conf);
        job.setJarByClass(FileToHbase.class);

        job.setMapperClass(HDFSToHbaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(input)); // source file location

        // Sets the reducer, the table output format and the output key/value
        // classes; they must not be overwritten afterwards.
        TableMapReduceUtil.initTableReducerJob(table, HDFSToHbaseReducer.class, job);

        boolean isDone = job.waitForCompletion(true);
        return isDone ? 0 : 1;
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/671899.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号