栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

实践常见的RPC框架:gRPC

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

实践常见的RPC框架:gRPC

实践常见的RPC框架:gRPC

文章目录

实践常见的RPC框架:gRPC

一、gRPC介绍二、第一步:gRPC服务的定义(四种RPC服务方法)

(1)简单RPC服务(2)服务端流形式的RPC(3)客户端流形式的RPC(4)双向流的PRC 三、第二步:生成客户端和服务端代码

3.1 方式一:手动执行protoc命令生成服务代码3.2 方式二:通过protobuf-base的代码生成器,直接生成编译后的class文件,不生成源代码3.3 从服务定义文件生成的classes3.4 API的稳定性3.5 高级组件

(1)存根(Stub)层(2)管道(Channel)层(3)传输(Transport)层 四、第三步:创建服务

(1)实现`RouteGuideService`(2)简单的RPC(3)服务端流形式的RPC(4)客户端流形式的RPC(5)双向流形式的RPC 五、第四步:启动服务六、第五步:创建客户端

(1)两种存根(2)创建存根 七、第六步:调用服务方法

(1) 调用简单RPC的方法(2)调用服务端流形式的RPC方法(3)调用客户端流形式的RPC方法(4)调用双向流RPC的方法

一、gRPC介绍

gRPC是现代开源能运行在任意环境上的高性能的远程程序调用框架(RPC)。它能有效的连接数据中心内外的服务,并且插件式的支持负载均衡、跟踪、健康检查和身份校验。它也能应用在分布式计算的最后一英里,将设备、手机应用程序、浏览器连接到后台服务。

简单的服务定义:通过Protocol Buffers定义服务。Protocol Bufffers是一个强大的二进制序列化工具集和语言。

Protocol Buffers语法及使用。

启动迅速并且可扩展:使用一行代码就能安装在运行时环境和开发环境,也能扩展到每秒数百万个RPC框架。

能够跨语言和跨平台的工作:为你在各种语言和平台下的服务自动生成公用的客户端和服务端存根代码。

双向流并且融合了用户检验:双向的流,并且完全融合了基于HTTP/2-based通信的插件式的用户认证。

二、第一步:gRPC服务的定义(四种RPC服务方法)

第一步是用Protocol Buffers定义gRPC服务和方法上使用的request和response类型。

gRPC允许我们定义四种服务方法:

(1)简单RPC服务
  // 一个简单的RPC:一个客户端使用存根向服务端发送一个请求,然后等待响应,就想正常的方法调用。
  rpc GetFeature(Point) returns (Feature) {}
(2)服务端流形式的RPC
  // 一个服务端流形式的RPC: 客户端发送一个请求给服务端,然后获取到一个用户读取一系列返回消息的流。
  // 客户端读取返回消息的流知道没有任何数据。
  // 通过放置关键词 stream 在返回值类型
  rpc ListFeatures(Rectangle) returns (stream Feature) {}
(3)客户端流形式的RPC
  // 一个客户端流形式的RPC:客户端通过流的形式写入一系列消息给服务端。一旦消息写入完成,就会等待服务端读取完它们然后返回。
  // 通过放置关键词 stream 在请求参数类型前面
  rpc RecordRoute(stream Point) returns (RouteSummary) {}
(4)双向流的PRC
  // 一个双向流的PRC:客户端和服务端都通过读写流发送一系列消息
  // 两个流操作是独立的,因此客户端和服务端可以按照它们喜欢的任何方式进行读写操作。
  // 例如:服务端能够等到接受完所有的消息之前写入消息,也可以轮流的读取一个消息然后写入一个消息。
  // 消息的顺序被保存在每个流中
  // 通过在请求参数类型和返回参数类型前放置stream关键词:
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

示例:src/main/proto/route_guite.proto

// 使用proto3语言
syntax = "proto3";

// 指定生成Java代码的包文件
option java_package = "com.hef.demo.grpc.api";
// 指定生成多个Java文件
option java_multiple_files = true;
// 指定生成Java文件的类名
option java_outer_classname = "RouteGuideProto";

