栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

MapReduce经典习题练习

MapReduce经典习题练习

以下是实现过程所需的 Java 代码。

1、在 HDFS 目录 /tmp/input/wordcount 中有一系列文件,内容均以","号分隔,求按","号分隔的各个元素的出现频率,输出到目录 /tmp/个人用户名 的 HDFS 目录中。

实现代码:

package com.tledu.hadoop.mr.homework;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

/**
 * MapReduce job that counts how often each comma-separated element occurs in
 * the {@code .txt} files found (recursively) under an input directory.
 *
 * <p>Usage: {@code HomeWork1 <input dir> <output dir>}.
 */
public class HomeWork1 {

    /** Emits {@code (token, 1)} for every comma-separated token of an input line. */
    public static class HomeWork1Mapper extends Mapper<Object, Text, Text, IntWritable> {
        // Reused across map() calls so no Writable is allocated per record.
        private final Text word = new Text();
        private final IntWritable one = new IntWritable(1);

        /**
         * Splits one input line on {@code ","} and writes each token with a count of 1.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of input text
         * @param context sink for the {@code (token, 1)} pairs
         */
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer tokens = new StringTokenizer(value.toString(), ",");
            while (tokens.hasMoreTokens()) {
                word.set(tokens.nextToken());
                context.write(word, one);
            }
        }
    }

    /**
     * Sums the counts of one token. Summation is associative, so the same class
     * is also registered as the combiner in {@link #main(String[])}.
     */
    public static class HomeWork1Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable sumRes = new IntWritable();

        /**
         * Writes {@code (key, sum of values)}.
         *
         * @param key     the token
         * @param values  partial counts for the token
         * @param context sink for the summed count
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            sumRes.set(sum);
            context.write(key, sumRes);
        }
    }

    /**
     * Configures and runs the job, then exits with 0 on success and 1 on failure.
     *
     * @param args {@code args[0]} is the input directory, {@code args[1]} the output directory
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        if (args.length < 2) {
            System.err.println("Usage: HomeWork1 <input dir> <output dir>");
            System.exit(2);
        }

        // 1. Create the configuration
        Configuration conf = new Configuration();
        // 2. Create the job
        Job job = Job.getInstance(conf, "homework1");
        // 3. Register the job classes
        job.setJarByClass(HomeWork1.class);
        job.setMapperClass(HomeWork1Mapper.class);
        job.setCombinerClass(HomeWork1Reducer.class);
        job.setReducerClass(HomeWork1Reducer.class);
        // 4. Output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 5. Input and output paths: every .txt file under the input directory is an input
        List<String> fileList = getTxtFileListFromPath(args[0]);
        for (String filePath : fileList) {
            FileInputFormat.addInputPath(job, new Path(filePath));
        }

        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 6. Run the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Lists every file whose name ends with {@code .txt} under the given path,
     * descending into sub-directories.
     *
     * @param folderPath directory to scan
     * @return the matching file paths as strings; empty if none match
     * @throws IOException if the file system cannot be reached or listed
     */
    public static List<String> getTxtFileListFromPath(String folderPath) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        List<String> list = new ArrayList<>();
        collectTxtFiles(fs, new Path(folderPath), list);
        return list;
    }

    /** Appends the {@code .txt} files found under {@code dir} (recursively) to {@code out}. */
    private static void collectTxtFiles(FileSystem fs, Path dir, List<String> out) throws IOException {
        for (FileStatus fileStatus : fs.listStatus(dir)) {
            Path file = fileStatus.getPath();
            if (fileStatus.isFile() && file.getName().endsWith(".txt")) {
                out.add(file.toString());
            } else if (fileStatus.isDirectory()) {
                collectTxtFiles(fs, file, out);
            }
        }
    }
}
2、在 HDFS 目录 /tmp/input/wordcount 中有一系列文件,内容以","号分隔,分隔后的元素均为数值、字母或中文,求所有出现的数值的平均值。

实现代码:

package com.tledu.hadoop.mr.homework;

import com.tledu.hadoop.utils.RegUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

/**
 * MapReduce job that computes the average of all numeric elements found in the
 * comma-separated {@code .txt} files under an input directory.
 *
 * <p>Usage: {@code HomeWork3 <input dir> <output dir>}.
 */
