声明: 1. 本文为我的个人复习总结, 并非那种从零基础开始普及知识、内容详细全面、言辞官方的文章
2. 由于是个人总结, 所以用最精简的话语来写文章
3. 若有错误不当之处, 请指出
// Driver
public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Create the job from a fresh configuration.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Tell Hadoop which jar contains the job classes.
        job.setJarByClass(WordCountDriver.class);
        // 3. Wire up the Mapper and Reducer implementations.
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4. Key/value types emitted by the Mapper.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 5. Key/value types of the final (Reducer) output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // 6. Input and output paths.
        // NOTE(review): the output dir sits under the input dir; a second run
        // would read the previous results as input — confirm this is intended.
        FileInputFormat.setInputPaths(job, new Path("D:/data"));
        FileOutputFormat.setOutputPath(job, new Path("D:/data/out"));
        job.setNumReduceTasks(3);
        // 7. Submit, block until completion; exit code mirrors job success.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
// Mapper
/** Emits (word, 1) for every whitespace-separated token of each input line. */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    // Reused across map() calls to avoid a per-record allocation.
    private final Text word = new Text();
    private final LongWritable one = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // "\\s+" matches any whitespace run (tabs included). The original
        // "\s+" does not compile before Java 15, and on 15+ it only means
        // "one or more space characters".
        String[] tokens = value.toString().split("\\s+");
        for (String token : tokens) {
            if (token.isEmpty()) {
                continue; // split() yields an empty first token on leading whitespace
            }
            word.set(token);
            // Write the (word, 1) pair into the shuffle.
            context.write(word, one);
        }
    }
}
// Reducer
/** Sums the per-word counts produced by {@code WordCountMapper}. */
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    // Reused across reduce() calls to avoid a per-key allocation.
    private final LongWritable total = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the values instead of materializing the whole iterable with
        // IteratorUtils.toList(...).size(): O(1) memory, no commons-collections
        // dependency, and still correct if a Combiner pre-aggregates values > 1.
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        total.set(sum);
        context.write(key, total);
    }
}
TopN:
找出各省份常用词汇中前TopN的Word
第一步: 进行WordCount
第二步: 按省份进行分组 求各省份使用的高频词的前Top10, 如下:
分组的TopN:
解决办法: 在Reducer中对values集合排序后取前N名, 维护一个容量为N的小顶堆(堆顶是当前前N名中最小的, 新值比堆顶大时替换之), 然后输出堆的全部内容
注意: Hadoop为了优化内存, values遍历得到的所有value其实全是同一个value对象,
为了堆中存的value不受后面遍历的value的写操作影响, 应该往堆中放value的复制品
全局TopN:
法1: 就只设置1个Reduce分区
缺点: 只有一个Reduce分区, 并行度太低
法2: 先用第一个MapReduceTask过滤出各个分组的前TopN, 再启动第二个MapReduceTask设置1个Reduce分区 对第一个Task过滤出的数据求 前TopN
// Bean
/**
 * Writable value bean holding a (word, count) pair.
 * Cloneable so callers can snapshot the instance Hadoop reuses during iteration.
 */
public class WordCountBean implements Cloneable, Writable {
    private String word;
    private int count;

    /** Hadoop serialization; field order must match {@link #readFields}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(count);
    }

    /** Hadoop deserialization; mirrors {@link #write}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();
        count = in.readInt();
    }

    /** Shallow copy suffices: String is immutable and count is a primitive. */
    @Override
    public WordCountBean clone() throws CloneNotSupportedException {
        return (WordCountBean) super.clone();
    }

    @Override
    public String toString() {
        return "(" + word + "," + count + ")";
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }
}
// Mapper
/** Parses "&lt;province&gt; &lt;word&gt; &lt;count&gt;" lines and keys them by province. */
public class Top10_Mapper extends Mapper<LongWritable, Text, Text, WordCountBean> {
    // Reused across map() calls to avoid a per-record allocation.
    private final Text province = new Text();
    private final WordCountBean wordCountBean = new WordCountBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(" ");
        province.set(fields[0]);
        wordCountBean.setWord(fields[1]);
        // The original passed the raw String to setCount(int) — a compile
        // error; the count column must be parsed first.
        wordCountBean.setCount(Integer.parseInt(fields[2]));
        context.write(province, wordCountBean);
    }
}
//Reducer
/** Keeps the 10 highest-count words per province and emits them in descending order. */
public class Top10_Reducer extends Reducer<Text, WordCountBean, Text, WordCountBean> {
    // Min-heap ordered by count: the root is the smallest of the current
    // top 10, so any larger value displaces it. Integer.compare avoids the
    // overflow risk of "o1.getCount() - o2.getCount()".
    private final PriorityQueue<WordCountBean> heap =
            new PriorityQueue<>((o1, o2) -> Integer.compare(o1.getCount(), o2.getCount()));