// 定义服务
service RouteGuide {
  
  // 一个简单的RPC:一个客户端使用存根向服务端发送一个请求,然后等待响应,就想正常的方法调用。
  rpc GetFeature(Point) returns (Feature) {}

  // 一个服务端流形式的RPC: 客户端发送一个请求给服务端,然后获取到一个用户读取一系列返回消息的流。
  // 客户端读取返回消息的流知道没有任何数据。
  // 通过放置关键词 stream 在返回值类型
  rpc ListFeatures(Rectangle) returns (stream Feature) {}

  // 一个客户端流形式的RPC:客户端通过流的形式写入一系列消息给服务端。一旦消息写入完成,就会等待服务端读取完它们然后返回。
  // 通过放置关键词 stream 在请求参数类型前面
  rpc RecordRoute(stream Point) returns (RouteSummary) {}

  // 一个双向流的PRC:客户端和服务端都通过读写流发送一系列消息
  // 两个流操作是独立的,因此客户端和服务端可以按照它们喜欢的任何方式进行读写操作。
  // 例如:服务端能够等到接受完所有的消息之前写入消息,也可以轮流的读取一个消息然后写入一个消息。
  // 消息的顺序被保存在每个流中
  // 通过在请求参数类型和返回参数类型前放置stream关键词:
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}


}

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

message Feature {
  string name = 1;
  Point location = 2;
}

message Rectangle {
  Point lo = 1;
  Point hi = 2;
}

message RouteSummary {
  int32 point_count = 1;
  int32 feature_count = 2;
  int32 distance = 3;
  int32 elapsed_time = 4;
}

message RouteNote {
  Point location = 1;
  string message = 2;
}

// 在RPC中未使用。相反,这个是序列化到磁盘的格式
// 本案例中:读取带有Feature列表的JSON文件数据,创建出Feature数据列表
message FeatureDatabase {
  repeated Feature feature = 1;
}
三、第二步:生成客户端和服务端代码

接下来我们要根据.proto文件中对服务的定义生成gRPC的客户端和服务端代码。我们使用带有特定gRPC Java插件protocol buffer编译器做这件事情。我们使用proto3编译器(proto3同时支持proto2和proto3语法)生成服务代码。

当使用Gradle或Maven的时候,protoc构建插件能作为构建的一部分来产生需要的代码,可以参考grpc-java README。

为项目添加maven依赖:


  io.grpc
  grpc-netty-shaded
  1.44.1
  runtime


  io.grpc
  grpc-protobuf
  1.44.1


  io.grpc
  grpc-stub
  1.44.1

 
  org.apache.tomcat
  annotations-api
  6.0.53
  provided

项目的初始状态:

3.1 方式一:手动执行protoc命令生成服务代码
protoc -I=/Users/lifei/documents/workspace/githubRepositoies/JAVARebuild/projects/action-rpcfx-demo/demo-grpc-api/src/main/proto --java_out=/Users/lifei/documents/workspace/githubRepositoies/JAVARebuild/projects/action-rpcfx-demo/demo-grpc-api/src/main/java route_guide.proto

执行上面的命令后生成的代码结构为:

3.2 方式二:通过protobuf-base的代码生成器,直接生成编译后的class文件,不生成源代码

这种方式需要将.proto文件放在src/main/proto和src/test/proto目录下,并在pom.xml中引入插件:


  
    
      kr.motd.maven
      os-maven-plugin
      1.6.2
    
  
  
    
      org.xolstice.maven.plugins
      protobuf-maven-plugin
      0.6.1
      
        com.google.protobuf:protoc:3.19.2:exe:${os.detected.classifier}
        grpc-java
        io.grpc:protoc-gen-grpc-java:1.44.1:exe:${os.detected.classifier}
      
      
        
          
            compile
            compile-custom
          
        
      
    
  

然后执行mvn compile,发现在target/classes下生成了编译后的class文件,而src/main/java下并没有源代码:

3.3 从服务定义文件生成的classes

Feature.java、Point.java、Rectangle.java和其他的,它们都包含了填充、序列化和接受请求和响应消息类型的代码。RouteGuideGrpc.java包含的内容有:

一个基于RouteGuid服务的实现,RouteGuideGrpc.RouteGuideImplbase中包含了所有RouteGuid服务中定义的方法;stub类是让客户端和服务端进行通信; 3.4 API的稳定性

带有@Internal注解的APIs,它们是被gRPC库内部使用的,不应该被gRPC的用户使用。带有@ExperimentalApi注解的APIs它们主要是为了改变将来发布的版本,其他项目依赖的库代码不应该使用这些APIs。

