栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

java操作MapReduce代码

java操作MapReduce代码

第一个mr程序 单词统计
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

// 单词统计
// Word count: the classic first MapReduce example.
// Each input line is treated as one "word"; the reducer sums the 1s per key.
public class MR01 {

    // Map phase: for every input line emit (line, 1).
    // FIX: the original extended the raw type Mapper, so the @Override-annotated
    // map(LongWritable, Text, Context) did not actually override Mapper.map.
    public static class WordMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // key = byte offset of the line in the file; value = the line text.
            String line = value.toString(); // convert to String to process the line
            context.write(new Text(line), new LongWritable(1L));
        }
    }

    // Reduce phase: sum every count emitted for the same key.
    public static class WordReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long count = 0; // long rather than int: avoids overflow on very large inputs
            for (LongWritable value : values) {
                count += value.get();
            }
            context.write(key, new LongWritable(count));
        }
    }

    public static void main(String[] args) throws Exception {
        // Configure the MapReduce job.
        Job job = Job.getInstance();
        job.setJobName("第一个mr程序 单词统计");
        job.setJarByClass(MR01.class);
        // Register the mapper class.
        job.setMapperClass(WordMapper.class);
        // Register the reducer class.
        job.setReducerClass(WordReducer.class);
        // Map-side output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Final (reduce-side) output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // HDFS input/output paths.
        Path input = new Path("/words.txt");
        Path out = new Path("/output"); // NOTE: the output path must not already exist
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, out);
        // Submit the job and block until it finishes.
        job.waitForCompletion(true);
        System.out.println("正在运行mr");
    }
}

统计一行多个单词
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


// Word count where each input line may contain several comma-separated words.
public class mrtest02 {

    // Map phase: split each line on ',' and emit (word, 1) for every word.
    // FIX: added generic type parameters — with the raw Mapper type the
    // @Override-annotated map method did not actually override Mapper.map.
    public static class WordMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(",");
            for (String word : words) {
                context.write(new Text(word), new LongWritable(1));
            }
        }
    }

    // Reduce phase: sum the 1s emitted for each word.
    public static class WordReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long count = 0; // long rather than int: int is too small for very large counts
            for (LongWritable value : values) {
                count += value.get();
            }
            context.write(key, new LongWritable(count));
        }
    }

    // Build and run the MapReduce job via the Job class.
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();
        job.setJobName("统计每行多个单词");
        // FIX: was MR02.class, a class that does not exist in this file.
        job.setJarByClass(mrtest02.class);

        // Register the mapper and its output key/value types.
        job.setMapperClass(WordMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // FIX: was job.setOutputKeyClass(WordReduce.class) — the reducer must be
        // registered with setReducerClass, otherwise the identity reducer runs.
        job.setReducerClass(WordReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Input path.
        Path input = new Path("/words.txt");
        FileInputFormat.addInputPath(job, input);

        // The output path must not already exist; delete it up front if it does.
        Path output = new Path("/output");
        FileSystem fs = FileSystem.get(new Configuration());
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);

        // Submit the job and block until it finishes.
        job.waitForCompletion(true);
        System.out.println("统计一行多个单词");
        // To run on a cluster, name the driver class explicitly:
        // hadoop jar xxxxx.jar com.shujia.mr.<driver class>
    }
}

按照班级求age

//1500100981,经鹏涛,23,男,文科六班
求每个班级的学生年龄总和
即:按班级分组,再对年龄做聚合(求和)


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

// Sum student ages grouped by class (group by clazz, aggregate age).
// Input record format: id,name,age,gender,clazz
// e.g. 1500100981,经鹏涛,23,男,文科六班
public class mrtest03 {

    // Map phase: parse a CSV record and emit (clazz, age).
    // FIX: added generic type parameters — with the raw Mapper type the
    // @Override-annotated map method did not actually override Mapper.map.
    public static class ClazzMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(",");
            // Field layout: [0]=id, [1]=name, [2]=age, [3]=gender, [4]=clazz
            int age = Integer.parseInt(split[2]);
            String clazz = split[4];
            context.write(new Text(clazz), new LongWritable(age));
        }
    }

    // Reduce phase: sum all ages emitted for the same class.
    public static class ClazzReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long agesum = 0;
            for (LongWritable value : values) {
                agesum += value.get();
            }
            context.write(key, new LongWritable(agesum));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();
        job.setNumReduceTasks(2); // configurable; each reducer produces one output file
        job.setJobName("按照班级求age");
        // FIX: referenced MR03CLazzAgeSum, a class that does not exist in this file.
        job.setJarByClass(mrtest03.class);

        // Register the mapper and its output key/value types.
        job.setMapperClass(ClazzMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Register the reducer and the final output key/value types.
        job.setReducerClass(ClazzReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Input path.
        Path input = new Path("/students.txt");
        FileInputFormat.addInputPath(job, input);
        // The output path must not already exist; delete it up front if it does.
        Path output = new Path("/output");
        FileSystem fs = FileSystem.get(new Configuration());
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);

        // Submit the job and block until it finishes.
        job.waitForCompletion(true);
        System.out.println("按照班级求age");
        // To run on a cluster, name the driver class explicitly:
        // hadoop jar xxxxx.jar com.shujia.mr.<driver class>
    }
}