    @Override
    protected void reduce(Text key, Iterable<WordCountBean> values, Context context) throws IOException, InterruptedException {
        heap.clear();
        // Hadoop reuses one WordCountBean instance while iterating, so every
        // element stored in the heap must be a copy. To avoid cloning once per
        // value, recycle the bean most recently evicted from the heap.
        // (The original mixed two names, "waste" and an undeclared "temp",
        // and never reset the recycled reference — neither version compiled.)
        WordCountBean recycled = null;
        for (WordCountBean bean : values) {
            if (recycled == null) {
                try {
                    heap.add(bean.clone());
                } catch (CloneNotSupportedException e) {
                    throw new IllegalStateException("WordCountBean must support clone()", e);
                }
            } else {
                recycled.setWord(bean.getWord());
                recycled.setCount(bean.getCount());
                heap.add(recycled);
                recycled = null;
            }
            if (heap.size() > 10) {
                recycled = heap.poll();
            }
        }
        // Drain the min-heap (ascending), then emit in reverse for a
        // highest-count-first listing.
        int size = heap.size();
        WordCountBean[] ordered = new WordCountBean[size];
        for (int i = 0; i < size; i++) {
            ordered[i] = heap.poll();
        }
        for (int i = size - 1; i >= 0; i--) {
            context.write(key, ordered[i]);
        }
    }
}
Join:
将order表 与 pd表进行join
order表字段:
- id, pid, amount
pd表字段:
- pid, name
public class MapJoinMapper extends MapperReduceJoin:{ private final Map map = new ConcurrentHashMap<>(); @Override protected void setup(Context context) throws IOException { URI pdFile = context.getCacheFiles()[0]; Path path = new Path(pdFile); FSDataInputStream fis = null; BufferedReader reader = null; try { fis = FileSystem.get(context.getConfiguration()).open(path); reader = new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8)); String str; while (StringUtils.isNotEmpty(str = reader.readLine())) { String[] split = str.split("\s+"); map.put(split[0], split[1]); } } finally { IOUtils.closeStream(reader); IOUtils.closeStream(fis); } } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split("\s+"); context.write(new Text(split[0] + "t" + map.get(split[1]) + "t" + split[2]), NullWritable.get()); } }
// Bean public class OrderPdBean implements WritableComparable{ private Integer id = -1; private String pid = ""; private Integer amount = -1; private String pname = ""; private String flag = ""; @Override public int compareTo(OrderPdBean o) { return this.id - o.id != 0 ? this.id - o.id : this.pname.compareTo(o.pname); } @Override public void write(DataOutput out) throws IOException { out.writeInt(id); out.writeUTF(pid); out.writeInt(amount); out.writeUTF(pname); out.writeUTF(flag); } @Override public void readFields(DataInput in) throws IOException { id = in.readInt(); pid = in.readUTF(); amount = in.readInt(); pname = in.readUTF(); flag = in.readUTF(); } } // Mapper public class ReduceJoinMapper extends Mapper { private String fileName; private OrderPdBean outV = new OrderPdBean(); @Override protected void setup(Context context) { fileName = ((FileSplit) context.getInputSplit()).getPath().getName(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split("\s+"); if ("order.txt".equals(fileName)) { outV.setFlag("order"); outV.setId(Integer.valueOf(split[0])); outV.setPid(split[1]); outV.setAmount(Integer.valueOf(split[2])); } else if ("pd.txt".equals(fileName)) { outV.setFlag("pd"); outV.setPid(split[0]); outV.setPname(split[1]); } context.write(new Text(outV.getPid()), outV); } } // Reducer public class ReduceJoinReducer extends Reducer { private BeanCopier beanCopier = BeanCopier.create(OrderPdBean.class, OrderPdBean.class, false); private List orderList = new linkedList<>(); private OrderPdBean pdBean = new OrderPdBean(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { orderList.clear(); for (OrderPdBean orderPdBean : values) { if ("order".equals(orderPdBean.getFlag())) { OrderPdBean orderPdCopy = new OrderPdBean(); beanCopier.copy(orderPdBean, orderPdCopy, null); 
orderList.add(orderPdCopy); } else { beanCopier.copy(orderPdBean, pdBean, null); } } for (OrderPdBean orderPdBean : orderList) { if ("order".equals(orderPdBean.getFlag())) { orderPdBean.setPname(pdBean.getPname()); } context.write(orderPdBean, NullWritable.get()); } } }