推荐使用grpc-java-api-checker(一个Erro Prone插件)在任何依赖gRPC的库代码中检查@ExperimentalApi和@Internal注解的使用。它也能用于在非库代码中检查@Internal的使用或者意外使用了@ExperimentalApi。

3.5 高级组件

对库代码的高层有三个明显的层级:存根(Stub)、管道(Channel)、传输(Transport)。

(1)存根(Stub)层

存根层(Stub)是暴露给广大开发者,提供类型安全的你正在适配的数据绑定(datamodel)、接口定义(IDL)、接口。gRPC带有一个protocol-buffers编译器插件,能通过.proto文件生成存根接口,并且绑定其他的数据模型和接口定义也很容易并且是鼓励的。

(2)管道(Channel)层

管道层是传输(Transport)处理之上的抽象,它用于拦截、装饰,并且它暴漏出比存根层更多的行为给应用。它故意让应用框架的使用者容易使用这一层去解决横切问题,例如日志、监控、身份验证等等。

(3)传输(Transport)层

传输层(Transport)用于完成从线路上放置和提取字节的工作。那个接口刚好足够插入不同的实现,注意传输层的API是被考虑用于gRPC内部,其保证比核心包io.grpc下面的接口代码更弱。

gRPC提供了三种传输的实现:

基于Netty的传输,这种方式是基于Neety的主要传输实现。既可以用于客户端,也可以用于服务端;基于OkHttp的传输,这种方式是基于OkHttp的轻量级传输实现。它主要用于安卓并且仅仅用于客户端;内部进程传输,当服务端是和客户端在同一个进程的时候使用。它用于测试,同时也可以安全的用于生产; 四、第三步:创建服务

为了创建我们自己的RouteGuide服务,有两部分工作要做:

覆盖从我们服务定义产生的基础服务类:在我们的类中去做真正的工作;运行一个gRPC服务去监听来自客户端的请求,并返回服务响应;

下面是自己定义的服务案例:src/main/java/com/hef/demo/grpc/server/RouteGuideService.java。

(1)实现RouteGuideService

RouteGuideService继承RouteGuideGrpc.RouteGuideImplbase,并实现了我们所有的服务方法。

public class RouteGuideService extends RouteGuideGrpc.RouteGuideImplbase{
    private final Collection features;


    public RouteGuideService(Collection features) {
        this.features = features;
    }
    // ......
}
(2)简单的RPC
    
    @Override
    public void getFeature(Point request, StreamObserver responseObserver) {
        responseObserver.onNext(checkFeature(request));
        responseObserver.onCompleted();
    }

    private Feature checkFeature(Point location) {
        for (Feature feature : features) {
            if (feature.getLocation().getLatitude() == location.getLatitude()
                    && feature.getLocation().getLongitude()==location.getLongitude()) {
                return feature;
            }
        }
        return Feature.newBuilder().setName("").setLocation(location).build();
    }

getFeature()方法获取两个参数:

Point:请求参数StreamObserver:一个响应的观察者,它是一个特定的接口为了让服务端去调用传递它的响应

为了返回响应给客户端,并且完成调用:

    就像我们在服务定义中指定的一样,为了返回给客户端我们构造并填充了一个Feature响应对象。在这个例子中,为了做这个我们单独定义了一个私有的checkFeature()方法。我们使用响应观察者的onNext()方法去返回一个Feature。我们使用响应观察者的onCompleted()方法去说明我们已经完成了这次RPC调用。
(3)服务端流形式的RPC
    
    @Override
    public void listFeatures(Rectangle request, StreamObserver responseObserver) {
        int left = Math.min(request.getLo().getLongitude(), request.getHi().getLongitude());
        int right = Math.max(request.getLo().getLongitude(), request.getHi().getLongitude());
        int top = Math.max(request.getLo().getLatitude(), request.getHi().getLatitude());
        int bottom = Math.min(request.getLo().getLatitude(), request.getHi().getLatitude());

        for (Feature feature : features) {
            if (!RouteGuideUtil.exists(feature)) {
                continue;
            }
            int latitude = feature.getLocation().getLatitude();
            int longitude = feature.getLocation().getLongitude();
            if (longitude>=left && longitude<=right && latitude>=bottom && latitude<=top) {
                responseObserver.onNext(feature);
            }
        }
        responseObserver.onCompleted();
    }

