一、MapJoin
ReduceJoin缺陷:在Reduce端处理过多的表,非常容易造成数据倾斜(在分区时数据划分的不均匀,导致其中一个并行的reduce处理的数据远远大于其他reduce处理的数据),导致运行效率过低。
MapJoin使用的情况:一张表十分小,一张表十分大。
优点:不会造成数据倾斜,造成数据倾斜的地方是在Shuffle分区的位置;没有Shuffle阶段,所以快,因为Shuffle会对数据进行全排序。
如何做:MapJoin将一张小表完全缓存进内存,在内存中用一个索引连接起来,用Map集合给要join的这一行放在key的位置,通过大表的每一行,使用索引找到这张小表。
案例实操:需求同ReduceJoin
public class MJMapper extends Mapper{ private Map pMap = new HashMap (); private Text k = new Text(); @Override protected void setup(Context context) throws IOException, InterruptedException { //读取pd到pMap //开流 URI[] cacheFiles = context.getCacheFiles(); FileSystem fileSystem = FileSystem.get(context.getConfiguration()); FSDataInputStream pd = fileSystem.open(new Path(cacheFiles[0])); //将文件按行处理,读到pMap中 //字节流无法按行处理,想按行处理需要将其转换成字符流 BufferedReader br = new BufferedReader(new InputStreamReader(pd)); String line; while (StringUtils.isNotEmpty(line = br.readLine())){ String[] fields = line.split("t"); pMap.put(fields[0],fields[1]); } IOUtils.closeStream(br); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("t"); //将PID替换 k.set(fields[0] + "t" + pMap.get(fields[1]) + "t" + fields[2]); context.write(k,NullWritable.get()); } } public class MJDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(new Configuration()); job.setJarByClass(MJDriver.class); job.setMapperClass(MJMapper.class); job.setNumReduceTasks(0); //将reduce阶段设置为0就不会生成reduce了,map结束之后会直接OutputFormat将数据输出出去 //想要mapper读取数据,需要设置分布式缓存 job.addCacheFile(URI.create(args[0])); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }
二、数据清洗和计数器应用
在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。但是在数据清洗中被清除的数据不能太多,要注意比例问题(5%以下),如果被清洗的数据太多,说明上一层的数据挖掘框架出了问题。目前使用较多的是用一些图形化界面的应用来完成数据清洗工作。
清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。
案例需求:将采集到的日志文件进行数据清洗,将文件里面日志长度小于等于11的日志信息清理出去。
public class ETLMapper extends Mapper{ //设置两个计数器 private Counter pass; private Counter fail; @Override protected void setup(Context context) throws IOException, InterruptedException { //通过组名和计数器名字初始化两个计数器,计数器只能增加不能减少 pass = context.getCounter("ETL","Pass"); fail = context.getCounter("ETL","Fail"); // //也可以通过枚举类进行初始化 // pass = context.getCounter(ETL.PASS); // fail = context.getCounter(ETL.FAIL); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] strings = value.toString().split(" "); if(strings.length > 11){ context.write(value,NullWritable.get()); pass.increment(1); }else{ fail.increment(1); } } } public class ETLDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(new Configuration()); job.setJarByClass(ETLDriver.class); job.setMapperClass(ETLMapper.class); job.setNumReduceTasks(0); job.setOutputValueClass(Text.class); job.setOutputKeyClass(NullWritable.class); FileInputFormat.setInputPaths(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } } public enum ETL { PASS,FAIL }
三、从windows向Yarn提交源码
public class WcDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1.获取job实例
Configuration configuration = new Configuration();
//添加几条配置
configuration.set("fs.defaultFS", "hdfs://hadoop101:8020");
configuration.set("mapreduce.framework.name","yarn");
configuration.set("mapreduce.app-submission.cross-platform","true");
configuration.set("yarn.resourcemanager.hostname","hadoop102");
Job job = Job.getInstance(configuration);
//2.设置jar包,分布式程序,很多程序需要执行此命令
// job.setJarByClass(WcDriver.class); //修改jar包路径
job.setJar("F:\develop\workspace_idea\mapreduce001\target\mapreduce001-1.0-SNAPSHOT.jar");
//3.设置Mapper和Reducer
job.setMapperClass(WcMapper.class);
job.setReducerClass(WcReducer.class);
//4.设置Map和Reducer的输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//5.设置输入输出文件
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//6.提交job
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
修改一些参数:
一般不这么操作,主要是为了调试程序。



