栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Go语言

C++实现grpc异步服务器

Go语言 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

C++实现grpc异步服务器

首先可以跑一下grpc官方的例子,网址:
https://grpc.io/docs/languages/cpp/quickstart/

一、下载grpc包并编译

https://github.com/grpc/grpc
在编译grpc之前需要下载protobuf、abseil、re2、zlib等,放到grpc的相关目录下,
然后可以写个脚本编译grpc库,如下:

#!/bin/bash
grpcdir="${pwd}"

cd grpc/camke
mkdir build && cd build

cmake -LH -DCMAKE_INSTALL_PREFIX=$grpcdir/ver ../..
make -j4 && make install

编译之后grpc相关的lib库都已经编好在ver/lib或者lib64下,并且在bin目录下会生成protoc和grpc_cpp_plugin,供后续编译protobuf文件使用。

二、定义protobuf文件,并进行编译

1、定义rsp的Result.proto

syntax = "proto3";

import "google/protobuf/any.proto";
option java_package = "com.aaa.proto.base";
option java_outer_classname="ResultProto";
package base;

message Result {
    int32 code = 1;
    string msg = 2;
    google.protobuf.Any data = 3;
}

2、定义Req服务的 helloworld.proto

syntax = "proto3";

import "Result.proto";
option java_package = "com.aaa.service";
option java_outer_classname = "ServiceProto";
package service;

//服务器接口
service ServiceInterface {
    // 服务接口1
    rpc Service1 (Req1) returns (base.Result) { }
    // 服务接口2
    rpc Service2 (Req2) returns (base.Result) { }
}

//====================== 服务接口1 ========================//

message Req1 {
    string str = 1;  
    int32  k= 2;  
}

message Rsp1 {
    repeated Struct struct = 1;  // 信息列表
}

message Struct {
    string str = 1; 
    int32    i = 2;
    bool     b = 3; 
}

//====================== 服务接口2 ========================//
message Req2 {
    string str = 1;  
    int32  k= 2;  
}

message Rsp2 {
    repeated Struct struct = 1;  // 信息列表
}


定义好proto文件后将文件编译:
1、protoc -I --cpp_out=./ /protodir void Proceed(); private: //异步服务 ServiceInterface::AsyncService* pCallDataServ; // 异步服务器通知的生产者消费者队列 ServerCompletionQueue* cq_; // rpc的上下文,允许调整它的各个方面,例如使用压缩、身份验证以及将元数据发送回客户 ServerContext ctx_; //标记服务类型 ServiceType sType; Result result; ServerAsyncResponseWriter Responder; Req1 req1; Rsp1 rsp1; Req2 req2; Rsp2 rsp2; enum CallStatus { CREATE, PROCESS, FINISH }; CallStatus status_; // 当前服务状态 }; // 服务器的主循环 void HandleRpcs(int i); unique_ptr cq_; ServiceInterface::AsyncService service; unique_ptr server_; vector threadPool; string servip; };

源文件 AsyncServer.cc:

#include "AsyncServer.h"
#include "pubCommon.h"
#include 
#include "mysqlInterface.h"

using namespace std;


#define MAX_SQL_LENGTH 10000

ServerImpl::~ServerImpl() 
{
    for(int i = 0; i < 10; ++i) 
    {
      threadPool[i].join(); 
    }
  
    server_->Shutdown();
    // Always shutdown the completion queue after the server.
    cq_->Shutdown();
}


void ServerImpl::SetServIp(string ip)
{
    servip = ip;
    cout << "获取到的ip:" << servip << endl;
}


// 启动服务
void ServerImpl::Run() 
{
    string server_address = servip + string(":") + to_string(LISTEN_PORT);

    ServerBuilder builder;

    // 侦听给定的地址
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());

    // 注册 service_ 作为我们与客户机通信的实例,在本例中,它对应于一个异步服务
    builder.RegisterService(&service);

    // 获取用于与gRPC运行时异步通信的完成队
    cq_ = builder.AddCompletionQueue();
    // 组装服务
    server_ = builder.BuildAndStart();
    
    cout << "Server listening on: " << server_address << endl;

    
    new CallData(&service, cq_.get(), ServerImpl::CallData::ST_Req1);
	new CallData(&service, cq_.get(), ServerImpl::CallData::ST_Req2);

    
    for(int i = 0; i < 10; ++i) 
    {
        threadPool.push_back(thread(&ServerImpl::HandleRpcs,this,i));
    }
}


