文章目录
- Hadoop进阶篇
- MapReduce:Hadoop分布式并行计算框架
- 序列化与反序列化
-
- MapReduce当中的排序
- 1. 可排序的 Key
- 2. 排序的种类
- 3. 二次排序
- MapReduce中的Combiner
-
- MapReduce中的GroupingComparator分组详解
- 1. 自定义 WritableComparator
- 2. 需求
- 3. 需求分析
- 4. 代码实现
- 自定义OutputFormat
-
- shuffle中数据压缩
- 1. hadoop 支持的压缩算法比较
- 2. 如何开启压缩
- 计数器与累加器
-
Hadoop进阶篇
MapReduce:Hadoop分布式并行计算框架
序列化与反序列化
- 序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
- 反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
- Java 的序列化(Serializable)是一个重量级序列化框架,一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系…),不便于在网络中高效传输,所以,hadoop 自己开发了一套序列化机制(Writable):
- 精简,高效。
- 不用像 java 对象类一样传输多层的父子关系,需要哪个属性就传输哪个属性值,大大的减少网络传输的开销。
- Writable 是 Hadoop 的序列化格式的接口,一个类要支持可序列化,只需实现这个接口即可。
- 另外 Writable 有一个子接口是 WritableComparable,其即可实现序列化,也可以对 Key 进行比较,这里可以通过自定义 key 利用 WritableComparable来实现排序功能。
- 在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。
- 具体实现 bean对象序列化:
- 1、必须实现 Writable 接口;
- 2、反序列化时,需要反射调用空参构造函数,所以必须有空参构造。
1. 需求
- 现有用户手机上网详单数据,求取每个手机号的上行包个数之和、下行包个数之和,以及上行总流量之和、下行总流量之和。
2. 代码实现
public class FlowBean implements Writable {
private int upPackNum;
private int downPackNum;
private int upPayLoad;
private int downPayLoad;
public FlowBean() {
}
@Override
public void write(DataOutput out) throws IOException {
// 调用序列化方法时,要用于类型匹配的write方法,并且要记住序列化顺序
out.writeInt(upPackNum);
out.writeInt(downPackNum);
out.writeInt(upPayLoad);
out.writeInt(downPayLoad);
}
@Override
public void readFields(DataInput in) throws IOException {
// 反序列化的顺序要与序列化保持一致,并使用匹配类型的方法
this.upPackNum = in.readInt();
this.downPackNum = in.readInt();
this.upPayLoad = in.readInt();
this.downPayLoad = in.readInt();
}
// setter、getter、toString 省略 ...
}
public class FlowMapper extends Mapper {
private FlowBean flowBean;
private Text text;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
this.flowBean = new FlowBean();
this.text = new Text();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] slices = value.toString().split("t");
text.set(slices[1]);
flowBean.setUpPackNum(Integer.parseInt(slices[6]));
flowBean.setDownPackNum(Integer.parseInt(slices[7]));
flowBean.setUpPayLoad(Integer.parseInt(slices[8]));
flowBean.setDownPayLoad(Integer.parseInt(slices[9]));
context.write(text, flowBean);
}
}
public class FlowReducer extends Reducer {
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
int upPackNum = 0;
int downPackNum = 0;
int upPayLoad = 0;
int downPayLoad = 0;
for (FlowBean value : values) {
upPackNum += value.getUpPackNum();
downPackNum += value.getDownPackNum();
upPayLoad += value.getUpPayLoad();
downPayLoad += value.getDownPayLoad();
}
context.write(key, new Text(upPackNum + "t" + downPackNum + "t" + upPayLoad + "t" + downPayLoad));
}
}
public class FlowMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(super.getConf(), FlowMain.class.getSimpleName());
job.setJarByClass(FlowMain.class);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path(args[0]));
job.setMapperClass(FlowMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setReducerClass(FlowReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new FlowMain(), args);
System.exit(run);
}
}
MapReduce当中的排序
1. 可排序的 Key
- 排序是 MapReduce 框架中最重要的操作之一,MapTask 和 ReduceTask 均会对数据按照 Key 进行排序。
- 该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
- 对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
- 对于ReduceTask,它从每个执行完成的MapTask上远程拷贝相应的数据文件
- 如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中;
- 如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;
- 如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。
- 当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
2. 排序的种类
- 部分排序:MapReduce 根据输入记录的键对数据集排序,保证输出的每个文件内部有序。
- 全排序:最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个 ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完成丧失了 MapReduce 所提供的并行能力。
- 辅助排序:在 Reduce 端对 Key 进行分组,应用于在接收的 key 和 bean 对象时,想让一个或几个字段相同(全部字段比较不相同)的 key 进入到同一个 reduce 方法时,可以采用分组排序。
- 二次排序:在 MR 编程中,需要先按输入数据的某一列排序,如果相同,再按另一列排序。MR 自带的类型作为 key 无法满足需求,往往需要自定义 JavaBean 作为 Map 输出的Key,JavaBean 中使用 compareTo 方法指定排序规则。
3. 二次排序
- 样本数据如下,每条数据有5个字段,分别是手机号、上行包总个数、下行包总个数、上行总流量、下行总流量
13480253104 3 3 180 180
13502468823 57 102 7335 110349
13560439658 33 24 2034 5892
13600217502 18 138 1080 186852
13602846565 15 12 1938 2910
13660577991 24 9 6960 690
13719199419 4 0 240 0
13726230503 24 27 2481 24681
13760778710 2 2 120 120
13823070001 6 3 360 180
13826544101 4 0 264 0
13922314466 12 12 3008 3720
13925057413 69 63 11058 48243
13926251106 4 0 240 0
13926435656 2 4 132 1512
15013685858 28 27 3659 3538
15920133257 20 20 3156 2936
15989002119 3 3 1938 180
18211575961 15 12 1527 2106
18320173382 21 18 9531 2412
84138413 20 16 4116 1432
- 需求:先对下行包总数升序排序;若相等,再按上行总流量进行降序排序。
- 自定义JavaBean:用于封装数据及定义排序规则
public class FlowSortBean implements WritableComparable {
private String phone;
private Integer upPackNum;
private Integer downPackNum;
private Integer upPayLoad;
private Integer downPayLoad;
@Override
public int compareTo(FlowSortBean o) {
int result = this.downPackNum.compareTo(o.downPackNum);
if (result == 0) {
result = - this.upPayLoad.compareTo(o.upPayLoad);
}
return result;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(phone);
out.writeInt(upPackNum);
out.writeInt(downPackNum);
out.writeInt(upPayLoad);
out.writeInt(downPayLoad);
}
@Override
public void readFields(DataInput in) throws IOException {
this.phone = in.readUTF();
this.upPackNum = in.readInt();
this.downPackNum = in.readInt();
this.upPayLoad = in.readInt();
this.downPayLoad = in.readInt();
}
// setter、getter、toString 省略...
}
public class FlowSortMapper extends Mapper {
private FlowSortBean flowSortBean;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
flowSortBean = new FlowSortBean();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] slices = value.toString().split("t");
flowSortBean.setPhone(slices[0]);
flowSortBean.setUpPackNum(Integer.parseInt(slices[1]));
flowSortBean.setDownPackNum(Integer.parseInt(slices[2]));
flowSortBean.setUpPayLoad(Integer.parseInt(slices[3]));
flowSortBean.setDownPayLoad(Integer.parseInt(slices[4]));
context.write(flowSortBean, NullWritable.get());
}
}
public class FlowSortReducer extends Reducer {
@Override
protected void reduce(FlowSortBean key, Iterable values, Context context) throws IOException, InterruptedException {
// 经过排序后,直接输出
context.write(key, NullWritable.get());
}
}
public class FlowSortMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(super.getConf(), "FlowSort");
job.setJarByClass(FlowSortMain.class);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path(args[0]));
job.setMapperClass(FlowSortMapper.class);
job.setMapOutputKeyClass(FlowSortBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(FlowSortReducer.class);
job.setOutputKeyClass(FlowSortBean.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new FlowSortMain(), args);
System.exit(run);
}
}
MapReduce中的Combiner
1. 基本介绍
- Combiner 类本质也是 Reducer 聚合,Combiner 类继承 Reducer 类,Combiner 是运行在 Map 端,对 mapTask 内部的结果做聚合。
- 作用:可以减少 mapTask 落盘及向 reduceTask 传输的数据量。
- 是否可以做 map 端的 combine:并非所有的 MR job 都适用 combine,原则是无论使用不使用combine,都不能对最终的结果造成影响,比如下边的例子,就不适用:
Mapper:
3 5 7 => (3+5+7)/2 = 5
2 6 => (2+6)/2 = 4
Reducer
(3+5+7+2+6)/5 = 23/5 不等于 (5+4)/2 = 9/2
2. 需求
- 对之前的 wordCount 单词计数统计的例子,加上Combiner过程,实现map端的数据进行汇总之后,再发送到reduce端,减少数据的网络拷贝。
- 在main方法中加入:
job.setCombinerClass(MyReducer.class);
- 运行程序,观察控制台输出有combiner和没有combiner的区别
MapReduce中的GroupingComparator分组详解
- GroupingComparator 是 MapReduce 当中 Reduce 端决定哪些数据作为一组,调用一次 reduce 的逻辑。
- 默认是 key 相同的K-V对,作为同一组,每组调用一次 reduce 方法。
- 可以自定义 GroupingComparator,实现自定义的分组逻辑。
1. 自定义 WritableComparator
- 继承 WritableComparator 类,并重写 compare 方法
@Override
public int compare(WritableComparable a, WritableComparable b) {
// 比较的业务逻辑
return result;
}
protected OrderGroupingComparator() {
super(OrderBean.class, true);
}
2. 需求
- 现有订单数据如下,需要求取每个订单当中金额最大的商品:
订单id 商品id 成交金额
Order_0000001 Pdt_01 222.8
Order_0000001 Pdt_05 25.8
Order_0000002 Pdt_03 322.8
Order_0000002 Pdt_04 522.4
Order_0000002 Pdt_05 822.4
Order_0000003 Pdt_01 222.8
3. 需求分析
4. 代码实现
public class OrderBean implements WritableComparable {
private String orderId;
private Double price;
@Override
public int compareTo(OrderBean o) {
int result = this.orderId.compareTo(o.orderId);
if (result == 0) {
result = -this.price.compareTo(o.price);
}
return result;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(orderId);
out.writeDouble(price);
}
@Override
public void readFields(DataInput in) throws IOException {
this.orderId = in.readUTF();
this.price = in.readDouble();
}
// setter、getter、toString 省略
}
public class GroupMapper extends Mapper {
private OrderBean orderBean;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
orderBean = new OrderBean();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] slices = value.toString().split("t");
orderBean.setOrderId(slices[0]);
orderBean.setPrice(Double.valueOf(slices[2]));
context.write(orderBean, NullWritable.get());
}
}
public class GroupPartitioner extends Partitioner {
@Override
public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numPartitions) {
// 将订单号相同的分在一个区
return orderBean.getOrderId().hashCode() % numPartitions;
}
}
public class MyGroup extends WritableComparator {
public MyGroup() {
// 分组类,要对OrderBean类型的key进行分组
super(OrderBean.class, true);
}
@Override
public int compare(Object a, Object b) {
OrderBean a1 = (OrderBean) a;
OrderBean b1 = (OrderBean) b;
return a1.getOrderId().compareTo(b1.getOrderId());
}
}
public class GroupReducer extends Reducer {
@Override
protected void reduce(OrderBean key, Iterable values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
public class GroupMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//获取job对象
Job job = Job.getInstance(super.getConf(), "group");
job.setJarByClass(GroupMain.class);
//第一步:读取文件,解析成为key,value对
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path(args[0]));
//第二步:自定义map逻辑
job.setMapperClass(GroupMapper.class);
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
//第三步:分区
job.setPartitionerClass(GroupPartitioner.class);
//第四步:排序 已经做了
//第五步:规约 combiner 省掉
//第六步:分组 自定义分组逻辑
job.setGroupingComparatorClass(MyGroup.class);
//第七步:设置reduce逻辑
job.setReducerClass(GroupReducer.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
//第八步:设置输出路径
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new GroupMain(), args);
System.exit(run);
}
}
Order_0000001 222.8
Order_0000001 25.8
Order_0000002 822.4
Order_0000002 522.4
Order_0000002 322.8
Order_0000003 222.8
- 拓展:如何求每个组当中的top2的订单金额数据???
int num = 0;
for(NullWritable value: values) {
context.write(key, value);
num++;
if(num == 2)
break;
}
自定义OutputFormat
1. 需求
- 现在有一些订单的评论数据,需求,将订单的好评与其他评论(中评、差评)进行区分开来,将最终的数据分开到不同的文件夹下面去
- 数据内容参见资料文件夹,其中数据第九个字段表示好评,中评,差评。0:好评,1:中评,2:差评
2. 分析
- 程序的关键点是要在一个mapreduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义outputformat来实现。
- 实现要点:
- 在mapreduce中访问外部资源
- 自定义outputformat,改写其中的recordwriter,改写具体输出数据的方法write()
3. 代码实现
public class MyOutputFormat extends FileOutputFormat {
static class MyRecordWriter extends RecordWriter {
private FSDataOutputStream goodStream;
private FSDataOutputStream badStream;
public MyRecordWriter(FSDataOutputStream goodStream, FSDataOutputStream badStream) {
this.goodStream = goodStream;
this.badStream = badStream;
}
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
if (key.toString().split("t")[9].equals("0")) {// 好评
goodStream.write(key.toString().getBytes());
goodStream.write("rn".getBytes());
} else { // 中评或差评
badStream.write(key.toString().getBytes());
badStream.write("rn".getBytes());
}
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
if (badStream != null) badStream.close();
if (goodStream != null) goodStream.close();
}
}
@Override
public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(context.getConfiguration());
Path goodCommentPath = new Path("/Volumes/F/MyGitHub/bigdata/hadoop-demo/src/main/resources/output/good.txt");
Path badCommentPath = new Path("/Volumes/F/MyGitHub/bigdata/hadoop-demo/src/main/resources/output/bad.txt");
FSDataOutputStream goodOutputStream = fs.create(goodCommentPath);
FSDataOutputStream badOutputStream = fs.create(badCommentPath);
return new MyRecordWriter(goodOutputStream, badOutputStream);
}
}
public class OutputFormatMain extends Configured implements Tool {
static class OutputFormatMapper extends Mapper {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(value, NullWritable.get());
}
}
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(super.getConf(), OutputFormatMain.class.getSimpleName());
job.setJarByClass(OutputFormatMain.class);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path(args[0]));
job.setMapperClass(OutputFormatMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// 使用默认的reduce类的逻辑
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(MyOutputFormat.class);
MyOutputFormat.setOutputPath(job, new Path(args[1]));
job.setNumReduceTasks(2);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new OutputFormatMain(), args);
System.exit(run);
}
}
shuffle中数据压缩
- shuffle 阶段,可以看到数据通过大量的拷贝,从map阶段输出的数据,都要通过网络拷贝,发送到reduce阶段,这一过程中,涉及到大量的网络IO,如果数据能够进行压缩,那么数据的发送量就会少得多。
- 从map阶段输出的数据,都要通过网络拷贝,发送到reduce阶段,这一过程中,涉及到大量的网络IO,如果数据能够进行压缩,那么数据的发送量就会少得多
- MapReduce的执行流程:
MapReduce
input
mapper
shuffle
partitioner、sort、combiner、【compress】、group
reducer
output
- 文件压缩有两个好处:节约磁盘空间,加速数据在网络和磁盘上的传输。
- 查看 hadoop 支持的压缩算法:
hadoop checknative
1. hadoop 支持的压缩算法比较
| 压缩格式 | 工具 | 算法 | 文件扩展名 | 是否可切分 | 对应使用的Java类 |
|---|
| DEFLATE | 无 | DEFLATE | .deflate | 否 | org.apache.hadoop.io.compress.DefaultCodec |
| Gzip | gzip | DEFLATE | .gz | 否 | org.apache.hadoop.io.compress.GzipCodec |
| bzip2 | bzip2 | bzip2 | .bz2 | 是 | org.apache.hadoop.io.compress.BZip2Codec |
| LZO | lzop | LZO | .lzo | 否 | com.hadoop.compression.lzo.LzopCodec |
| LZ4 | 无 | LZ4 | .lz4 | 否 | org.apache.hadoop.io.compress.Lz4Codec |
| Snappy | 无 | Snappy | .snappy | 否 | org.apache.hadoop.io.compress.SnappyCodec |
| 压缩算法 | 原始文件大小 | 压缩后的文件大小 | 压缩速率 | 解压缩速度 |
|---|
| gzip | 8.3GB | 1.8GB | 17.5MB/s | 58MB/s |
| bzip2 | 8.3GB | 1.1GB | 2.4MB/s | 9.5MB/s |
| LZO-bset | 8.3GB | 2GB | 4MB/s | 60.6MB/s |
| LZO | 8.3GB | 2.9GB | 135MB/s | 410MB/s |
| Snappy | 8.3GB | 1.8GB | 172MB/s | 409MB/s |
2. 如何开启压缩
Configuration configuration = new Configuration();
// 设置 map 阶段压缩
configuration.set("mapreduce.map.output.compress", "true");
configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
// 设置 reduce 阶段的压缩
configuration.set("mapreduce.output.fileoutputformat.compress", "true");
configuration.set("mapreduce.output.fileoutputformat.compress.type", "RECORD");
configuration.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
- 修改 mapred-site.xml 进行 MapReduce 压缩:所有节点都要修改mapred-site.xml,修改完成之后记得重启集群
mapreduce.map.output.compress
true
mapreduce.map.output.compress.codec
org.apache.hadoop.io.compress.SnappyCodec
mapreduce.output.fileoutputformat.compress
true
mapreduce.output.fileoutputformat.compress.type
RECORD
mapreduce.output.fileoutputformat.compress.codec
org.apache.hadoop.io.compress.SnappyCodec
计数器与累加器
- 计数器是收集作业统计信息的有效手段之一,用于质量控制或应用级统计。计数器还可辅助诊断系统故障。如果需要将日志信息传输到map 或reduce 任务, 更好的方法通常是看能否用一个计数器值来记录某一特定事件的发生。对于大型分布式作业而言,使用计数器更为方便。除了因为获取计数器值比输出日志更方便,还有根据计数器值统计特定事件的发生次数要比分析一堆日志文件容易得多。
1. hadoop 内置计数器
| 计数器 | 对应的Java类 |
|---|
| MapReduce任务计数器 | org.apache.hadoop.mapreduce.TaskCounter |
| 文件系统计数器 | org.apache.hadoop.mapreduce.FileSystemCounter |
| FileInputFormat计数器 | org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter |
| FileOutputFormat计数器 | org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter |
| 作业计数器 | org.apache.hadoop.mapreduce.JobCounter |
- 每次mapreduce执行完成之后,我们都会看到一些日志记录出来,其中最重要的一些日志记录如下截图
2. 自定义计数器
- 利用之前排序及序列化的案例,统计 map 端接收到的数据的条数。
- Mapper 中,通过 context 上下文对象可以获取我们的计数器,并进行记录,通过 context 上下文对象,在 map 端使用计数器进行统计:
- Reducer 中,通过 enum 枚举类型来定义计数器,统计 reduce 端数据的输入 key 有多少个,对应的 value 有多少个:
- github 源代码地址:https://github.com/shouwangyw/bigdata/tree/master/hadoop-demo