什么是数据倾斜?以及现象比如一个文件,abc a 1亿个 b 1个 c 1个
做wordcount
map1 99% map2 100% map3 100%
1.1combiner
a 1亿个 (a,1亿)
b 1个 (b,1)
c 1个 (c,1)
减少数据的网络传输
但是ava不适合,如果导致数据倾斜的kev分布在很多不同的文件,不同mapper,这种方法就不适合了。100mapper每个mapper里1万个a
1.2 导致数据倾斜的kev分布在很多不同的文件的时候
1.2.1局部聚合+全局聚合
第一次map:对于导致数据倾斜的key,加上一个随机数前缀
1_a 2_a 3_a…10_a
这样的话,本来相同的kev是不是也会被分到多个reducer中进行局部聚合
第二次map:去掉kev的随机前缀,进行全局聚合
思想:两次MR,第一次将kev随机散列到不同的reducer进行处理,达到负载均衡的目的第二次再根据去掉kev的随机前缀,按照原本的kev进行reduce处理
1.2.2增加reducer数,提高并行度
job.setNumReduceTasks(int);
比如本来abc只有三个reduce,现在我将reduce数改为30个,是不是也相当于处理a的reducer就多了 job.setNumReduceTasks(0);
也可以设置成0,reduce就不输出了,直接将map的结果写在输出文件里
1.2.3 实现自定义分区
partitioner:按照某种规则(可以自定义)对map输出的数据进行分区操作
默认的是HashPartitioner job.setPartitionerClass(HashPartitioner.class);顺序:map=>partitioner=> reduce
根据数据分布情况,自定义散列函数,将key均匀分配到不同的reducer
13…=>reducer
18…=>reducer
2.1map端
2.1.1 减少输入的文件个数,对小文件进行合并
2.1.2 combiner
2.21/0
数据传输时进行压缩
2.3 reduce端
2.3.1设置mapreduce共存,mapred-site.xml
mapreduce.job.reduce.slowstart.completedmaps默认0.05
也就是说map在执行到5号的时候开始为reduce进行申请资源,开始执行reduce操作
2.3.2 尽量少用reduce
reduce会产生大量的网络消耗,但是该用还是要用
2.3.3 增加reducer,提高并行度
2.4整体
2.4.1 合理的设置map数和reduce数
2.4.2 mr on yarn yarn进行参数调优container
2.4.3 加配置(机器,内存等)
可以
联系到我们SQL中,where的实现逻辑就是只有map没有reduce
第一印象是不可以
但是https://mp.weixin.aa.com/s/SkiHeuosdx-SZEC6AxGaGA
MRMapper端编写
package MapReduce; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class MRMapper extends Mapper{ IntWritable ONE=new IntWritable(1); @Override protected void setup(Context context) throws IOException, InterruptedException { System.out.println("--------------------------Maper.setup------------------------"); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { System.out.println("--------------------------Maper.map------------------------"); final String line=value.toString().toLowerCase(); final String[] splits=line.split(" "); for (String word:splits){ context.write(new Text(word),ONE); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { System.out.println("--------------------------Maper.cleanup------------------------"); } }
定义了Mapper的splits、setup方法等
MPReduce端编写
package MapReduce; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class MRReduce extends Reducer{ @Override protected void setup(Context context) throws IOException, InterruptedException { System.out.println("--------------------------Reducer.setup------------------------"); } @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { System.out.println("--------------------------Reducer.reduce------------------------"); //初始count int count=0; //对于每个key进行聚合操作 for(IntWritable value : values){ count+=value.get();//IntWritable转换为int,用get()方法 } //输出 context.write(key,new IntWritable(count)); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { System.out.println("--------------------------Reducer.setup------------------------"); } }
获取Mapper端相关信息
MRDriver端编写
package MapReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.rmi.server.ExportException;
public class MRDrive {
public static void main(String[] args) throws Exception {
String input = "data/sqq.txt";
String output = "sqq_znb";
final Configuration configuration = new Configuration();
//1.获取job对象
final Job job = Job.getInstance(configuration);
//删除文件
FileUtils.deleteTarget(configuration,output);
//2.设置class
job.setJarByClass(MRDrive.class);
//3.设置Mapper和Reduce(自己构建的java类)
job.setMapperClass(MRMapper.class);
job.setReducerClass(MRReduce.class);
//设置Mapper阶段输出数据的kv类型,对应就是MRMapper的第三第四个参数的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置Reducer阶段输出数据的kv类型,对应就是MRReduce的第三第四个参数类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6.设置输入输出的路径
FileInputFormat.setInputPaths(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));
//7.提交job
final boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
主函数类
1.获取job对象
2.设置class
3.设置Mapper和Reducer
4.设置Mapper阶段输出数据的kv类型,对应就是MRmapper的第三四个参数的类型
5.设置Reducer阶段输出数据的kv类型,对应就是MRReducer的第三四个参
6.设置输入输出的路径
7.提交job
作业:
1.combiner不适合做avg,为什么?
只有操作满足结合律的才可设置combiner
2.有哪几种压缩方式?分别的优缺点和适用场景。列个表格
a. gzip
优点:压缩比在四种压缩方式中较高;hadoop本身支持,在应用中处理gzip格式的文件就和直接处理文本一样;有hadoop native库;大部分linux系统都自带gzip命令,使用方便
缺点:不支持split
b. lzo
优点:压缩/解压速度也比较快,合理的压缩率;支持split,是hadoop中最流行的压缩格式;支持hadoop native库;需要在linux系统下自行安装lzop命令,使用方便
缺点:压缩率比gzip要低;hadoop本身不支持,需要安装;lzo虽然支持split,但需要对lzo文件建索引,否则hadoop也是会把lzo文件看成一个普通文件(为了支持split需要建索引,需要指定inputformat为lzo格式)
c. snappy
优点:压缩速度快;支持hadoop native库
缺点:不支持split;压缩比低;hadoop本身不支持,需要安装;linux系统下没有对应的命令
d. bzip2
优点:支持split;具有很高的压缩率,比gzip压缩率都高;hadoop本身支持,但不支持native;在linux系统下自带bzip2命令,使用方便
缺点:压缩/解压速度慢;不支持native
3.yarn参数调优
1.yarn.nodemanager.resource.cpu-vcores
表示该节点服务器上yarn可以使用的虚拟CPU个数,默认是8,推荐将值配置与物理核心个数相同,如果节点CPU核心不足8个,要调小这个值,yarn不会智能的去检测物理核心数
2.yarn.scheduler.minimum-allocation-vcores
单个任务最小可申请的虚拟核心数,默认为1
3.yarn.scheduler.maximum-allocation-vcores
单个任务最大可申请的虚拟核心,默认为4
4打包运行在服务器,可以参考官网的例子
5.map数和reduce数由什么决定?
map个数的决定因素是Split
Split就是数据分成的块
reduce个数的决定因素是分区函数
6.linux上安装mysq1-5.7.11-linux-glibc2.5-x8664.tar.gz