public class HomeWork3 {

    /**
     * Emits every numeric token under one constant key so that a single reduce
     * call sees all numbers.
     */
    public static class HomeWork3Mapper extends Mapper<Object, Text, Text, DoubleWritable> {
        // Constant output key; reused together with the value holder across map() calls.
        private final Text word = new Text("sum=");
        private final DoubleWritable number = new DoubleWritable();

        /**
         * Splits one input line on {@code ","} and writes each token accepted by
         * {@code RegUtils.isNumber} as a double.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of input text
         * @param context sink for the {@code ("sum=", number)} pairs
         */
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer tokens = new StringTokenizer(value.toString(), ",");
            while (tokens.hasMoreTokens()) {
                String text = tokens.nextToken();
                // NOTE(review): assumes RegUtils.isNumber only accepts strings that
                // Double.parseDouble can parse - confirm against RegUtils.
                if (RegUtils.isNumber(text)) {
                    number.set(Double.parseDouble(text));
                    context.write(word, number);
                }
            }
        }
    }

    /** Averages all values received for a key. */
    public static class HomeWork3Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        private final DoubleWritable sumRes = new DoubleWritable();

        /**
         * Writes {@code (key, sum / count)} over all values of the key.
         *
         * @param key     the constant key emitted by the mapper
         * @param values  every number emitted for the key
         * @param context sink for the average
         */
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            int count = 0;
            for (DoubleWritable val : values) {
                sum += val.get();
                count++;
            }
            sumRes.set(sum / count);
            context.write(key, sumRes);
        }
    }

    /**
     * Configures and runs the job, then exits with 0 on success and 1 on failure.
     *
     * @param args {@code args[0]} is the input directory, {@code args[1]} the output directory
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        if (args.length < 2) {
            System.err.println("Usage: HomeWork3 <input dir> <output dir>");
            System.exit(2);
        }

        // 1. Create the configuration
        Configuration conf = new Configuration();
        // 2. Create the job
        Job job = Job.getInstance(conf, "homework2");
        // 3. Register the job classes.
        // The reducer is deliberately NOT used as a combiner: it emits an average,
        // and an average of partial averages is not the overall average.
        job.setJarByClass(HomeWork3.class);
        job.setMapperClass(HomeWork3Mapper.class);
        job.setReducerClass(HomeWork3Reducer.class);
        // 4. Output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // 5. Input and output paths: every .txt file under the input directory is an input
        List<String> fileList = getTxtFileListFromPath(args[0]);
        for (String filePath : fileList) {
            FileInputFormat.addInputPath(job, new Path(filePath));
        }

        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 6. Run the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Lists every file whose name ends with {@code .txt} under the given path,
     * descending into sub-directories.
     *
     * @param folderPath directory to scan
     * @return the matching file paths as strings; empty if none match
     * @throws IOException if the file system cannot be reached or listed
     */
    public static List<String> getTxtFileListFromPath(String folderPath) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        List<String> list = new ArrayList<>();
        collectTxtFiles(fs, new Path(folderPath), list);
        return list;
    }

    /** Appends the {@code .txt} files found under {@code dir} (recursively) to {@code out}. */
    private static void collectTxtFiles(FileSystem fs, Path dir, List<String> out) throws IOException {
        for (FileStatus fileStatus : fs.listStatus(dir)) {
            Path file = fileStatus.getPath();
            if (fileStatus.isFile() && file.getName().endsWith(".txt")) {
                out.add(file.toString());
            } else if (fileStatus.isDirectory()) {
                collectTxtFiles(fs, file, out);
            }
        }
    }
}
3、在 HDFS 目录 /tmp/input/wordcount 中有一系列文件,内容以","号分隔;同时在 HDFS 路径 /tmp/black.txt 有一个黑名单文件,一行一个单词,用于存放不计入统计的单词列表。求按","号分隔的各个元素去除掉黑名单后的出现频率,输出到目录 /tmp/output/个人用户名 的 HDFS 目录中。

实现代码:

package com.tledu.hadoop.mr.homework;

import com.tledu.hadoop.utils.RegUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

