栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

学大数据小胖的第四十九天

学大数据小胖的第四十九天

[root@master ~]# mv score.txt /usr/local/soft/data
[root@master ~]# cd /usr/local/soft/data
[root@master data]# ls
new_db.sql  student.sql         theZen.txt
score.sql   students.txt        wordcount
score.txt   theZenOfPython.txt  words.txt
[root@master data]# hdfs dfs -mkdir -p /data/score/input
[root@master data]# hdfs dfs -put score.txt /data/score/input/
[root@master data]# cd ..
[root@master soft]# cd jars/
[root@master jars]# ls
hadoop-1.0-SNAPSHOT.jar
[root@master jars]# rm hadoop-1.0-SNAPSHOT.jar 
rm:是否删除普通文件 "hadoop-1.0-SNAPSHOT.jar"?y
[root@master jars]# rz -E
rz waiting to receive.
[root@master jars]# hadoop jar hadoop-1.0-SNAPSHOT.jar com.shujia.MapReduce.Demo03Join
package com.shujia.MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;

public class Demo03Join {

    /**
     * Map side of a reduce-side join between the students table and the score table.
     *
     * <p>Both tables are read by this single mapper. A line with 5 comma-separated fields is
     * treated as a student record and tagged with {@code "#"}. A line with 3 fields is treated
     * as a score record and tagged with {@code "$"}. Every tagged record is keyed by its first
     * field (the student id), so a student and all of that student's scores reach the same
     * reduce call. Lines with any other field count are dropped.
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] splits = line.split(",");

            // Which table the line came from is decided purely by its field count.
            String tag;
            if (splits.length == 5) {
                // students record
                tag = "#";
            } else if (splits.length == 3) {
                // score record: id,subjectId,score
                tag = "$";
            } else {
                // Neither table: the reducer would ignore it, so don't shuffle it at all.
                return;
            }
            context.write(new Text(splits[0]), new Text(tag + line));
        }
    }

    /**
     * Reduce side: joins one student's info (name = field 1, clazz = field 4) with each of
     * that student's score records, emitting {@code name,clazz,subjectId,score} per score.
     *
     * <p>If a key has score records but no student record, the scores are still emitted, with
     * empty name and clazz.
     */
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // values can be traversed only once, and the framework may reuse the same Text
            // object between iterations. So score records are copied out as Strings while
            // scanning for the student record, and emitted after the scan.
            String name = "";
            String clazz = "";
            ArrayList<String[]> scores = new ArrayList<>();
            for (Text value : values) {
                String record = value.toString();
                // The tag is a prefix added by MyMapper; test the prefix, not "contains".
                if (record.startsWith("#")) {
                    String[] student = record.substring(1).split(",");
                    name = student[1];
                    clazz = student[4];
                } else if (record.startsWith("$")) {
                    scores.add(record.substring(1).split(","));
                }
            }

            // Reduce Join: attach the student's info to every score record.
            for (String[] score : scores) {
                context.write(key, new Text(name + "," + clazz + "," + score[1] + "," + score[2]));
            }
        }
    }

    /**
     * Driver: configures and submits the reduce-side join job.
     *
     * <p>Reads the students table from {@code /data/stu/input} and the score table from
     * {@code /data/score/input}. Writes {@code id,name,clazz,subjectId,score} lines to
     * {@code /data/join/output}, deleting that directory first if it already exists. Exits with
     * status 0 when the job succeeds and 1 otherwise, so calling scripts can detect failure.
     */
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:9000");
        // Put a comma instead of the default tab between output key and value.
        // (mapred.textoutputformat.separator is the deprecated alias of this key.)
        conf.set("mapreduce.output.textoutputformat.separator", ",");

        Job job = Job.getInstance(conf);
        job.setJobName("Demo03Join");
        job.setJarByClass(Demo03Join.class);

        // Map side
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Reduce side
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Both tables feed the same mapper; MyMapper tells them apart by field count.
        FileInputFormat.addInputPath(job, new Path("/data/stu/input"));
        FileInputFormat.addInputPath(job, new Path("/data/score/input"));

        Path path = new Path("/data/join/output");
        FileSystem fs = FileSystem.get(conf);
        // The job fails if the output directory already exists, so remove the previous output.
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
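MapJoin:

适用场景:大表关联小表

小表一般要求在25M以下(这个默认值来自Hive的hive.mapjoin.smalltable.filesize参数;原生MapReduce中没有固定阈值,关键是小表要能完整装入每个Map任务的内存)

大表的大小没有上限

原理:将小表通过分布式缓存广播到每一个处理大表数据的Map任务中,在Map端直接完成关联,因此关联本身不依赖Reduce阶段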

package com.shujia.MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;

public class Demo04MapJoin {

    /**
     * Map-only join of the large score table against the small students table.
     *
     * <p>{@link #setup} loads the first distributed-cache file (the students table) into an
     * in-memory map, once per map task. {@link #map} then joins each score record
     * ({@code id,subjectId,score}) with it and emits {@code id -> name,clazz,subjectId,score}.
     * Score records without a matching student, and malformed lines, are dropped (inner join).
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        // student id (first field) -> full student line; filled in setup() before any map() call
        private final Map<String, String> stuKV = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // The driver registers the small table via job.addCacheFile(...).
            URI[] cacheFiles = context.getCacheFiles();
            if (cacheFiles == null || cacheFiles.length == 0) {
                throw new IOException(
                        "No cache file found: the driver must register the students table with job.addCacheFile(...)");
            }
            Path path = new Path(cacheFiles[0]);
            // Resolve the FileSystem from the cached URI itself rather than the default FS.
            FileSystem fs = path.getFileSystem(context.getConfiguration());
            // try-with-resources closes the HDFS stream even if reading fails; read as UTF-8
            // instead of the platform default charset.
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(fs.open(path), StandardCharsets.UTF_8))) {
                String line;
                while ((line = br.readLine()) != null) {
                    String id = line.split(",")[0];
                    stuKV.put(id, line);
                }
            }
        }

        /** Joins one score record of the large table against the cached students table. */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] splits = value.toString().split(",");
            // Score rows are id,subjectId,score; skip short lines instead of failing the task.
            if (splits.length < 3) {
                return;
            }
            String id = splits[0];
            String subjectId = splits[1];
            String subjectScore = splits[2];

            // Look up the student by id; this lookup is the join.
            String stuInfo = stuKV.get(id);
            if (stuInfo == null) {
                return;
            }
            String[] stuSplits = stuInfo.split(",");
            // Guard against short student lines, which would otherwise cause index errors.
            if (stuSplits.length < 5) {
                return;
            }
            String name = stuSplits[1];
            String clazz = stuSplits[4];
            context.write(new Text(id), new Text(name + "," + clazz + "," + subjectId + "," + subjectScore));
        }
    }

    /**
     * Driver: configures and submits the map-side join job.
     *
     * <p>Reads the score table from {@code /data/score/input} and broadcasts
     * {@code /data/stu/input/students.txt} through the distributed cache. Writes
     * {@code id,name,clazz,subjectId,score} lines to {@code /data/mapJoin/output}, deleting that
     * directory first if it already exists. Exits with status 0 on success and 1 otherwise.
     */
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:9000");
        // Put a comma instead of the default tab between output key and value.
        // (mapred.textoutputformat.separator is the deprecated alias of this key.)
        conf.set("mapreduce.output.textoutputformat.separator", ",");

        Job job = Job.getInstance(conf);
        job.setJobName("Demo04MapJoin");
        // Must name this class (not Demo03Join) so Hadoop locates the jar containing this job.
        job.setJarByClass(Demo04MapJoin.class);

        // Map side
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // The join is finished in the map phase, so skip the shuffle and reduce phases.
        job.setNumReduceTasks(0);

        // Large table: processed by the map tasks.
        FileInputFormat.addInputPath(job, new Path("/data/score/input"));
        // Small table: broadcast to every map task.
        job.addCacheFile(new URI("hdfs://master:9000/data/stu/input/students.txt"));

        Path path = new Path("/data/mapJoin/output");
        FileSystem fs = FileSystem.get(conf);
        // The job fails if the output directory already exists, so remove the previous output.
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/779512.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号