栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

MapReduce编程练习

MapReduce编程练习

  1. 目录

    编程实现按日期统计访问次数

    2.编程实现按访问次数排序

    获取成绩表最高分

    编译jar'包方法


  2. 编程实现按日期统计访问次数   

    (1) 定义输入/输出格式 

                社交网站用户的访问日期在格式上属于文本格式,访问次数为整型数值格式。其组成的键值对为<访问日期,访问次数>,因此Mapper的输出与Reducer的输出都选用Text类与IntWritable类。

            (2)Mapper 类的逻辑实现

                Mapper类中最主要的部分就是map函数。map函数的主要任务就是读取用户访问文件中的数据,输出所有访问日期与初始次数的键值对。因此访问日期是数据文件的第二列,所有先定义一个数组,再提取第二个元素,与初始次数1一起构成要输出的键值对,即<访问日期,1>。

            (3)Reducer的逻辑实现

                Reducer类中最主要的部分就是reduce函数。reduce的主要任务就是读取Mapper输出的键值对<访问日期,1>。这一部分与官网给出的WordCount中的Reducer完全相同

  3. 完整代码

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class MyKubi {
    	public static class MyMapper
    	extends Mapper{
    		private final static IntWritable one = new IntWritable(1);
    		public void map(Object key, Text value, Context context)throws IOException, InterruptedException{
    			String line = value.toString();
    			String array[] = line.split(",");
    			String keyOutput = array[1];
    			context.write(new Text(keyOutput),one);
    		}
    	}
    	public static class MyReducer
    	extends Reducer{
    	private IntWritable result = new IntWritable();
    	public void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException {
    		int sum = 0;for (IntWritable val : values ){
    			sum += val.get();
    		}
    		result.set(sum);
    		context.write(key, result);
    	}
    	
    	}
    	public static void main(String[] args) throws Exception{
    		Configuration conf = new Configuration();
    		Job job=Job.getInstance(conf, "Daily Access Count");
    		job.setJarByClass(MyKubi.class);
    		job.setMapperClass(MyMapper.class);
    		job.setReducerClass(MyReducer.class);
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    		for (int i = 0; i< args.length - 1; ++i){
    			FileInputFormat.addInputPath(job,  new Path(args[i]));
    		}
    		FileOutputFormat.setOutputPath(job,new Path(args[args.length - 1]));
    		System.exit(job.waitForCompletion(true) ? 0 : 1); 
    	}
    }
    
         将文件编译生成jar包上传到集群(生成jar包方法在后面) 
  • 将数据文件上传的虚拟机在上传到hdfs
  • 在jar包的目录下执行  命令 hadoop jar (jar包名) /(数据文件目录) /(输出结果目录)、
  • 2.编程实现按访问次数排序

  • MapReduce只会对键值进行排序,所以我们在Mapper模块中对于输入的键值对,把Key与Value位置互换,在Mapper输出后,键值对经过shuffle的处理,已经变成了按照访问次数排序的数据顺序啦,输出格式为<访问次数,日期>。Reducer的处理和Mapper恰好相反,将键和值的位置互换,输出格式变为<日期,访问次数>。
  • 完整代码
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    public class AccessTimesSort {
           // Mapper模块
           public static class MyMapper
        extends Mapper{
                 public void map(Object key, Text value, Context context) //map函数的编写要根据读取的文件内容和业务逻辑来写
                               throws IOException, InterruptedException {
                        String line = value.toString();
                        String array[] = line.split("t");//指定,为分隔符,组成数组
                        int keyOutput = Integer.parseInt(array[1]);//提取数组中的访问次数作为Key
                        String valueOutput = array[0]; //将日期作为value
                        context.write(new IntWritable(keyOutput), new  Text(valueOutput));
                 }
           }
           
           // Reducer模块
           public static class MyReducer
        extends Reducer {//注意与上面输出对应
                 public void reduce(IntWritable key, Iterable values, Context  context)  
                               throws IOException, InterruptedException {
                        for (Text val : values) {
                               context.write(val, key);                //进行键值位置互换
                        }
                 }
           }
           
           //Driver模块,主要是配置参数
           public static void main(String[] args) throws Exception {
               Configuration conf = new Configuration();
               
               Job job = Job.getInstance(conf, "AccessTimesSort");
               job.setJarByClass(AccessTimesSort.class);
               job.setMapperClass(MyMapper.class);
               job.setReducerClass(MyReducer.class);
               job.setMapOutputKeyClass(IntWritable.class);
               job.setMapOutputValueClass(Text.class);
               job.setOutputKeyClass(Text.class);
               job.setOutputValueClass(IntWritable.class);
               for (int i = 0; i < args.length - 1; ++i) {
                 FileInputFormat.addInputPath(job, new Path(args[i]));
               }
               FileOutputFormat.setOutputPath(job,
                 new Path(args[args.length - 1]));
               System.exit(job.waitForCompletion(true) ? 0 : 1);
             }    
    }
          
    

    执行方法如上

  • 获取成绩表最高分

  • 在 Mapper 类中,map 函数读取成绩表 A 中的数据,直接将读取的数据以空格分隔,组成键值对<科目,成绩>,即设置输出键值对类型为
    在 Reduce 中,由于 map 函数输出键值对类型是,所以 Reducer 接收的键值对是>。针对相同的键(即科目),遍历比较它的值(即成绩),找出最高值(即最高成绩),最后输出键值对<科目,最高成绩>。
  • 完整代码
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    import java.io.IOException;
    
    public class FindMax {
        public static class FindMaxMapper extends Mapper{
            Text course = new Text();
            IntWritable score = new IntWritable();
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String [] values = value.toString().trim().split(" ");
                course.set(values[0]);
                score.set(Integer.parseInt(values[1]));
                context.write(course,score);
    
            }
        }
        public static class FindMaxReducer extends Reducer{
            @Override
            protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
                int maxScore = -1;
                Text course = new Text();
                for(IntWritable score:values){
                    if (score.get()>maxScore){
                        maxScore = score.get();
                        course = key;
                    }
                }
                context.write(course,new IntWritable(maxScore));
            }
        }
        public static void main(String [] args) throws Exception{
    //        if (args.length != 2){
    //            System.out.println("FindMax  ");
    //            System.exit(-1);
    //        }
    
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf,"findmax");
            job.setJarByClass(FindMax.class);
            job.setMapperClass(FindMaxMapper.class);
            job.setReducerClass(FindMaxReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setNumReduceTasks(1);
            FileInputFormat.addInputPath(job,new Path(args[0]));
            FileSystem.get(conf).delete(new Path(args[1]),true);
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
            System.out.println(job.waitForCompletion(true) ? 0 : 1);
    
        }
    }
    
    

    编译jar'包方法

  • 右键点击要编译的文件选择Export
  • Java——JARfile
  • 选择classpath project——next

 点击两次next,选择存放路径,选择文件

 

 ********在执行命令是不要忘记将数据文件上传到hdfs************

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/487496.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号