服务端流形式端RPC和简单RPC一样,需要获取两个参数:一个参数代表请求参数,一个对象是响应的观察者。

通过遍历features集合,判断是否在请求的范围内,获取多个Feature对象返回给客户端。并且使用观察者的onNext()方法将它们轮流写入响应观察者中。

最后像我们简单的RPC一样,我们使用响应观察者的onCompleted() 方法告诉gRPC我们完成了写入响应。

(4)客户端流形式的RPC
    
    @Override
    public StreamObserver recordRoute(final StreamObserver responseObserver) {
        return new StreamObserver() {
            int pointCount;
            int featureCount;
            int distance;
            Point previous;
            final long startTime = System.nanoTime();
            @Override
            public void onNext(Point point) {
                pointCount++;
                if (RouteGuideUtil.exists(checkFeature(point))) {
                    featureCount++;
                }
                if (previous!=null) {
                    distance = calcDistance(previous, point);
                }
                previous = point;
            }

            @Override
            public void onError(Throwable t) {
                logger.log(Level.WARNING, "在recordRoute()方法中遇到一个错误", t);
            }

            @Override
            public void onCompleted() {
                long seconds = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime);
                responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
                        .setFeatureCount(featureCount).setDistance(distance)
                        .setElapsedTime((int)seconds).build());
                responseObserver.onCompleted();
            }
        };
    }

    
    private static int calcDistance(Point start, Point end) {
        // 地球的半径,单位是米
        int r = 6371000;
        // 将角度转化为弧度
        double lat1 = Math.toRadians(RouteGuideUtil.getLatitude(start));
        double lat2 = Math.toRadians(RouteGuideUtil.getLatitude(end));
        double lon1 = Math.toRadians(RouteGuideUtil.getLongitude(start));
        double lon2 = Math.toRadians(RouteGuideUtil.getLongitude(end));
        double deltaLat = lat2 - lat1;
        double deltaLon = lon2 - lon1;
        double a = Math.sin(deltaLat/2) * Math.sin(deltaLat/2)
                + Math.cos(lat1) * Math.cos(lat2) * Math.sin(deltaLon/2) * Math.sin(deltaLon/2);
        double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1-a));

        return (int) (r * c);
    }

就像上面的方法,我们获取一个响应流的观察者参数,同时为客户端返回一个用于写入Point的流观察者。

在方法中,我们示例化一个匿名的流观察者作为返回值,在这个匿名流观察者中:

覆盖了onNext()方法,每当客户端写入Point消息流的时候,我们获取特征等信息;覆盖了onCompleted()方法(当客户端完成写入信息之后调用)去填充构建RouteSummary对象,然后我们调用响应观察者带有RouteSummary对象参数的onNext()方法,再然后从服务端调用onCompleted()方法去完成本次方法调用。 (5)双向流形式的RPC

