1
、文件格式
文件格式按面向的存储形式不同,分为面向行和面向列的两大类文件格式。
2、
压缩格式
压缩格式按其可切分计算性,分为可切分计算和不可切分计算两种。
3、配置文件
//确认压缩
-Dmapred.output.compress=true
//指定压缩方式
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
4、
Partition
partition是在map阶段完成后执行的。然后将分好区的数据传输到reduce端,也就是由Partitioner来决定每条记录应该送往哪个reducer节点。mapreduce中默认的分区是HashPartition类;
核心代码:
public static class MyPartition extends Partitioner {
@Override
public int getPartition(Text text, Text text2, int i) {
if("001".equals(text.toString())||"002".equals(text.toString())){
return 0;
}
else {
return 1;
}
}
}
5、MapReduce 个数的确定
在 Job 提交后,任务正式开始计算之前即已经确定
Map 数量的确定:由输入数据文件的总大小、数据格式、块大小综合确定,待冲刺环节详解。
Reduce 数量确定:系统根据输入数据量的大小自动确定,有固定的计算公式,待冲刺环节详
解。另外,用户可以自定义设置,通过参数配置,由用户决定。
6
、自定义
reduce
数量
-Dmapred.reduce.tasks=2
最终效果:
[yanyufei@cluster2 ~]$ hdfs dfs -text /user/yanyufei/output17/part-r-00000.gz 001 张一 政治 85 [yanyufei@cluster2 ~]$ hdfs dfs -text /user/yanyufei/output17/part-r-00001.gz 002 张二 Java程序设计 95 003 张三 政治 80 004 张四 政治 80 [yanyufei@cluster2 ~]$ [yanyufei@cluster2 ~]$ hdfs dfs -ls /user/yanyufei/output17/ Found 3 items -rw-r--r-- 3 yanyufei job018 0 2021-10-12 10:45 /user/yanyufei/output17/_SUCCESS -rw-r--r-- 3 yanyufei job018 43 2021-10-12 10:45 /user/yanyufei/output17/part-r-00000.gz -rw-r--r-- 3 yanyufei job018 81 2021-10-12 10:44 /user/yanyufei/output17/part-r-00001.gz
7、读取外部配置文件-Configuration 传递
7.1、步骤分解
实现基于 input_secondsort 文件的一次排序
将本地文件 whitelist.txt 传给 Driver 类,读取到该文件内容 txtContent
将 txtContent 通过 Configuration 的 set 方法传递给 map 和 reduce 任务
在 map 任务中通过 Configuration 对象的 get 方法获取传递过来的值 txtContent
将 txtContent 解析成 Set 对象,对 map 任务重的 map 方法进行过滤输出
由于 map 端已经做了过滤,reduce 端将不需任何改变
实现完整代码:
public class WhiteList {
//map阶段负责对输入文件进行切分处理,然后汇总再分组给reduce进行处理,以达到高效的分布式计算效率
public static class WhiteListMap extends Mapper{
public Text reskey=new Text();
public Text resValue=new Text();
public List whiteList;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//map执行之前先调用这个方法,全局只会调用一次,在这可以进行初始化
//获取配置文件
Configuration conf=context.getConfiguration();
String whilteStr=conf.get("whiteList");
//初始化whiteList变量
whiteList= Arrays.asList(whilteStr.split("t"));
}
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[]values=value.toString().split("t");
if(whiteList.contains(values[0])){
//在名单里应该展示
reskey.set(values[0]);
resValue.set(values[1]);
context.write(reskey,resValue);
}
}
}
public static class WhilteListReducer extends Reducer{
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
for(Text val:values){
context.write(key,val);
}
}
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//创建Configuration
Configuration conf=new Configuration();
//参数解析器,将mapreduce需要的参数传给配置文件,其他参数传给remainingArgs
//处理参数,把mapred参数放到conf里,会把其他参数提取出来
GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
//获取输入的参数,设计输入参数
//第一个参数 输入文件位置,用,隔开
//第二个参数 输出文件位置
//第三个参数 白名单地址
String[] remainingArgs = optionParser.getRemainingArgs();
//调用;把白名单文件内容获取到配置文件里
fileread(conf,remainingArgs[2]);
//创建任务Job
Job job=Job.getInstance(conf,"白名单2");
//指定Driver
job.setJarByClass(WhiteList.class);
//指定mapper
job.setMapperClass(WhiteListMap.class);
//设置partition
// job.setPartitionerClass(Partition.MyPartition.class);
//指定combine
// job.setCombinerClass(CountAversge.CountAverageReduce.class);
//指定reducer
job.setReducerClass(WhilteListReducer.class);
//输入输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//输入输出的文件
for(String inputPath:remainingArgs[0].split(",")){
FileInputFormat.addInputPath(job,new Path(inputPath));
}
FileOutputFormat.setOutputPath(job,new Path(remainingArgs[1]));
//执行
System.exit(job.waitForCompletion(true)?0:1);
}
public static void fileread(Configuration conf,String filepath) throws IOException {
//把白名单文件内容获取到配置文件里
//IO流
FileInputStream fis=new FileInputStream(filepath);
//获取缓冲流
BufferedReader br=new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8));
String line="";
StringBuilder result=new StringBuilder();
boolean isFirst=true;
while ((line=br.readLine())!=null){
if(!isFirst){
result.append("t");
}else {
isFirst=false;
}
result.append(line);
}
//在配置文件里定义变量,并且这个变量可以传递给么一个map
conf.set("whiteList",result.toString());
}
}