2021SC@SDUSC
1.Hadoop的Server
在Java中,Server是一个抽象类,在Hadoop中,Server只是提供了一个框架,而具体实现过程,需要这个框架中的具体类来实现。而具体类主要使用的Call方法,接下来就对Call方法进行分析。
public abstract Writable call(Writable param, long receiveTime) throws IOException;我们先来分析 Server.Call,和 Client.Call 类似,Server.Call 包含了一次请求,其中,id 和 param 的含义和 Client.Call 是一致的。而不同点在于,connection 是该 Call 来自的连接,当然,当请求处理结束时,相应的结果会通过相同的connection,收送给客户端。属性 timestamp 是请求到达的时间戳,如果请求长时间没被处理,对应的连接会被关闭,客户端也就知道出错了。最后的 response 是请求处理的结果,可能是一个 Writable 的串行化结果,也可能一个异常的串行化结 果。用户可以针对不同的response采取不同的处理措施。 Server.Connection 维护客户端的 socket 连接,读取请求并把请求发送到请求处理线程,接收 处理结果并把结果收送给客户端。 Hadoop 的 Server 采用了 Java 的 NIO,这样的话就需要为每一个 socket 连接建立一个线程,读取 socket 上的数据。在 Server 中,叧需要一个线程,就可以 accept 新的连接请求和读取socket 上的数据,请求处理线程一般有多个,它们都是 Server.Handle 类的实例。它们的 run 方法循环地取出一个 Server.Call,调用 Server.call 方法,搜集结果并串行化,然后将结果放入 Responder 队列中。对于处理完的请求,需要将结果写回去。 2.RPC.Server Server中的重要线程:
Listener线程,当Server处于运行状态时,其负责监听来自客户端的连接,并使用Select模式处理Accept事件。同时,它开启了一个空闲连接(Idle Connection)处理例程,如果有过期的空闲连接,就关闭。这个例程通过一个计时器来实现。
当select操作调用时,它可能会阻塞,这给了其它线程执行的机会。当有accept事件发生,它就会被唤醒以处理全部的事件,处理事件是进行一个doAccept的调用。
void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
channel.socket().setKeepAlive(true);
Reader reader = getReader();
Connection c = connectionManager.register(channel);
key.attach(c); // so closeCurrentConnection can get the object
reader.addConnection(c);
}
}
这里使用了while循环,是由于多个连接可能同时发出申请。这里最关键的是设置了新建立socket为非阻塞,这一点是基于性能的考虑,非阻塞的方式尽可能的读取socket接收缓冲区中的数据。
获取Reader方式:
Reader getReader() {
currentReader = (currentReader + 1) % readers.length;
return readers[currentReader];
}
Reader主要逻辑:
当Server还在运行时,Reader线程尽可能多地处理缓冲队列中的连接,注册每一个连接的READ事件,采用select模式来获取连接上有数据接收的通知。当有数据需要接收时,它尽最大可能读取select返回的连接上的数据,以防止Listener线程因为没有运行时间而发生饥饿。(如果Listener线程产生饥饿,Reader会发生堵塞,造成客户端请求超时。)
在Reader的调用过程中,使用ChannelRead方法。
private int channelRead(ReadableByteChannel channel,
ByteBuffer buffer) throws IOException {
int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
channel.read(buffer) : channelIO(channel, null, buffer);
if (count > 0) {
rpcMetrics.incrReceivedBytes(count);
}
return count;
}
channelRead会判断数据接收数组buffer中的剩余未读数据,如果大于一个临界值NIO_BUFFER_LIMIT,就采取分片方法来多次地读,以防止jdk对large buffer采取变为direct buffer的措施。
从这些代码中,我了解到,Listener采用select模式处理accept事件,一个客户端在一段时间内一般只建立有限次的连接,而且连接的建立是比较快的,所以单线程足够应付,建立后直接丢给Reader,从而建立新连接。