ServerImpl::CallData::CallData(ServiceInterface::AsyncService* pService, ServerCompletionQueue* cq, ServiceType stype)
            : pCallDataServ(pService), cq_(cq), sType(stype), Responder(&ctx_), status_(CREATE) 
{
    Proceed();
}           

void ServerImpl::CallData::Proceed() 
{
  
    if (status_ == CREATE) 
    {
        // 进入初始化时将实例状态转为PROCESS 
        status_ = PROCESS;

        
        switch(sType)
        {
            case ServerImpl::CallData::ST_Req1:
                pCallDataServ->RequestService1(&ctx_, &req1, &Responder, cq_, cq_, this);
                break;
            case ServerImpl::CallData::ST_Req2:
                pCallDataServ->RequestService2(&ctx_, &ctfQry, &Responder, cq_, cq_, this);
                break;
            default:
                break;
        }  
    } 
    else if (status_ == PROCESS) 
    {
        
        new CallData(pCallDataServ, cq_, this->sType);
 
        
        int    code;
        string strMsg;
        char sqlStr[MAX_SQL_LENGTH];
        memset(sqlStr ,0 ,MAX_SQL_LENGTH);
        
        status_ = FINISH; 
        switch(sType)
        {
            case ServerImpl::CallData::ST_Req1:
            {
                //req1.str();
                //req1.k();

                //业务处理
				code = SERVE_SUCCESS;
				strMsg = "OK.";
				
				rsp1.set_str(req1.str());        
				result.mutable_data()->PackFrom(rsp1);
				
				
				
                
                //填写客户端请求响应
                result.set_code(code);
                result.set_msg(strMsg);
                
                Responder.Finish(result, Status::OK, this);  //Finish后不能再添加任何其他操作
                        
                break;
            }
            case ServerImpl::CallData::ST_Req2:
            {
                //req2.str();//.后面的都要小写
                //req2.k();
                
				ResultVector db_result;
				int nRet = db_query(sqlStr, db_result);
				
				code = 1;
				
				strMsg = "ok.";
			   
				for(int i = 0; i < db_result.size(); i++)
				{
					::service::Struct *pstruct = rsp2.add_struct();
					
					pstruct->set_str(db_result[i][0]);
					pstruct->set_k(atoi(db_result[i][1].c_str()));

				} 
				result.mutable_data()->PackFrom(rsp2);
                
                //填写客户端请求响应
                result.set_code(code);
                result.set_msg(strMsg);
                Responder.Finish(result, Status::OK, this);         

                break;
            }
            default:
                break;
        }   
    } 
    else if (status_ == FINISH)
    {
        delete this;
    }
}

// 服务器的主循环
void ServerImpl::HandleRpcs(int i) 
{
    //cout<< "启动第" << i+1 << "个线程,thread_pool_id:" << threadPool[i].get_id() << endl;

    void* tag; // 唯一标识一个请求
    bool ok;
    while (true) 
    {
        
      
        GPR_ASSERT(cq_->Next(&tag, &ok));  //从队列中获取客户端请求
        GPR_ASSERT(ok);
      
        static_cast(tag)->Proceed(); //处理请求
        //cout<< "在第" << i+1 << "个线程处理,thread_pool_id:" << threadPool[i].get_id() << endl;
    }
}

Config.xml文件




    

xml_config.h文件

#include 
#include 
#include 
#include "pubLog.h"
#include "mxml.h"

using namespace std;

#define     CONFIG_FILE	        (char*)("../conf/Config.xml")

class xmlConfig
{
public:
    void read_system_cfg(mxml_node_t *root_node);
    int load_config(const char *path);
    string GetservIp();

private:
    string servip;
};

xml_config.cc文件  //编译工程需要xml.h和libxml.so

#include "xml_config.h"

void xmlConfig::read_system_cfg(mxml_node_t *root_node)
{
    mxml_node_t     *childNode;
    

    childNode = mxmlFindElement(root_node, root_node, "system", NULL,NULL,MXML_DESCEND);
    if(childNode == NULL)
    {
        return;
    }

    servip = mxmlElementGetAttr(childNode, "servip");
}

int xmlConfig::load_config(const char *path)
{
    FILE            *fp;
    mxml_node_t     *root_node;
    
    fp = fopen(path, "r");
    if (fp == NULL)
    {
        return -1;
    }

    root_node = mxmlLoadFile(NULL, fp, MXML_OPAQUE_CALLBACK);
    if (root_node == NULL)
    {
        fclose(fp);
        return -2;
    }

    read_system_cfg(root_node);

    mxmlRelease(root_node);
    fclose(fp);

    return 0;
}

