栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hadoop

Hadoop

目录

1.Reduce Join

2.Reduce Join案例实操

1)需求

2)需求分析

3)代码实现 

4)测试

5)总结

3.Map Join

4.Map Join案例实操

1)需求

2)需求分析

3)实现代码


1.Reduce Join

Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。

Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。

2.Reduce Join案例实操

1)需求

表1 订单数据表t_order

id

pid

amount

1001

01

1

1002

02

2

1003

03

3

1004

01

4

1005

02

5

1006

03

6

表2 商品信息表t_product

pid

pname

01

小米

02

华为

03

格力

将商品信息表中数据根据商品pid合并到订单数据表中。

表3 最终数据形式

id

pname

amount

1001

小米

1

1004

小米

4

1002

华为

2

1005

华为

5

1003

格力

3

1006

格力

6

2)需求分析

通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联。

3)代码实现 

(1)创建商品和订单合并后的TableBean类

public class TableBean implements Writable {
    //id pid amount
    //pid pname
    private String id;//订单id
    private String pid;//商品id
    private int amount;//商品数量
    private String pname;//商品名称
    private String flag;//标记是什么表 order还是pd

    //空参构造 序列化必须要的
    public TableBean() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readUTF();
        this.pid = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.flag = in.readUTF();
    }

    @Override
    public String toString() {
        return id + "t" + pname + "t" + "t" + amount;
    }
}

(2)编写TableMapper类

public class TableMapper extends Mapper {

    private String fileName;
    private Text outK = new Text();
    private TableBean outV = new TableBean();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //获取各文件切片
        FileSplit split = (FileSplit)context.getInputSplit();
        //获取文件名称
        //一种优化手段,只获取一次名称就够了
        fileName = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //1 获取一行
        String line = value.toString();

        //2 判断是哪个文件的
        if (fileName.contains("order")) {//处理的是订单表
            String[] split = line.split("t");

            //封装k v
            outK.set(split[1]);
            outV.setId(split[0]);
            outV.setPid(split[1]);
            outV.setAmount(Integer.parseInt(split[2]));
            outV.setPname("");//设默认值
            outV.setFlag("order");

        } else {//处理的是商品表
            String[] split = line.split("t");

            outK.set(split[0]);
            outV.setId("");
            outV.setPid(split[0]);
            outV.setAmount(0);//设默认值
            outV.setPname(split[1]);
            outV.setFlag("pd");
        }

        context.write(outK, outV);
    }
}

(3)编写TableReducer类

public class TableReducer extends Reducer {
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        //这组数据进入同一个reduce方法
        //01 1001 1 order
        //01 1004 4 order
        //01 小米     pd

        ArrayList orderBeans = new ArrayList<>();
        TableBean pdBean = new TableBean();

        //循环遍历
        for (TableBean value : values) {
            if("order".equals(value.getFlag())){//订单表
                
                //使用orderBeans.add(value),orderBeans加入的值是地址,实际上加入的是同一个地址,但是地址内的值一直在变,
                //最后加入orderBeans的都是添加的最后一个
                //因此需要一个中间量,new出来的中间量地址都不一样,最后加入orderBeans的是不同对象
                TableBean tmptableBean = new TableBean();
                try {
                    BeanUtils.copyProperties(tmptableBean,value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }

                orderBeans.add(tmptableBean);
            }else {//商品表
                //因为pd.txt的一个pid只有一行,即reduce方法执行一次只处理一行的商品表内容,不需要像订单表一样那么麻烦
                try {
                    BeanUtils.copyProperties(pdBean,value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        //循环遍历orderBeans,赋值pdName
        //因为以pid为key,reduce方法的一次执行中pid都是相同的,订单表就和商品表关联起来
        for (TableBean orderBean : orderBeans) {
            orderBean.setPname(pdBean.getPname());

            context.write(orderBean,NullWritable.get());
        }
    }
}

(4)编写TableDriver类

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(TableDriver.class);
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("D:\code\Hadoop\input\inputtable"));
        FileOutputFormat.setOutputPath(job, new Path("D:\code\Hadoop\output1"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

4)测试

5)总结

缺点:这种方式中,合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。

解决方案:Map端实现数据合并。

3.Map Join

1)使用场景

Map Join适用于一张表十分小、一张表很大的场景。

2)优点

思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?

在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。

3)具体办法:采用DistributedCache

(1)在Mapper的setup阶段,将文件读取到缓存集合中。

(2)在Driver驱动类中加载缓存。

//缓存普通文件到Task运行节点。
job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
//如果是集群运行,需要设置HDFS路径
job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));

4.Map Join案例实操

1)需求

表1 订单数据表t_order

id

pid

amount

1001

01

1

1002

02

2

1003

03

3

1004

01

4

1005

02

5

1006

03

6

表2 商品信息表t_product

pid

pname

01

小米

02

华为

03

格力

将商品信息表中数据根据商品pid合并到订单数据表中。

表3 最终数据形式

id

pname

amount

1001

小米

1

1004

小米

4

1002

华为

2

1005

华为

5

1003

格力

3

1006

格力

6

2)需求分析

MapJoin适用于关联表中有小表的情形。

3)实现代码

 (1)先在MapJoinDriver驱动类中添加缓存文件

public class MapJoinDriver {
    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {

        // 1 获取job信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 设置加载jar包路径
        job.setJarByClass(MapJoinDriver.class);
        // 3 关联mapper
        job.setMapperClass(MapJoinMapper.class);
        // 4 设置Map输出KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 5 设置最终输出KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // 加载缓存数据
        job.addCacheFile(new URI("file:///D:/code/Hadoop/input/tablecache/pd.txt"));
        // Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
        job.setNumReduceTasks(0);

        // 6 设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\code\Hadoop\input\inputtable2"));
        FileOutputFormat.setOutputPath(job, new Path("D:\code\Hadoop\output2"));
        // 7 提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

(2)在MapJoinMapper类中的setup方法中读取缓存文件

public class MapJoinMapper extends Mapper {
    private HashMap pdMap = new HashMap<>();
    private Text outK = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //获取缓存的文件,并把文件内容封装到集合 pd.txt
        URI[] cacheFiles = context.getCacheFiles();

        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));

        //从流中读取数据
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));

        String line;
        //StringUtils导的包是org.apache.commons.lang
        while (StringUtils.isNotEmpty(line = reader.readLine())){
            String[] fields = line.split("t");

            //赋值
            pdMap.put(fields[0], fields[1]);
        }

        //关流
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //处理order.txt
        String line = value.toString();
        String[] fields = line.split("t");

        //获取pid
        String pname = pdMap.get(fields[1]);

        //获取订单id和订单数量
        //封装
        outK.set(fields[0] + "t" + pname + "t" + fields[2]);

        context.write(outK, NullWritable.get());
    }
}

不需要reduce类,全部在map端处理,map后的结果为最终结果。 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/710753.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号