最后让我们看双向流的RPC调用:

    
    @Override
    public StreamObserver routeChat(StreamObserver responseObserver) {
        return new StreamObserver() {
            @Override
            public void onNext(RouteNote routeNote) {
                List notes = getOrCreateNotes(routeNote.getLocation());
                // 返回一个点之前所有点信息
                for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
                    responseObserver.onNext(prevNote);
                }
                // 添加新的note到列表中
                notes.add(routeNote);
            }

            @Override
            public void onError(Throwable t) {
                logger.log(Level.WARNING, "在routeChat()方法上发生错误", t);
            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }

    
    private List getOrCreateNotes(Point location) {
        List notes = Collections.synchronizedList(new ArrayList<>());
        List prevNotes = routeNotes.getOrDefault(location, notes);
        return prevNotes!=null?prevNotes:notes;
    }

就像客户端流方法一样,我们获取和返回都是一个流观察者,除此之外,在客户端写入消息到它们消息流的同时我们也通过方法的响应观察者返回值。这里的读写方法与我们“客户端流形式的RPC方法”和“服务端流形式的RPC方法”是一样的。

尽管每一边总是能按照写入顺序读取另一边的消息,但客户端和服务端可以按照任意顺序进行读取和写入——它们的流操作是完全独立的。

五、第四步:启动服务

当我们实现了我们的方法之后,我们也需要启动gRPC服务,以便客户端能够真正的使用这些服务。下面的代码片段展示了如何使用这些服务,完整代码src/main/java/com/hef/demo/grpc/server/RouteGuideServer.java

public class RouteGuideServer {

    private static final Logger logger = Logger.getLogger(RouteGuideServer.class.getName());

    private final int port;
    private final Server server;

    public RouteGuideServer(int port) {
        this(port, RouteGuideUtil.getDefaultFeaturesFile());
    }

    
    public RouteGuideServer(int port, URL featureFile) {
        this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
    }

    
    public RouteGuideServer(ServerBuilder serverBuilder, int port, Collection features) {
        this.port = port;
        this.server = serverBuilder.addService(new RouteGuideService(features)).build();
    }

    
    public void start() throws IOException {
        server.start();
        logger.info("服务启动, 监听的端口为:" + port);
        Runtime.getRuntime().addShutdownHook(new Thread(()->{
            System.err.println("*** shutting down gRPC server since JVM is shutting down ***");
            try {
                RouteGuideServer.this.stop();
            } catch (InterruptedException e) {
                e.printStackTrace(System.err);
            }
            System.err.println("*** server shut down");
        }));
    }

    
    public void stop() throws InterruptedException {
        if (server!=null) {
            server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
        }
    }

    
    private void blockUntilShutdown() throws InterruptedException {
        if (server!=null) {
            server.awaitTermination();
        }
    }


    public static void main(String[] args) throws IOException, InterruptedException {
        RouteGuideServer server = new RouteGuideServer(8980);
        server.start();
        server.blockUntilShutdown();
    }
}

就像你看到的那样,我们创建并启动服务使用ServerBuilder。为了做这样的事情,我们:

指定地址并使用构造器的forPort()方法为客户端请求提供我们想让其监听的端口;创建一个RouteGuideService类的示例将其传递到构造器的addService()方法中;在构造器上调用builder()和start()方法去为我们的服务创建并启动一个RPC服务器; 六、第五步:创建客户端

在这一部分我们将看到为我们的RouteGuide服务创建客户端,可以在RouteGuideClient上面看到完整的代码:

(1)两种存根

为了调用服务方法,我们需求创建一个存根,更确切的说,是两个存根:

一个阻塞(或同步)的存根:这意味着RPC调用将等待服务响应,直到其要么返回一个响应,要么抛出一个异常;一个非阻塞(或异步)的存根:这将创建一个对服务端非阻塞的调用,在那儿响应将以异步的方式返回。只有使用异步存根你才能创建确定类型的流; (2)创建存根

首先我们需要为我们的存根创建一个grpc的管道(channel),指定我们想要创建连接的服务地址和端口:

public class RouteGuideClient {

    private static final Logger logger = Logger.getLogger(RouteGuideClient.class.getName());
    private final RouteGuideBlockingStub blockingStub;
    private final RouteGuideStub asyncStub;

    
    public RouteGuideClient(String host, int port) {
        this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
    }

    
    public RouteGuideClient(ManagedChannelBuilder channelBuilder) {
        this(channelBuilder.build());
    }

    
    public RouteGuideClient(Channel channel) {
        this.blockingStub = RouteGuideGrpc.newBlockingStub(channel);
        this.asyncStub = RouteGuideGrpc.newStub(channel);
    }

//    private final ManagedChannel channel;
//    public RouteGuideClient(ManagedChannel channel) {
//        this.channel = channel;
//        this.blockingStub = RouteGuideGrpc.newBlockingStub(channel);
//        this.asyncStub = RouteGuideGrpc.newStub(channel);
//    }
  
}

我们通过ManagedChannelBuilder创建管道(channel),我们再使用管道(channel)通过调用从我们.proto文件生成的RouteGuideGrpc 类的newStub()和newBlockingStub()方法创建存根。

七、第六步:调用服务方法

现在让我们看如何调用服务方法,可以在RouteGuideClient上面看到完整的代码:

    public static void main(String[] args) throws InterruptedException {
        String host = "localhost";
        int port = 8980;

        ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port)
                .usePlaintext().build();
        try {
            RouteGuideClient client = new RouteGuideClient(channel);
            // 调用简单RPC方法
            client.getFeature(409146138, -746188906);
            // 调用服务端流形式的RPC方法
            client.listFeatures(400000000, -750000000, 420000000, -730000000);
            // 调用客户端流形式的RPC方法
            List features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
            // 从features列表中记录一些随机选择的点
            client.recordRoute(features, 10);

            // 调用双向流RPC方法: 发送并返回一些notes
            CountDownLatch finishLatch = client.routeChat();
            if (!finishLatch.await(1, TimeUnit.MINUTES)) {
                client.warning("routeChat 在一分钟之内 没有完成");
            }
        }finally {
            channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
        }
    }
