2021SC@SDUSC
本次代码分析将从Hadoop中的Writable,Hadoop中的压缩读取数据(API)以及Hadoop中类Client的相关作用及代码进行分析。
由于Hadoop的Cloud Storage与MapReduce都有通信的需求,需要对通信的对象序列化。而Hadoop没有采用Java的序列化,而是拥有自己的系统。
1.Hadoop中的Writable
而在org.apache.hadoop.io中,hadoop使用自己的序列化格式Writables接口,定义了许多序列化对象,实现了程序的简约,达到了提高运行速度的效果。在Hadoop中, 并没有使用Java自带的基本类型类, 而是使用自己开发的类,包括 IntWritable, FloatWritable, BooleanWritable, LongWritable, ByteWritable, BytesWritable, DoubleWritable, Text等
下面是一个Hadoop中使用Writables接口的一段经典代码:
1. public class MyWritable implements Writable {
2. // Some data
3. private int counter;
4. private long timestamp;
5.
6. public void write(DataOutput out) throws IOException {
7. out.writeInt(counter);
8. out.writeLong(timestamp);
9. }
10.
11. public void readFields(DataInput in) throws IOException {
12. counter = in.readInt();
13. timestamp = in.readLong();
14. }
15. 16. public static MyWritable read(DataInput in) throws IOException {
17. MyWritable w = new MyWritable();
18. w.readFields(in);
19. return w;
20. }
21. }
而在上述代码中,我们可以知道
Writable 接口定义的两个方法 write 和 readFields ,它们的功能是实现了把对象序列化和反序列化,从而使代码中对象的排列更加紧密。Hadoop中的RPC(远程调用框架)所交换的信息采用的都是JAVA中的基本字符串类型,例如String和其他类型的基本数组。这样,ObjectWritable 对象保存了一个可以在 RPC 上传输的对象和对象的类型信息。例如,我们将某个对象作为RPC请求,只需要根据 MyWritable 创建一个ObjectWritable,此时ObjectWritable 往流里会写出以下信息:
对象类名长度+对象类名+串行化结果
(PS:ObjectWritable 是一种多用途的封装,他针对Java 基本类型、字符串、枚举、Writable 、空值或任何一种此类类型的数组, 它使用Hadoop 的RPC 来封送(marshal)和反封送(unmarshall )方法参数和返回类型。例如,如果在一个SequenceFile 中的值有多种类型,就可以将值类型声明为ObjectWritable 并把每个类型封装到一个Objectwritable 中。)
因而我们可以知道,Hadoop 自带一系列有用的Writable 实现,可以满足绝大多数用途,而现有的Hadoop Writable 应用已得到很好的优化,可以对付较为复杂的结构。
在Hadoop中,大量的数据需要存储在磁盘或者内存中,因而要进行压缩。对数据文件进行压缩,可以有效减少存储文件所需的空间,并加快数据在网络上或者到磁盘上的传输速度。在Hadoop中,压缩应用于文件存储、Map阶段到Reduce阶段的数据交换(需要打开相关的选项)等情景。
2.CompressionCodec对流进行压缩和解压缩
使用API来压缩输入读取的数据及输出办法的相关代码:public class StreamCompressor
{
public static void main(String[] args) throws Exception
{
String codecClassname = args[0];
Class> codecClass = Class.forName(codecClassname); // 通过名称找对应的编码/解码器
Configuration conf = new Configuration();
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
// 通过编码/解码器创建对应的输出流
CompressionOutputStream out = codec.createOutputStream(System.out);
// 压缩
IOUtils.copyBytes(System.in, out, 4096, false);
out.finish();
}
}
要想对正在被写入一个输出流的数据进行压缩,我们可以使用createOutputStream(OutputStreamout)方法创建一个CompressionOutputStream(未压缩的数据将 被写到此),将其以压缩格式写入底层的流。相反,要想对从输入流读取而来的数据进行解压缩,则调用 createInputStream(InputStreamin)函数,从而获得一个CompressionInputStream,,从而从底层的流中读取未压缩的数据。
3.类Cilent相关分析
基于一个Cilent可能会与多个Server通信,也会与DataNode通信,这样就会使某一个Cilent维护多个连接,为了减少不必要的连接,Client 的做法是拿 ConnectionId来作为 Connection 的 ID。ConnectionId 包括一个 InetSocketAddress(IP 地址+端口号戒主机名+端口号)对象和一个用户信息对象。这说明,同一个用户和同一个 InetSocketAddress 的通信将共享同一个连接。
| Client | ConnectionID |
| Address | InetSocketAddress |
| Ticket | UserGroupInformation |
private Hashtable一个 RPC 调用通过 addCall,把请求加到Connection 里。而Call 使用 Obejct 的 wait 和 notify,把 RPC 上的异步消息交互转成同步调用。 Hadoop的源码具有许多可分析的意义和价值,值得我们认真研究,如ObjectWritable的通用机制 。有兴趣的读者可以继续进行深入研究。calls = new Hashtable ();



