栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hadoop源码分析(三)

Hadoop源码分析(三)

2021SC@SDUSC

1.Hadoop的Server

  在Java中,Server是一个抽象类,在Hadoop中,Server只是提供了一个框架,而具体实现过程,需要这个框架中的具体类来实现。而具体类主要使用的Call方法,接下来就对Call方法进行分析。

public abstract Writable call(Writable param, long receiveTime) throws IOException;
我们先来分析 Server.Call,和 Client.Call 类似,Server.Call 包含了一次请求,其中,id 和 param 的含义和 Client.Call 是一致的。而不同点在于,connection 是该 Call 来自的连接,当然,当请求处理结束时,相应的结果会通过相同的connection,收送给客户端。属性 timestamp 是请求到达的时间戳,如果请求长时间没被处理,对应的连接会被关闭,客户端也就知道出错了。最后的 response 是请求处理的结果,可能是一个 Writable 的串行化结果,也可能一个异常的串行化结 果。用户可以针对不同的response采取不同的处理措施。 Server.Connection 维护客户端的 socket 连接,读取请求并把请求发送到请求处理线程,接收 处理结果并把结果收送给客户端。 Hadoop 的 Server 采用了 Java 的 NIO,这样的话就需要为每一个 socket 连接建立一个线程,读取 socket 上的数据。在 Server 中,叧需要一个线程,就可以 accept 新的连接请求和读取socket 上的数据,请求处理线程一般有多个,它们都是 Server.Handle 类的实例。它们的 run 方法循环地取出一个 Server.Call,调用 Server.call 方法,搜集结果并串行化,然后将结果放入 Responder 队列中。对于处理完的请求,需要将结果写回去。 2.RPC.Server Server中的重要线程:

Listener线程,当Server处于运行状态时,其负责监听来自客户端的连接,并使用Select模式处理Accept事件。同时,它开启了一个空闲连接(Idle Connection)处理例程,如果有过期的空闲连接,就关闭。这个例程通过一个计时器来实现。

当select操作调用时,它可能会阻塞,这给了其它线程执行的机会。当有accept事件发生,它就会被唤醒以处理全部的事件,处理事件是进行一个doAccept的调用。

void doAccept(SelectionKey key) throws InterruptedException, IOException,  OutOfMemoryError {
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {

        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        channel.socket().setKeepAlive(true);
        
        Reader reader = getReader();
        Connection c = connectionManager.register(channel);
        key.attach(c);  // so closeCurrentConnection can get the object
        reader.addConnection(c);
      }
    }

这里使用了while循环,是由于多个连接可能同时发出申请。这里最关键的是设置了新建立socket为非阻塞,这一点是基于性能的考虑,非阻塞的方式尽可能的读取socket接收缓冲区中的数据。

获取Reader方式:

    Reader getReader() {
      currentReader = (currentReader + 1) % readers.length;
      return readers[currentReader];
    }

Reader主要逻辑:

当Server还在运行时,Reader线程尽可能多地处理缓冲队列中的连接,注册每一个连接的READ事件,采用select模式来获取连接上有数据接收的通知。当有数据需要接收时,它尽最大可能读取select返回的连接上的数据,以防止Listener线程因为没有运行时间而发生饥饿。(如果Listener线程产生饥饿,Reader会发生堵塞,造成客户端请求超时。)

在Reader的调用过程中,使用ChannelRead方法。

private int channelRead(ReadableByteChannel channel, 
                          ByteBuffer buffer) throws IOException {
    
    int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
                channel.read(buffer) : channelIO(channel, null, buffer);
    if (count > 0) {
      rpcMetrics.incrReceivedBytes(count);
    }
    return count;
  }

channelRead会判断数据接收数组buffer中的剩余未读数据,如果大于一个临界值NIO_BUFFER_LIMIT,就采取分片方法来多次地读,以防止jdk对large buffer采取变为direct buffer的措施。

从这些代码中,我了解到,Listener采用select模式处理accept事件,一个客户端在一段时间内一般只建立有限次的连接,而且连接的建立是比较快的,所以单线程足够应付,建立后直接丢给Reader,从而建立新连接。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/350585.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号