(1) 调用简单RPC的方法

通过阻塞存根调用简单RPC的getFeature()方法是相当的容易,就像调用本地方法一样:

   
    public void getFeature(int lat, int lon) {
        logger.info(String.format("*** GetFeature: lat=%d lon=%d", lat, lon));
        Point point = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
        Feature feature;
        try {
            feature = blockingStub.getFeature(point);
        }catch (StatusRuntimeException e) {
            warning("RPC 调用出错: {0}", e.getStatus());
            return;
        }

        if (RouteGuideUtil.exists(feature)) {
            info("发现了 feature 通过调用 "{0}" 参数为 {1}, {2}",
                    feature.getName(),
                    RouteGuideUtil.getLatitude(feature.getLocation()),
                    RouteGuideUtil.getLongitude(feature.getLocation()));
        }else {
            info(String.format("在 {0}, {1}上没有发现 feature",
                    RouteGuideUtil.getLatitude(feature.getLocation()),
                    RouteGuideUtil.getLongitude(feature.getLocation())));
        }
    }

    private void info(String msg, Object... params) {
        logger.log(Level.INFO, msg, params);
    }

    private void warning(String msg, Object... params) {
        logger.log(Level.WARNING, msg, params);
    }

我们创建并填充一个protocol buffer 请求对象(在这个例子里是Point),将这个对象传递给存根的getFeature()方法,我们将得到一个Feature 对象。

如果发生错误,这个错误会有一个状态编码,通过StatusRuntimeException异常能捕获到这个状态。

(2)调用服务端流形式的RPC方法

接下来我们看一个服务端流形式的RPC方法调用ListFeatures,其返回一个Feature的流对象:

    
    public void listFeatures(int lowLat, int lowLon, int hiLat, int hiLon) {
        info("*** ListFeatures: {0}, {1}, {2}, {3}", lowLat, lowLon, hiLat, hiLon);

        Rectangle rectangle = Rectangle.newBuilder()
                .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
                .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build())
                .build();
        Iterator features;
        try {
            features = blockingStub.listFeatures(rectangle);
            for (int i=1; features.hasNext(); i++) {
                Feature feature = features.next();
                info("Result #{0}: {1}", i, feature);
            }
        }catch (StatusRuntimeException e) {
            warning("RPC调用出错:{0} ", e.getStatus());
        }
    }

就像你看到的那样,它和简单RPC调用非常相似,除了将返回单个的Feature对象替换成Feature的迭代器对象,以便我们能获取到所有的Features。

(3)调用客户端流形式的RPC方法

现在我们看一点稍微复杂一点的东西:客户端流形式RPC方法的调用。在那儿我们发送一个Point的流给服务端,然后得到一个RouteSummary。对于这个方法,我们需要使用异步存根。调用客户端流形式的RPC方法和在服务端创建客户端流形式的RPC方法很相似:

    
    public void recordRoute(List features, int numPoints) throws InterruptedException {
        info("*** RecordRoute");
        final CountDownLatch finishLatch = new CountDownLatch(1);
        StreamObserver responseObserver = new StreamObserver() {
            @Override
            public void onNext(RouteSummary routeSummary) {
                info("完成了本次旅行,通过了{0}个points, 通过了{1}个features, 行走了{2}米。花费了{3}秒。",
                        routeSummary.getPointCount(),
                        routeSummary.getFeatureCount(),
                        routeSummary.getDistance(),
                        routeSummary.getElapsedTime());
            }

            @Override
            public void onError(Throwable throwable) {
                warning("RecordRoute 出错:{0}", Status.fromThrowable(throwable));
                finishLatch.countDown();
            }

            @Override
            public void onCompleted() {
                info("Finished RecordRoute");
                finishLatch.countDown();
            }
        };

        StreamObserver requestObserver = asyncStub.recordRoute(responseObserver);
        try {
            for (int i = 0; i < numPoints; i++) {
                int index = random.nextInt(features.size());
                Point point = features.get(index).getLocation();
                info("正在参观:{0}, {1}", RouteGuideUtil.getLatitude(point),
                        RouteGuideUtil.getLongitude(point));
                requestObserver.onNext(point);
                // 在发送下一个之前睡一会儿
                Thread.sleep(random.nextInt(1000) + 500);
                if (finishLatch.getCount()==0) {
                    // 在我们完成发送之前,RPC调用完成或者发生错误
                    // 发送更多的请求不会出现错误,但是它们会被抛弃
                    return;
                }
            }
            // 标记请求完成
            requestObserver.onCompleted();

            // 接受异步的请求
            if (!finishLatch.await(1, TimeUnit.MINUTES)) {
                warning("recordRoute 在1分钟之内没有完成。");
            }
        }catch (RuntimeException e) {
            requestObserver.onError(e);
            throw e;
        }
    }

