栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hadoop进阶经典案例总结

Hadoop进阶经典案例总结

一,以hdfs路径/tmp/table/student_score.txt为输入,表结构为(学号,姓名,课程名称,成绩),字段间分隔符为tab,如下图所示。通过设置reduce个数为2,自定义hash partition实现将其中姓名为"张一"的放到同一个reduce中,非张一的放到其它的reduce中,输出结果字段为(学号,姓名,课程名称,成绩),按tab分隔即可。

 具体实现代码(java部分):

package com.hadoop.mr.homework2;

import com.hadoop.ReadFromHdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class HomeWork2 {
    // 设计kv
    // key 就是学号
    // value 除了学号之外的其它内容
    public static class HomeWork2Mapper extends Mapper {
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] valueArr = value.toString().split("t");
            // 第一项就是学号   valArr[0]是key ,剩下的是value
            // 剩下的就是其它的内容      通过for循环依次赋给自定义的String类型的val
            String val = "";
            for (int i = 1; i < valueArr.length; i++) {
                val += valueArr[i];
                if (i != valueArr.length - 1) {
                    val += "t";
                }
            }
            context.write(new Text(valueArr[0]), new Text(val));
        }
    }

    public static class HomeWork2Reduce extends Reducer {
        @Override
        protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            //reduce直接输出
            for (Text val : values) {
                context.write(key, val);
            }
        }
    }

    public static class CustomPartitions extends Partitioner {
        @Override
        public int getPartition(Text text, Text text2, int i) {
            if ("001".equals(text.toString())) {
                return 0;
            }else {
                return 1;
            }
        }
    }


    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. 创建配置
        Configuration conf = new Configuration();

        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();

        // 2. 创建任务
        Job job = Job.getInstance(conf, "homework2");
        // 3. 设置对应类
        job.setJarByClass(HomeWork2.class);
        job.setMapperClass(HomeWork2Mapper.class);
        job.setCombinerClass(HomeWork2Reduce.class);
        job.setReducerClass(HomeWork2Reduce.class);
        job.setPartitionerClass(CustomPartitions.class);
        // 4. 输出的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // 5. 输入和输出的路径
        // 获取所有输入的txt文件
        FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
        // 6. 启动任务
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

二,白名单问题

这里有个注意点:

//特殊的正则表达式       \r?\n    为的是能让不同的系统都能识别分隔
package com.hadoop.mr.homework2;

import com.hadoop.ReadFromHdfs;
import com.hadoop.mr.homework.HomeWork1;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class  WhiteList {
    public static class WhiteListMapper extends Mapper {
        //建立一个Set类型的白名单
        Set whiteList = new HashSet<>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // 初始化参数配置
            Configuration conf = context.getConfiguration();
            // 获取到在配置中的白名单字符串
            String whiteListStr = conf.get("whiteList");
            // 字符串切割,成白名单的数组
            //特殊的正则表达式   \r?\n   为的是能让不同的系统都能识别分隔
            List whiteListOriginal = Arrays.asList(whiteListStr.split("\r?\n"));
            // 添加到成员变量中,方便在map中进行使用
            whiteList.addAll(whiteListOriginal);
        }

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // 根据t切割数据,第一项就是key 也就是s1、s2 ... 第二项就是值
            String[] valueArr = value.toString().split("t");
            // 在白名单中进行输出
            if (whiteList.contains(valueArr[0])) {
                // 才进行输出
                context.write(new Text(valueArr[0]), new IntWritable(Integer.parseInt(valueArr[1])));
            }
        }
    }

    public static class WhiteListReducer extends Reducer {
        @Override
        protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            // 拿到所有需要输出的数据
            for (IntWritable val : values) {
                // 逐条输出
                context.write(key, val);
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. 创建配置
        Configuration conf = new Configuration();

        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();

        // 把白名单的内容作为参数,传递给map或者reduce
        // 通过configuration进行传递
        // 先获取白名单文件
        String whiteList= ReadFromHdfs.getStringByByte(ReadFromHdfs.readFileFromHdfs(remainingArgs[2]));
        // 设置到配置文件中
        conf.set("whiteList",whiteList);

        // 2. 创建任务
        Job job = Job.getInstance(conf, "homework2");
        // 3. 设置对应类
        job.setJarByClass(WhiteList.class);
        job.setMapperClass(WhiteListMapper.class);
        job.setCombinerClass(WhiteListReducer.class);
        job.setReducerClass(WhiteListReducer.class);
        // 4. 输出的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 5. 输入和输出的路径
        // 获取所有输入的txt文件
        FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
        // 6. 启动任务
        System.exit(job.waitForCompletion(true)?0:1);
    }

    // 1. 获取到白名单内容
    // 2. 想办法白名单传递给map或者reduce
    // 3. 通过conf进行传递
    // 4. 可以在map阶段获取到白名单的内容
    // 5. 在map输出的时候进行判断,如果是白名单中的内容才进行输出
}
三,流式分组问题

已排序好文本文件的分组-流式分组)给定一个本地文本文件finance_record_sorted.txt,共2个字段(工号,报销费用),其中按工号升序排列,并用tab分隔。求对该数据进行按工号字段的分组,并在控制台输出如下图所示的结果。

package com.hadoop.mr.homework2;

import java.io.*;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class CurrentGroup {
    public static void main(String[] args) throws IOException {
        //1,逐行读取数据
        String path= "E:\daily_expense.txt";
        FileInputStream fis =new FileInputStream(path);
        BufferedReader br =new BufferedReader(new InputStreamReader(fis));
        //2,将S作为key,后面的作为集合存储
        String line=null;
        String first=null;
        List list =new ArrayList<>();
        while((line = br.readLine()) != null){
            String[] values =line.split("t");
            //3,如果key一样,则vlaue存入集合组中
            if(first==null){
                first = values[0];
                list.add(values[1]);
            }else if(first.equals(values[0])){
                list.add(values[1]);
            }else{
                //4,如果key不一样,则输出上一个key,并将上一个key替换为新的key
                System.out.println(first+ " "+list);
                list.clear();
                first=values[0];
                list.add(values[1]);
            }
        }
        //5,输出最后一组key,val
        System.out.println(first+ " "+list);

        br.close();
        fis.close();


    }


}
结果展示: 

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/735110.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号