栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Ratis源码分析----客户端发起请求

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Ratis源码分析----客户端发起请求

0x00 思考

如果要分析客户端发起请求,那么第一件事情就是确定框架中的example、client、server包,很明显example是你要写的demo、client是你引用的jar包,server是Ratis的核心server实现,接下来再想想,Ratis是基于Grpc实现远程通信的,那么必然有三种角色通信请求方、服务、通信接收方;其次一般框架不像我们做demo一样直接引用Grpc的类来通信,可能偏向于用类包装一下。因此可以想到Ratis的客户端请求大概是这么回事:client包会有暴露用户使用的简单易用接口,同时也会使用另一个类包装通信过程,中间层会联合两者(其实大多数框架都是这样设计)

0x01相关类梳理

既然要看实现,那就从头到Grpc把相关类搞清楚。首先我们很容易找到example里面的demo,这里就拿counter这个例子来说。因为这里是cs架构,所以必然先要启动server,然后再启动demo,这里就不看server,启动很简单。我们来看看CounterClient类

public static void main(String[] args)
      throws IOException, InterruptedException {
    ...
    //build the counter cluster client
    RaftClient raftClient = buildClient();
    // concurrently
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    for (int i = 0; i < increment; i++) {
      executorService.submit(() ->
          raftClient.io().send(Message.valueOf("INCREMENT")));
    }
    executorService.shutdown();
    executorService.awaitTermination(increment * 500L, TimeUnit.MILLISECONDS);

    //send GET command and print the response
    RaftClientReply count = raftClient.io().sendReadOnly(Message.valueOf("GET"));
    String response = count.getMessage().getContent().toString(Charset.defaultCharset());
    System.out.println(response);
  }

可以看到demo的启动也很简单,就是创建RaftClient,然后创建一个线程池,利用RaftClient发送命令(可以看到这里数据就是“INCREMENT”,因为他就是想发送一个命令让计数器自增)。

刚才咱们说了一般客户端的设计遵循两层设计对上抽象一层,对底抽象一层,所以RaftClient明显是对上(对用户),所以在RaftClient类中就可能包含中间层对象,我们接着看:

public final class RaftClientImpl implements RaftClient{
  private final ClientId clientId;
  private final RaftClientRpc clientRpc;
  ...
}

可以看到RaftClient是个接口,为啥是个接口?因为对用户来说,本质都是在使用服务,既然是服务,换句话说就是使用方法,再换句话说那就是定义接口就好了。接着看到它的实现类有个属性RaftClientRpc,这里大概就能知道这就是中间层Grpc的实现类了。

public class GrpcClientRpc extends RaftClientRpcWithProxy {
  public static final Logger LOG = LoggerFactory.getLogger(GrpcClientRpc.class);

  private final ClientId clientId;
  private final int maxMessageSize;

  public GrpcClientRpc(ClientId clientId, RaftProperties properties,
      GrpcTlsConfig adminTlsConfig, GrpcTlsConfig clientTlsConfig) {
    super(new PeerProxyMap<>(clientId.toString(),
        p -> new GrpcClientProtocolClient(clientId, p, properties, adminTlsConfig, clientTlsConfig)));
    this.clientId = clientId;
    this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug).getSizeInt();
  }
}

咋一看,好像这个类没啥,但是看看他的构造函数,就知道其实它在这里创建了GrpcClientProtocolClient类,这个类就和我们写的Grpc的demo一样,包含stub,然后用stub发请求。我们看看GrpcClientProtocolClient:

public class GrpcClientProtocolClient implements Closeable {
  public static final Logger LOG = LoggerFactory.getLogger(GrpcClientProtocolClient.class);

  private final Supplier name;
  private final RaftPeer target;
  private final ManagedChannel clientChannel;
  private final ManagedChannel adminChannel;

  private final TimeDuration requestTimeoutDuration;
  private final TimeDuration watchRequestTimeoutDuration;
  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();

  private final RaftClientProtocolServiceStub asyncStub;
  private final AdminProtocolServiceBlockingStub adminBlockingStub;

  private final AtomicReference orderedStreamObservers = new AtomicReference<>();

  private final AtomicReference unorderedStreamObservers = new AtomicReference<>();
}

如果用过Grpc的类来说,看到这个类是不是觉得很亲切,有stub那就可以直接进行rpc请求了,但是这里可以看到有两个stub,为啥,因为这个类代理了两个rpc请求(客户端向服务端发数据命令请求、客户端向服务端发送服务配置请求),而且还有两个AsyncStreamObservers,说明发送命令请求是流式的,这很好理解,命令本来就是不停的发,而admin(也就是和服务配置管理相关的)是一次性的。到这里已经不需要继续看下去了吧。

总结来说:相关类就是顶层RaftClient(面向用户)、中间层GrpcClientRpc(中间调节两者接口)、底层GrpcClientProtocolClient(实现数据发送)

0x02 实现

其实上面的相关类分析之后,实现流程很简单了。

CounterClient:

for (int i = 0; i < increment; i++) {
      executorService.submit(() ->
          raftClient.io().send(Message.valueOf("INCREMENT")));
    }

这里io()返回的是BolckingAPI,其实最终还是要转给中间层GrpcClientRpc

RaftClientReply sendRequest(RaftClientRequest request) throws IOException {
    LOG.debug("{}: send {}", client.getId(), request);
    RaftClientReply reply;
    try {
        //这里就是获取GrpcClientRpc,然后调用其sendRequest
      reply = client.getClientRpc().sendRequest(request);
    } catch (GroupMismatchException gme) {
      throw gme;
    } catch (IOException ioe) {
      client.handleIOException(request, ioe);
      throw ioe;
    }
    LOG.debug("{}: receive {}", client.getId(), reply);
    reply = client.handleLeaderException(request, reply);
    reply = RaftClientImpl.handleRaftException(reply, Function.identity());
    return reply;
  }

接着看GrpcClientRpc:

private CompletableFuture sendRequest(
      RaftClientRequest request, GrpcClientProtocolClient proxy) throws IOException {
    final RaftClientRequestProto requestProto =
        toRaftClientRequestProto(request);
    final CompletableFuture replyFuture = new CompletableFuture<>();
    // create a new grpc stream for each non-async call.
    //构建request观察者
    final StreamObserver requestObserver =
        proxy.orderedWithTimeout(new StreamObserver() {
          @Override
          public void onNext(RaftClientReplyProto value) {
            replyFuture.complete(value);
          }

          @Override
          public void onError(Throwable t) {
            replyFuture.completeExceptionally(GrpcUtil.unwrapIOException(t));
          }

          @Override
          public void onCompleted() {
            if (!replyFuture.isDone()) {
              replyFuture.completeExceptionally(
                  new AlreadyClosedException(clientId + ": Stream completed but no reply for request " + request));
            }
          }
        });
    requestObserver.onNext(requestProto);
    requestObserver.onCompleted();

    return replyFuture.thenApply(ClientProtoUtils::toRaftClientReply);
  }

可以看到proxy调用了orderedWithTimeout,传入一个response的观测者,很明显这里的proxy就是GrpcClientProtocolClient,点击进去看看:

StreamObserver orderedWithTimeout(StreamObserver responseHandler) {
    return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
        .unordered(responseHandler);
  }

我们发现它就是用stub调用unordered服务,传入一个response的观测者,返回一个request观测者,这个response观测者就是我们刚刚传入的,然后用request观测者进行onNext请求即可。实现到这里也就结束了,不知道各位看官觉得我讲清楚了没,如果觉得难以理解,可以去了解一下Grpc(java api)。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/837512.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号