/**
 * MapReduce job that counts comma-separated elements while dropping every
 * element listed in a blacklist file.
 *
 * <p>The blacklist is read as ordinary job input: it is only seen if it is a
 * {@code .txt} file under the input directory and its file name contains
 * {@code "blacklist"}. Each of its lines is emitted with the marker value 0;
 * the reducer suppresses any key for which it meets that marker.
 *
 * <p>Usage: {@code HomeWork5 [generic options] <input dir> <output dir>}.
 */
public class HomeWork5 {

    /** Emits {@code (word, 1)} for data files and {@code (word, 0)} for blacklist files. */
    public static class HomeWork5Mapper extends Mapper<Object, Text, Text, IntWritable> {
        // Reused across map() calls so no Writable is allocated per record.
        private final Text word = new Text();
        private final IntWritable one = new IntWritable(1);
        // Marker value telling the reducer that the key is blacklisted.
        private final IntWritable blackVal = new IntWritable(0);

        /**
         * Routes one input line according to the name of the file it came from.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of input text
         * @param context sink for the emitted pairs; its split must be a {@code FileSplit}
         */
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Name of the file this split belongs to (last path component).
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            // NOTE(review): a blacklist file named e.g. "black.txt" does not match
            // this check and would be counted as data - confirm the expected name.
            if (fileName.contains("blacklist")) {
                // Blacklist file: the whole line is one blacklisted word.
                word.set(value.toString());
                context.write(word, blackVal);
            } else {
                StringTokenizer tokens = new StringTokenizer(value.toString(), ",");
                while (tokens.hasMoreTokens()) {
                    word.set(tokens.nextToken());
                    context.write(word, one);
                }
            }
        }
    }

    /** Sums the counts of a word unless the word carries the blacklist marker. */
    public static class HomeWork5Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable sumRes = new IntWritable();

        /**
         * Writes {@code (key, sum of values)}, or nothing if any value is 0.
         *
         * @param key     the word
         * @param values  counts of 1 from data files and/or the marker 0 from the blacklist
         * @param context sink for the summed count
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                // A value of 0 is the blacklist marker: drop the key entirely.
                if (val.get() == 0) {
                    return;
                }
                sum += val.get();
            }
            sumRes.set(sum);
            context.write(key, sumRes);
        }
    }

    /**
     * Configures and runs the job, then exits with 0 on success and 1 on failure.
     *
     * @param args generic Hadoop options followed by the input directory and the output directory
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Create the configuration and strip generic Hadoop options from the arguments
        Configuration conf = new Configuration();
        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();
        if (remainingArgs.length < 2) {
            System.err.println("Usage: HomeWork5 [generic options] <input dir> <output dir>");
            System.exit(2);
        }

        // 2. Create the job
        Job job = Job.getInstance(conf, "homework3");
        // 3. Register the job classes.
        // The reducer is deliberately NOT used as a combiner: it emits nothing for a
        // key that carries the 0 marker, so the marker would never reach the reducer.
        job.setJarByClass(HomeWork5.class);
        job.setMapperClass(HomeWork5Mapper.class);
        job.setReducerClass(HomeWork5Reducer.class);
        // 4. Output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 5. Input and output paths: every .txt file under the input directory is an input
        List<String> fileList = getTxtFileListFromPath(remainingArgs[0]);
        for (String filePath : fileList) {
            FileInputFormat.addInputPath(job, new Path(filePath));
        }

        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
        // 6. Run the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Lists every file whose name ends with {@code .txt} under the given path,
     * descending into sub-directories.
     *
     * @param folderPath directory to scan
     * @return the matching file paths as strings; empty if none match
     * @throws IOException if the file system cannot be reached or listed
     */
    public static List<String> getTxtFileListFromPath(String folderPath) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        List<String> list = new ArrayList<>();
        collectTxtFiles(fs, new Path(folderPath), list);
        return list;
    }

    /** Appends the {@code .txt} files found under {@code dir} (recursively) to {@code out}. */
    private static void collectTxtFiles(FileSystem fs, Path dir, List<String> out) throws IOException {
        for (FileStatus fileStatus : fs.listStatus(dir)) {
            Path file = fileStatus.getPath();
            if (fileStatus.isFile() && file.getName().endsWith(".txt")) {
                out.add(file.toString());
            } else if (fileStatus.isDirectory()) {
                collectTxtFiles(fs, file, out);
            }
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/735647.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号