string xmlConfig::Getservip()
{
    return servip;
}

main.cc文件

#include 
#include "pubLog.h"
#include "AsyncServer.h"
#include "mysqlInterface.h"
#include "xml_config.h"

int main(int argc, char** argv) 
{
    xmlConfig conf;
    conf.load_config(CONFIG_FILE);
    string ip = conf.GetservIp();
    
    //mysqlInit(ip);
    
    ServerImpl server;
    server.Setip(ip);
    server.Run();
    
    return 0;
}

四、客户端程序(以下程序跟上面的服务端程序属于同一个项目,仅做为参考)


#include 
#include 
#include 

#include 
#include 
#include 

#ifdef BAZEL_BUILD
#include "examples/protos/helloworld.grpc.pb.h"
#else
#include "helloworld.grpc.pb.h"
#endif

using grpc::Channel;
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using helloworld::Greeter;
using helloworld::HelloReply;
using helloworld::HelloRequest;

class GreeterClient {
 public:
  explicit GreeterClient(std::shared_ptr channel)
      : stub_(Greeter::NewStub(channel)) {}

  // Assembles the client's payload and sends it to the server.
  void SayHello(const std::string& user) {
    // Data we are sending to the server.
    HelloRequest request;
    request.set_name(user);

    // Call object to store rpc data
    AsyncClientCall* call = new AsyncClientCall;

    // stub_->PrepareAsyncSayHello() creates an RPC object, returning
    // an instance to store in "call" but does not actually start the RPC
    // Because we are using the asynchronous API, we need to hold on to
    // the "call" instance in order to get updates on the ongoing RPC.
    call->response_reader =
        stub_->PrepareAsyncSayHello(&call->context, request, &cq_);

    // StartCall initiates the RPC call
    call->response_reader->StartCall();

    // Request that, upon completion of the RPC, "reply" be updated with the
    // server's response; "status" with the indication of whether the operation
    // was successful. Tag the request with the memory address of the call
    // object.
    call->response_reader->Finish(&call->reply, &call->status, (void*)call);
  }

  // Loop while listening for completed responses.
  // Prints out the response from the server.
  void AsyncCompleteRpc() {
    void* got_tag;
    bool ok = false;

    // Block until the next result is available in the completion queue "cq".
    while (cq_.Next(&got_tag, &ok)) {
      // The tag in this example is the memory location of the call object
      AsyncClientCall* call = static_cast(got_tag);

      // Verify that the request was completed successfully. Note that "ok"
      // corresponds solely to the request for updates introduced by Finish().
      GPR_ASSERT(ok);

      if (call->status.ok())
        std::cout << "Greeter received: " << call->reply.message() << std::endl;
      else
        std::cout << "RPC failed" << std::endl;

      // Once we're complete, deallocate the call object.
      delete call;
    }
  }

 private:
  // struct for keeping state and data information
  struct AsyncClientCall {
    // Container for the data we expect from the server.
    HelloReply reply;

    // Context for the client. It could be used to convey extra information to
    // the server and/or tweak certain RPC behaviors.
    ClientContext context;

    // Storage for the status of the RPC upon completion.
    Status status;

    std::unique_ptr> response_reader;
  };

  // Out of the passed in Channel comes the stub, stored here, our view of the
  // server's exposed services.
  std::unique_ptr stub_;

  // The producer-consumer queue we use to communicate asynchronously with the
  // gRPC runtime.
  CompletionQueue cq_;
};

int main(int argc, char** argv) {
  // Instantiate the client. It requires a channel, out of which the actual RPCs
  // are created. This channel models a connection to an endpoint (in this case,
  // localhost at port 50051). We indicate that the channel isn't authenticated
  // (use of InsecureChannelCredentials()).
  GreeterClient greeter(grpc::CreateChannel(
      "localhost:50051", grpc::InsecureChannelCredentials()));

  // Spawn reader thread that loops indefinitely
  std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, &greeter);

//  for (int i = 0; i < 100000; i++) {
  //  std::string user("world " + std::to_string(i));
  //  greeter.SayHello(user);  // The actual RPC call!
  //}
  char sendline[4096] = {0};
  while(fgets(sendline, 4096, stdin) != NULL) 
  {
      for (int i = 1; i <= 100000; i++) 
      {
          std::string user(sendline + std::to_string(i));
          greeter.SayHello(user);  // The actual RPC call!
      } 
                            
      memset(sendline,0x00,4096);
  }

  std::cout << "Press control-c to quit" << std::endl << std::endl;
  thread_.join();  // blocks forever

  return 0;
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/991237.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号