就像你看到的那样,为了调用这个方法我们需要创建一个流观察者。这个流观察者实现了特定的接口,为了能让服务端调用并传入RouteSummary类型的响应。在我们的流观察者中:

我们覆盖了onNext()方法去打印服务端写入一个RouteSummary类型流的返回信息;我们覆盖了onCompleted()方法(被服务端调用)去减少一个CountDownLatch计数,这样我们就能知道是否服务端已经完成了本次调用;

我们然后传递一个流观察者到异步存根的recordRoute()方法中,并得到一个我们自己的请求观察者,便于我们将Point信息发送给服务端。

一旦我们完成了写入points信息,我们就调用请求观察者的onCompleted()方法去告诉grpc我们已经在客户侧完成了写入操作。一旦我们这样做了,我们检查我们的CountDownLatch查看服务端也已经完成。

(4)调用双向流RPC的方法

最后,让我们看一下双向流方法的调用:

    
    public CountDownLatch routeChat() {
        info("*** RouteChar");
        final CountDownLatch finishLatch = new CountDownLatch(1);
        StreamObserver responseObserver = new StreamObserver() {
            @Override
            public void onNext(RouteNote routeNote) {
                info("在{0} {1} 得到信息 "{2}"",
                        routeNote.getLocation().getLatitude(),
                        routeNote.getLocation().getLongitude(),
                        routeNote.getMessage()
                        );
            }

            @Override
            public void onError(Throwable throwable) {
                warning("routeChat发生错误:{0}", Status.fromThrowable(throwable));
                finishLatch.countDown();
            }

            @Override
            public void onCompleted() {
                info("完成了routeChat 调用");
                finishLatch.countDown();
            }
        };
        StreamObserver requestObserver = asyncStub.routeChat(responseObserver);
        try {
            RouteNote[] requests = {
                    newNote("第一个消息", 0, 0),
                    newNote("第二个消息", 0, 10_000_000),
                    newNote("第三个消息", 10_000_000, 0),
                    newNote("第四个消息", 10_000_000, 10_000_000)
            };
            for (RouteNote request : requests) {
                info("在{0}, {1}上 发送消息 "{2}"",
                        request.getLocation().getLatitude(),
                        request.getLocation().getLongitude(),
                        request.getMessage());
                requestObserver.onNext(request);
            }
        }catch (StatusRuntimeException e) {
            // 取消RPC
            requestObserver.onError(e);
        }
        // 标记所有的请求完成
        requestObserver.onCompleted();
        // 在异步接受的过程中,返回finishLatch
        return finishLatch;
    }

    private RouteNote newNote(String message, int lat, int lon) {
        return RouteNote.newBuilder().setMessage(message)
                .setLocation(Point.newBuilder().setLatitude(lat).setLongitude(lon).build())
                .build();
    }

就像我们使用客户端流的案例一样,我们不仅获取一个流观察者,也获取得一个响应流观察者。除了服务端向响应流写入消息的同时我们通过我们方法的请求观察者发送消息。

对于读写的方法和客户端流方法极其相似。尽管两边都能按照它们写入的顺序获取消息,但是客户端和服务端能够以任意的顺序进行读取和写入——这两个流操作是完全独立的。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/750425.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号