筛选性别

不需要reduce端 ,job.setNumReduceTasks(0);设置为0

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

// Map-only job: filter student records, keeping only male students.
// No reduce phase is needed, so the reducer count is set to 0.
public class MR04Sex {

    // Map phase: emit the whole record when its gender field is "男" (male).
    // FIX: added generic type parameters — with the raw Mapper type the
    // @Override-annotated map method did not actually override Mapper.map.
    public static class SexMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(",");
            // Field layout: [0]=id, [1]=name, [2]=age, [3]=gender, [4]=clazz
            String sex = split[3];
            if ("男".equals(sex)) {
                // The full record is the key; NullWritable stands in for "no value".
                context.write(new Text(line), NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();

        job.setNumReduceTasks(0); // 0 reducers: map output is written directly
        job.setJarByClass(MR04Sex.class);
        job.setJobName("sex");
        job.setMapperClass(SexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Input path.
        Path input = new Path("/students.txt");
        FileInputFormat.addInputPath(job, input);
        // The output path must not already exist; delete it up front if it does.
        Path output = new Path("/output");
        FileSystem fs = FileSystem.get(new Configuration());
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);

        // Submit the job and block until it finishes.
        job.waitForCompletion(true);
        System.out.println("sex");
        // To run on a cluster, name the driver class explicitly:
        // hadoop jar xxxxx.jar com.shujia.mr.<driver class>
    }
}

combine 预聚合
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.Text;
import java.io.IOException;


// Count male students, using a combiner for map-side pre-aggregation.
public class mrtest05 {

    // Map phase: emit ("男", 1) for every male student record.
    // FIX: added generic type parameters — with the raw Mapper type the
    // @Override-annotated map method did not actually override Mapper.map.
    public static class GenderMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(",");
            // Field layout: [0]=id, [1]=name, [2]=age, [3]=gender, [4]=clazz
            if ("男".equals(split[3])) {
                context.write(new Text("男"), new LongWritable(1));
            }
        }
    }

    // Combiner: runs on the map side before the shuffle, pre-summing counts to
    // reduce the data sent to the reducer. Summation is associative, so the
    // same logic is safe as both combiner and reducer.
    public static class CombineReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long count = 0;
            for (LongWritable value : values) {
                count += value.get();
            }
            context.write(key, new LongWritable(count));
        }
    }

    // Reducer: final sum of the (possibly pre-aggregated) counts.
    public static class GenderReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long count = 0;
            for (LongWritable value : values) {
                count += value.get();
            }
            context.write(key, new LongWritable(count));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJobName("combine 预聚合");
        // Map side.
        job.setJarByClass(mrtest05.class);
        job.setMapperClass(GenderMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Combiner: map-side pre-aggregation before the shuffle.
        job.setCombinerClass(CombineReducer.class);
        // Reduce side.
        job.setReducerClass(GenderReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Input path.
        Path input = new Path("/students.txt");
        FileInputFormat.addInputPath(job, input);
        // The output path must not already exist; delete it up front if it does.
        Path output = new Path("/output");
        FileSystem fs = FileSystem.get(new Configuration());
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        // Submit the job and block until it finishes.
        job.waitForCompletion(true);
        System.out.println("combine 预聚合");
    }
}

两表联查

两个输入文件各自对应map task,因此会产生2个结果文件

package com.shujia.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

// Map-only job that reads two input files (students and scores) and passes
// every record through unchanged; each input split yields one map task and
// one output file.
public class mrtest06 {

    // Identity mapper: forward each line as the output key.
    // FIX: the input key from the default TextInputFormat is the LongWritable
    // byte offset, not NullWritable — the original map(NullWritable, ...) never
    // overrode Mapper.map and would fail at runtime.
    public static class FilesMaper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setNumReduceTasks(0); // map-only job: no reduce phase
        job.setJarByClass(mrtest06.class); // FIX: was missing; required on a cluster
        // FIX: the mapper was passed to setMapOutputValueClass by mistake and
        // was never registered; register it with setMapperClass.
        job.setMapperClass(FilesMaper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Two input paths: each file feeds its own map task(s).
        Path input1 = new Path("/students.txt");
        FileInputFormat.addInputPath(job, input1);
        Path input2 = new Path("/score.txt");
        FileInputFormat.addInputPath(job, input2);
        // The output path must not already exist; delete it up front if it does.
        Path output = new Path("/output");
        FileSystem fs = FileSystem.get(new Configuration());
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);

        // Submit the job and block until it finishes.
        job.waitForCompletion(true);
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/581981.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号