栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hadoop源码分析(一)

Hadoop源码分析(一)

2021SC@SDUSC

本次代码分析将从Hadoop中的Writable,Hadoop中的压缩读取数据(API)以及Hadoop中类Client的相关作用及代码进行分析。

由于Hadoop的Cloud Storage与MapReduce都有通信的需求,需要对通信的对象序列化。而Hadoop没有采用Java的序列化,而是拥有自己的系统。

1.Hadoop中的Writable

而在org.apache.hadoop.io中,hadoop使用自己的序列化格式Writables接口,定义了许多序列化对象,实现了程序的简约,达到了提高运行速度的效果。在Hadoop中, 并没有使用Java自带的基本类型类, 而是使用自己开发的类,包括 IntWritable, FloatWritable, BooleanWritable, LongWritable, ByteWritable, BytesWritable, DoubleWritable, Text等

下面是一个Hadoop中使用Writables接口的一段经典代码:

1. public class MyWritable implements Writable {
2. // Some data
3. private int counter;
4. private long timestamp;
5.
6. public void write(DataOutput out) throws IOException {
7. out.writeInt(counter);
8. out.writeLong(timestamp);
9. }
10.
11. public void readFields(DataInput in) throws IOException {
12. counter = in.readInt();
13. timestamp = in.readLong();
14. }
15. 16. public static MyWritable read(DataInput in) throws IOException {
17. MyWritable w = new MyWritable();
18. w.readFields(in);
19. return w;
20. }
21. }
而在上述代码中,我们可以知道  Writable 接口定义的两个方法 write 和 readFields ,它们的功能是实现了把对象序列化和反序列化,从而使代码中对象的排列更加紧密。Hadoop中的RPC(远程调用框架)所交换的信息采用的都是JAVA中的基本字符串类型,例如String和其他类型的基本数组。这样,ObjectWritable 对象保存了一个可以在 RPC 上传输的对象和对象的类型信息。例如,我们将某个对象作为RPC请求,只需要根据 MyWritable 创建一个ObjectWritable,此时ObjectWritable 往流里会写出以下信息:    对象类名长度+对象类名+串行化结果 (PS:ObjectWritable 是一种多用途的封装,他针对Java 基本类型、字符串、枚举、Writable 、空值或任何一种此类类型的数组, 它使用Hadoop 的RPC 来封送(marshal)和反封送(unmarshall )方法参数和返回类型。例如,如果在一个SequenceFile 中的值有多种类型,就可以将值类型声明为ObjectWritable 并把每个类型封装到一个Objectwritable 中。) 因而我们可以知道,Hadoop 自带一系列有用的Writable 实现,可以满足绝大多数用途,而现有的Hadoop Writable 应用已得到很好的优化,可以对付较为复杂的结构。 在Hadoop中,大量的数据需要存储在磁盘或者内存中,因而要进行压缩。对数据文件进行压缩,可以有效减少存储文件所需的空间,并加快数据在网络上或者到磁盘上的传输速度。在Hadoop中,压缩应用于文件存储、Map阶段到Reduce阶段的数据交换(需要打开相关的选项)等情景。

2.CompressionCodec对流进行压缩和解压缩

使用API来压缩输入读取的数据及输出办法的相关代码: 
public class StreamCompressor

{

     public static void main(String[] args) throws Exception

     {

          String codecClassname = args[0];

          Class codecClass = Class.forName(codecClassname); // 通过名称找对应的编码/解码器

          Configuration conf = new Configuration();

CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);

 // 通过编码/解码器创建对应的输出流

          CompressionOutputStream out = codec.createOutputStream(System.out);

 // 压缩

          IOUtils.copyBytes(System.in, out, 4096, false);

          out.finish();

     }

}

要想对正在被写入一个输出流的数据进行压缩,我们可以使用createOutputStream(OutputStreamout)方法创建一个CompressionOutputStream(未压缩的数据将 被写到此),将其以压缩格式写入底层的流。相反,要想对从输入流读取而来的数据进行解压缩,则调用 createInputStream(InputStreamin)函数,从而获得一个CompressionInputStream,,从而从底层的流中读取未压缩的数据。

3.类Cilent相关分析

基于一个Cilent可能会与多个Server通信,也会与DataNode通信,这样就会使某一个Cilent维护多个连接,为了减少不必要的连接,Client 的做法是拿 ConnectionId来作为 Connection 的 ID。ConnectionId 包括一个 InetSocketAddress(IP 地址+端口号戒主机名+端口号)对象和一个用户信息对象。这说明,同一个用户和同一个 InetSocketAddress 的通信将共享同一个连接。

ClientConnectionID
Address InetSocketAddress
TicketUserGroupInformation

连接被封装在类 Client.Connection 中。所有的 RPC 调用,都是通过 Connection进行通信。一个 RPC 调用,自然有输入参数,输出参数和其他可能的异常,同时,为了区分在同一个 Connection 上的不同调用,每个调用都有唯一的 id,而所有的这些都体现在对象 Client.Call 中。Connection 对象通过一个 Hash 表,维护在这个连接上的所有 Call。
 private Hashtable calls = new Hashtable();
一个 RPC 调用通过 addCall,把请求加到Connection 里。而Call 使用 Obejct 的 wait 和 notify,把 RPC 上的异步消息交互转成同步调用。 Hadoop的源码具有许多可分析的意义和价值,值得我们认真研究,如ObjectWritable的通用机制 。有兴趣的读者可以继续进行深入研究。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/311396.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号