Foreword

This article is included in my curated GitHub collection; stars are welcome: https://github.com/yehongzhi/learningSummary
The previous article introduced MapReduce by writing a WordCount; this article continues with a few more examples. Below are several common operations: sorting, deduplication, summation, averaging, and TopK queries (finding the top K records).
Sorting

MapReduce sorts keys in ascending natural order by default, but that alone is often not enough, so here is an example. The input file file1 contains:
1,256
1,12
3,283
4,478
2,1001
2,3600
1,4
5,78
2,33
The contents of file2:
5,10
3,598
4,654
1,741
2,123
3,850
2,11568
1,12574
We want the records sorted by the first number, and by the second number when the first numbers are equal. How do we pull that off?
First we create a custom key class with two fields holding the first and second values of a line, then implement the serialization and deserialization methods, and, most importantly, the comparison method.
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class ComparableKey implements WritableComparable<ComparableKey> {
    // First value on a line
    private long firstNum;
    // Second value on a line
    private long secondNum;

    public ComparableKey() {
    }

    public ComparableKey(long firstNum, long secondNum) {
        this.firstNum = firstNum;
        this.secondNum = secondNum;
    }

    public long getFirstNum() {
        return firstNum;
    }

    public void setFirstNum(long firstNum) {
        this.firstNum = firstNum;
    }

    public long getSecondNum() {
        return secondNum;
    }

    public void setSecondNum(long secondNum) {
        this.secondNum = secondNum;
    }

    @Override
    public int compareTo(ComparableKey otherComparableKey) {
        // If the first numbers are equal, compare the second numbers, ascending.
        // A positive result means this key is larger, a negative one means it is smaller.
        // Long.compare avoids the overflow risk of casting a long difference to int.
        if (firstNum == otherComparableKey.getFirstNum()) {
            return Long.compare(secondNum, otherComparableKey.getSecondNum());
        } else {
            return Long.compare(firstNum, otherComparableKey.getFirstNum());
        }
    }

    // Serialization
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(firstNum);
        dataOutput.writeLong(secondNum);
    }

    // Deserialization
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        firstNum = dataInput.readLong();
        secondNum = dataInput.readLong();
    }
}
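As a quick local sanity check (a hypothetical test, not part of the job), we can sort a few sample keys in plain Java and confirm the ordering, since Collections.sort relies on the same compareTo() method that the MapReduce shuffle uses:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ComparableKeyTest {
    public static void main(String[] args) {
        List<ComparableKey> keys = new ArrayList<>();
        keys.add(new ComparableKey(2, 33));
        keys.add(new ComparableKey(1, 256));
        keys.add(new ComparableKey(1, 12));
        // Collections.sort falls back on ComparableKey.compareTo()
        Collections.sort(keys);
        for (ComparableKey key : keys) {
            System.out.println(key.getFirstNum() + " " + key.getSecondNum());
        }
        // Expected output: 1 12, then 1 256, then 2 33
    }
}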
Next comes the Mapper. The input value is Text, which we convert into our ComparableKey type; during the shuffle the framework automatically calls compareTo() to sort the keys.
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class NumberSortMapper extends Mapper<LongWritable, Text, ComparableKey, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] strings = value.toString().split(",");
        long firstNum = Long.parseLong(strings[0]);
        long secondNum = Long.parseLong(strings[1]);
        ComparableKey comparableKey = new ComparableKey(firstNum, secondNum);
        context.write(comparableKey, NullWritable.get());
    }
}
Since the framework sorts the map output on its way to the reducer, the Reduce side only needs to write the keys back out.
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class NumberSortReduce extends Reducer<ComparableKey, NullWritable, LongWritable, LongWritable> {
    @Override
    protected void reduce(ComparableKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Iterate over the values so that duplicate pairs, if any appear in the input, are preserved
        for (NullWritable ignored : values) {
            context.write(new LongWritable(key.getFirstNum()), new LongWritable(key.getSecondNum()));
        }
    }
}
Finally, write a main method as the entry point:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NumberSort {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(NumberSort.class);
        job.setMapperClass(NumberSortMapper.class);
        job.setReducerClass(NumberSortReduce.class);
        job.setMapOutputKeyClass(ComparableKey.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);
        // Directory containing the job's input files
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Directory where the job's output is written
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}
Next, upload the test files to a number_sort directory on HDFS (create the directory yourself), then run the job with:
hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar NumberSort number_sort number_sort_output
After the job succeeds, the output looks like this:
1 4
1 12
1 256
1 741
1 12574
2 33
2 123
2 1001
2 3600
2 11568
3 283
3 598
3 850
4 478
4 654
5 10
5 78

Deduplication
Take the text below as an example. How do we deduplicate the words?
hadoop is good
hadoop is so good
java is great
java and hadoop is very good
The trick is simple: map output is grouped by key, and each distinct key reaches the reducer exactly once, so we put the value we want deduplicated into the key and set the value to NullWritable, and we are done. The Mapper step looks like this:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class DistinctMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text keyOut = new Text();
        String[] strings = value.toString().split(" ");
        for (String str : strings) {
            keyOut.set(str);
            context.write(keyOut, NullWritable.get());
        }
    }
}
The Reduce step needs no extra work; it just writes out each distinct key.
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class DistinctReduce extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
Then add a main method as the entry point.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DistinctMain {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(DistinctMain.class);
        job.setMapperClass(DistinctMapper.class);
        job.setReducerClass(DistinctReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Directory containing the job's input files
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Directory where the job's output is written
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}
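As an optional tweak that is not part of the original example, the same reducer class can also be registered as a combiner, so duplicates are already dropped on the map side before the shuffle; one extra line in main would do it:

// Hypothetical optimization: map-side dedup before data crosses the network
job.setCombinerClass(DistinctReduce.class);

This works because DistinctReduce's input and output types both match the map output types, which is exactly what a combiner requires.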
Upload the test data to HDFS.
Then run the following command:
hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar DistinctMain distinct distinct_output
The output is as follows:
and
good
great
hadoop
is
java
so
very
Deduplication done.
Summation

Here is a classic math problem: sum the numbers from 1 to 100. By hand it is easy with the pairing trick, adding the ends toward the middle: 1 plus 100, 2 plus 99, and so on. But how do we do it with MapReduce?
1
2
3
4
...
98
99
100
We need the cleanup() method, which runs exactly once per map task, after all of its map() calls have finished. A look at the source code makes this clear.
// Does nothing by default; subclasses can override it to do their own work
protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
}

public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    this.setup(context);
    try {
        while (context.nextKeyValue()) {
            this.map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        // cleanup() runs after all map() calls have finished
        this.cleanup(context);
    }
}
With that, the problem becomes straightforward. The Mapper implementation:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SumMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
    private long sum = 0L;

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Accumulate in memory; nothing is emitted until cleanup()
        long val = Long.parseLong(value.toString());
        sum += val;
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit this task's partial sum exactly once
        context.write(new LongWritable(sum), NullWritable.get());
    }
}
Each map task emits one partial sum, so the Reduce side just adds up the partial sums, which arrive as keys:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SumReduce extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
    private long sum = 0L;

    @Override
    protected void reduce(LongWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Iterate the values so that identical partial sums from different map tasks are each counted
        for (NullWritable ignored : values) {
            sum += key.get();
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        context.write(new LongWritable(sum), NullWritable.get());
    }
}
And the main entry point:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SumMain {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(SumMain.class);
        job.setMapperClass(SumMapper.class);
        job.setReducerClass(SumReduce.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // Directory containing the job's input files
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Directory where the job's output is written
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}
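One caveat worth spelling out: this pattern only produces a single grand total when the job runs with one reduce task, which happens to be Hadoop's default. To make that assumption explicit, a line like this could be added in main:

// Assumption: exactly one reducer, so its cleanup() emits one grand total
job.setNumReduceTasks(1);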
Package it into a jar, upload it to the server, put the file containing 1 to 100 onto HDFS, and run the job:
hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar SumMain sum_main.txt sum_main_out
The output is:
5050

Averaging
Computing an average is also a very common operation. Say we have a pile of randomly generated numbers and want their mean:
10
25
22
78
119
88
56
32
29
25
The idea is simply total divided by count, so in the Mapper stage we emit each number as the key with a count of 1 as the value:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class AverageMapper extends Mapper<LongWritable, Text, LongWritable, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(new LongWritable(Long.parseLong(value.toString())), new IntWritable(1));
    }
}
In the second step, the Reduce side tallies the count and the sum, then uses cleanup() to compute the average:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;

public class AverageReduce extends Reducer<LongWritable, IntWritable, Text, NullWritable> {
    private long sum = 0L;
    private long count = 0L;

    @Override
    protected void reduce(LongWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // i is how many times this number appeared
        int i = 0;
        for (IntWritable value : values) {
            i += value.get();
        }
        sum += (key.get() * i);
        count += i;
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Average = sum / count, rounded to two decimal places
        BigDecimal sumBigDecimal = new BigDecimal(sum);
        BigDecimal countBigDecimal = new BigDecimal(count);
        BigDecimal result = sumBigDecimal.divide(countBigDecimal, 2, RoundingMode.HALF_UP);
        context.write(new Text(result.toString()), NullWritable.get());
    }
}
The main entry class:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AverageMain {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(AverageMain.class);
        job.setMapperClass(AverageMapper.class);
        job.setReducerClass(AverageReduce.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Directory containing the job's input files
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Directory where the job's output is written
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}
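The single-reducer assumption from the summation example applies here as well, since the sum and count accumulate in fields of one reducer instance. Separately, because the map output is (number, 1), a hypothetical combiner that adds up the 1s for each number would shrink the shuffle without changing the result; a minimal sketch:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Hypothetical combiner, not part of the original example: pre-counts duplicate numbers map-side
public class AverageCombiner extends Reducer<LongWritable, IntWritable, LongWritable, IntWritable> {
    @Override
    protected void reduce(LongWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}

It would be wired in with job.setCombinerClass(AverageCombiner.class); the reducer's per-key loop already handles counts greater than 1, so the final average is unchanged.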
Then it is the usual routine: package and upload the jar and the test file, and run the job:
hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar AverageMain average_main.txt average_main_out
The output is:
48.40

TopK Query
Suppose the text below lists words and their occurrence counts, and we want to find the top 5 words by count. How do we do it?
c++ 12
redis 45
java 120
Python 50
Javascript 41
GoLang 30
Spring 30
Mybatis 11
Hibernate 6
RabbitMQ 64
Kafka 78
Nacos 46
SpringCloud 32
MySQL 100
UML 12
Seata 22
ZooKeeper 38
Here we can lean on TreeMap: entries put into a TreeMap are kept sorted by key in ascending natural order, and it also offers descendingMap() for a reversed view.
A quick demonstration:
public static void main(String[] args) {
    TreeMap<Integer, String> treeMap = new TreeMap<>();
    Random random = new Random();
    for (int i = 0; i < 100; i++) {
        // Generate a random number
        int num = random.nextInt(100);
        // Insert it into the treeMap
        treeMap.put(num, num + "");
    }
    for (Integer num : treeMap.keySet()) {
        System.out.println(num);
    }
}
// Printed output
0
2
3
6
8
10
11
12
14
15
...
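The demo prints in ascending order; for ranking we need the reverse, which descendingMap() provides as a view without copying. A minimal sketch:

TreeMap<Integer, String> treeMap = new TreeMap<>();
treeMap.put(1, "a");
treeMap.put(3, "c");
treeMap.put(2, "b");
// descendingMap() returns a reversed view, largest key first
System.out.println(treeMap.descendingMap()); // prints {3=c, 2=b, 1=a}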
With that, we can start writing the real code. The Mapper is simple: split each line on the space, then emit the count as the key and the word as the value for the Reduce side.
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class TopMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(" ");
        String word = split[0];
        long count = Long.parseLong(split[1]);
        context.write(new LongWritable(count), new Text(word));
    }
}
Once the data reaches the Reduce side, it needs to be collected and trimmed down to the top K. The code:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

public class TopReduce extends Reducer<LongWritable, Text, Text, NullWritable> {
    private TreeMap<Long, String> treeMap = new TreeMap<>();
    private static final long TOP_K = 5;

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Words sharing the same count are joined into one entry
        StringBuilder sb = new StringBuilder();
        for (Text value : values) {
            sb.append(value.toString()).append(", ");
        }
        // Drop the trailing separator
        sb.setLength(sb.length() - 2);
        treeMap.put(key.get(), sb.toString());
        // If we exceed the maximum size, remove the first entry, which is the smallest
        if (treeMap.size() > TOP_K) {
            treeMap.remove(treeMap.firstKey());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Descending order
        Map<Long, String> navigableMap = treeMap.descendingMap();
        // Rank counter
        int i = 1;
        String s;
        for (Map.Entry<Long, String> entry : navigableMap.entrySet()) {
            s = "Rank " + i + ": " + entry.getValue() + " appeared " + entry.getKey() + " times";
            context.write(new Text(s), NullWritable.get());
            i++;
        }
    }
}
Finally, put together the Main entry class.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopMain {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(TopMain.class);
        job.setMapperClass(TopMapper.class);
        job.setReducerClass(TopReduce.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Directory containing the job's input files
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Directory where the job's output is written
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}
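Before running it on the cluster, the trimming logic can be checked locally. Here is a minimal, self-contained sketch (hypothetical test code using a subset of the sample data) that mimics what TopReduce does:

import java.util.Map;
import java.util.TreeMap;

public class TopKLocalCheck {
    public static void main(String[] args) {
        long[] counts = {12, 45, 120, 50, 41, 78, 100, 64};
        String[] words = {"c++", "redis", "java", "Python", "Javascript", "Kafka", "MySQL", "RabbitMQ"};
        TreeMap<Long, String> treeMap = new TreeMap<>();
        for (int i = 0; i < counts.length; i++) {
            treeMap.put(counts[i], words[i]);
            // Same trimming as TopReduce: evict the smallest once we exceed 5 entries
            if (treeMap.size() > 5) {
                treeMap.remove(treeMap.firstKey());
            }
        }
        int rank = 1;
        for (Map.Entry<Long, String> entry : treeMap.descendingMap().entrySet()) {
            System.out.println("Rank " + rank++ + ": " + entry.getValue()
                    + " appeared " + entry.getKey() + " times");
        }
    }
}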
And we are done. Package it, upload it to the server along with the test file, and run the job:
hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar TopMain top_k_main.txt top_k_main_out
The output is:
Rank 1: java appeared 120 times
Rank 2: MySQL appeared 100 times
Rank 3: Kafka appeared 78 times
Rank 4: RabbitMQ appeared 64 times
Rank 5: Python appeared 50 times

Summary
This article walked through small examples of sorting, deduplication, summation, averaging, and TopK queries to deepen your understanding of MapReduce. That is all for now; I hope it helps.
If you found it useful, give it a like; your likes are my biggest motivation to keep writing.
I am a programmer working hard to be remembered. See you next time!
My abilities are limited, so if anything here is wrong or off the mark, please point it out so we can learn from each other!



