栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hadoop | MapReduce学习笔记 | JavaAPI更换切片机制 | CombineTextInputFormat 切片 | 词频统计案例

Hadoop | MapReduce学习笔记 | JavaAPI更换切片机制 | CombineTextInputFormat 切片 | 词频统计案例

文章目录

一、参考资料二、运行环境三、CombineTextInputFormat 切片机制四、词频统计

4.1 Mapper4.2 Reducer4.3 Driver 驱动类(关键) 五、总结

一、参考资料

视频链接

二、运行环境

windows 10JDK 8Hadoop 3.1.3 windows版IDEA 三、CombineTextInputFormat 切片机制


Hadoop框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会作为一个单独的切片,都会交给一个MapTask执行,当处理大量小文件时,效率会比较低。

CombineTextInputFormat切片机制作用域小文件过多的场景,可以将多个小文件从逻辑上规划到一个切片中,从而实现多个小文件交给一个MapTask处理。

虚拟存储切片最大值的设置:

CombineTextInputFormat.setMaxInputSplitSize(JOB, 4194304); // 4 MB

注:虚拟存储切片最大值最好根据实际的小文件大小情况来设置具体的值

切片机制:

生成切片过程包括:虚拟存储过程和去切片过程两部分

四、词频统计
4.1 Mapper
package com.uni.combineTextInputFormat;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


public class WordCountMapper extends Mapper {
    // 放在上面声明防止在循环里多次创建对象,浪费空间
    private Text outKey = new Text();
    private IntWritable outValue = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. 获取一行
        String line = value.toString();
        // 2. 切割
        String[] words = line.split(" ");
        // 3. 循环写出
        for (String word : words) {
            // 封装 outKey
            outKey.set(word);
            context.write(outKey, outValue);
        }
    }
}

4.2 Reducer
package com.uni.combineTextInputFormat;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class WordCountReducer extends Reducer {
    private  IntWritable outValue = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // 累加
        for (IntWritable value : values) {
            sum += value.get();
        }
        outValue.set(sum);
        // 写出
        context.write(key, outValue);
    }
}
4.3 Driver 驱动类(关键)
package com.uni.combineTextInputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver{
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. 获取 job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. 设置jar包路径
        job.setJarByClass(WordCountDriver.class);
        // 3. 关联 mapper 和 reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4. 设置 map 输出的 k v 类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5. 设置最终输出的k v类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. 修改切片规则, 默认是TextInputFormat.calss
        job.setInputFormatClass(CombineTextInputFormat.class);
        // 7. 虚拟存储切片最大值设置成4MB
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
        // 8. 设置输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path("input1"));
        FileOutputFormat.setOutputPath(job, new Path("output1"));
        // 9. 提交 job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

五、总结

MapReduce 更换切片机制只需在提交Job前,调用 org.apache.hadoop.mapreduce.Job对象API。主要有两个步骤,先设置切片机制为CombineTextInputFormat,然后再设置虚拟存储的最大值,这个会根据小文件(按文件名字典顺序的升序结果文件集合)的大小而决定切片的个数。

// 设置切片机制
job.setInputFormatClass(CombineTextInputFormat.class);
// 设置虚拟存储切片最大值
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/728789.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号