首先可以跑一下grpc官方的例子,网址:
https://grpc.io/docs/languages/cpp/quickstart/
一、下载grpc包并编译
https://github.com/grpc/grpc
在编译grpc之前需要下载protobuf、abseil、re2、zlib等第三方依赖,放到grpc源码的third_party目录下(也可以在grpc目录中执行 git submodule update --init 自动拉取),
然后可以写个脚本编译grpc库,如下:
#!/bin/bash
# Configure, build and install gRPC into <start dir>/ver.
# Run this from the directory that contains the grpc/ checkout.

# Remember the starting directory; $(pwd) is command substitution, whereas
# ${pwd} would expand an (unset) variable and yield an empty install prefix.
grpcdir="$(pwd)"

# Build out-of-tree under grpc/cmake/build; -p keeps re-runs from failing
# when the directory already exists.
cd grpc/cmake || exit 1
mkdir -p build && cd build || exit 1

# ../.. is the grpc source root as seen from grpc/cmake/build.
cmake -LH -DCMAKE_INSTALL_PREFIX="$grpcdir/ver" ../.. || exit 1
make -j4 && make install
编译之后grpc相关的lib库都已经编好在ver/lib或者lib64下,并且在bin目录下会生成protoc和grpc_cpp_plugin,供后续编译protobuf文件使用。
二、定义protobuf文件,并进行编译
1、定义rsp的Result.proto
// Result.proto: the response envelope returned by the RPCs declared in
// helloworld.proto.
syntax = "proto3";
// Needed for the google.protobuf.Any payload field below.
import "google/protobuf/any.proto";
option java_package = "com.aaa.proto.base";
option java_outer_classname="ResultProto";
package base;
// Envelope carrying a status code, a message and an arbitrary packed payload.
message Result {
// Numeric result code set by the server.
int32 code = 1;
// Text message accompanying the code.
string msg = 2;
// Payload packed as Any; the server code packs Rsp1 / Rsp2 messages here.
google.protobuf.Any data = 3;
}
2、定义Req服务的 helloworld.proto
// helloworld.proto: request/response messages and the service definition.
syntax = "proto3";
// Provides base.Result, the return type of both RPCs.
import "Result.proto";
option java_package = "com.aaa.service";
option java_outer_classname = "ServiceProto";
package service;
// Server interface
service ServiceInterface {
// Service endpoint 1
rpc Service1 (Req1) returns (base.Result) { }
// Service endpoint 2
rpc Service2 (Req2) returns (base.Result) { }
}
//====================== Service endpoint 1 ========================//
// Request message for Service1.
message Req1 {
string str = 1;
int32 k= 2;
}
// Payload for Service1; not named in the rpc signature, the server packs it
// into base.Result.data.
message Rsp1 {
repeated Struct struct = 1; // list of records
}
// One record shared by Rsp1 and Rsp2.
message Struct {
string str = 1;
int32 i = 2;
bool b = 3;
}
//====================== Service endpoint 2 ========================//
// Request message for Service2.
message Req2 {
string str = 1;
int32 k= 2;
}
// Payload for Service2; not named in the rpc signature, the server packs it
// into base.Result.data.
message Rsp2 {
repeated Struct struct = 1; // list of records
}
定义好proto文件后将文件编译:
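1、protoc -I /protodir --cpp_out=./ /protodir/*.proto
2、protoc -I /protodir --grpc_out=./ --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` /protodir/*.proto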
// NOTE(review): this is the tail of the header; the opening of both class
// bodies is not visible here, and template arguments appear to have been
// stripped from several declarations below (unique_ptr, vector,
// ServerAsyncResponseWriter) -- confirm against the real header.

// Advances this handler through its CREATE -> PROCESS -> FINISH states.
void Proceed();
private:
// The asynchronous service this handler requests calls from.
ServiceInterface::AsyncService* pCallDataServ;
// Producer-consumer queue used for asynchronous server notifications.
ServerCompletionQueue* cq_;
// Context for the rpc, allowing to tweak aspects of it such as the use of
// compression, authentication, as well as to send metadata back to the client.
ServerContext ctx_;
// Which service endpoint this handler serves.
ServiceType sType;
// Response envelope sent back to the client.
Result result;
// Writer used to send the response back to the client.
ServerAsyncResponseWriter Responder;
// Request / payload messages for endpoint 1.
Req1 req1;
Rsp1 rsp1;
// Request / payload messages for endpoint 2.
Req2 req2;
Rsp2 rsp2;
// States of the per-call state machine driven by Proceed().
enum CallStatus { CREATE, PROCESS, FINISH };
CallStatus status_; // current state of this handler
};
// Main loop of the server; one instance runs on each worker thread.
void HandleRpcs(int i);
// Completion queue shared by the handlers and the worker threads.
unique_ptr cq_;
// Async service instance registered with the server builder.
ServiceInterface::AsyncService service;
// The running gRPC server.
unique_ptr server_;
// Worker threads executing HandleRpcs.
vector threadPool;
// IP address the server listens on.
string servip;
};
源文件 AsyncServer.cc:
#include "AsyncServer.h"
#include "pubCommon.h"
#include
#include "mysqlInterface.h"
using namespace std;
#define MAX_SQL_LENGTH 10000
// Waits for the worker threads, then shuts down the server and its
// completion queue.
//
// The join deliberately comes first: Run() returns right after spawning the
// workers, so main() depends on this destructor blocking in join() to keep
// the process serving.
//
// The loop walks the threads that actually exist, and the pointers are checked
// before use, so destroying a ServerImpl whose Run() never started anything
// no longer indexes an empty vector or dereferences a null pointer.
ServerImpl::~ServerImpl()
{
    for (auto& worker : threadPool)
    {
        if (worker.joinable())
        {
            worker.join();
        }
    }
    if (server_)
    {
        server_->Shutdown();
    }
    // Always shutdown the completion queue after the server.
    if (cq_)
    {
        cq_->Shutdown();
    }
}
// Records the IP address the server will listen on and echoes it to stdout.
//
// @param ip  address later combined with LISTEN_PORT by Run().
void ServerImpl::SetServIp(string ip)
{
    this->servip = ip;
    cout << "获取到的ip:" << this->servip << endl;
}
// 启动服务
void ServerImpl::Run()
{
string server_address = servip + string(":") + to_string(LISTEN_PORT);
ServerBuilder builder;
// 侦听给定的地址
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// 注册 service_ 作为我们与客户机通信的实例,在本例中,它对应于一个异步服务
builder.RegisterService(&service);
// 获取用于与gRPC运行时异步通信的完成队
cq_ = builder.AddCompletionQueue();
// 组装服务
server_ = builder.BuildAndStart();
cout << "Server listening on: " << server_address << endl;
new CallData(&service, cq_.get(), ServerImpl::CallData::ST_Req1);
new CallData(&service, cq_.get(), ServerImpl::CallData::ST_Req2);
for(int i = 0; i < 10; ++i)
{
threadPool.push_back(thread(&ServerImpl::HandleRpcs,this,i));
}
}
// Creates a handler for one endpoint and immediately arms it.
//
// @param pService  async service to request calls from.
// @param cq        completion queue that delivers this handler's events.
// @param stype     which endpoint this handler serves.
ServerImpl::CallData::CallData(ServiceInterface::AsyncService* pService, ServerCompletionQueue* cq, ServiceType stype)
    : pCallDataServ(pService),
      cq_(cq),
      sType(stype),
      Responder(&ctx_),
      status_(CREATE)
{
    // The handler starts in CREATE, so this first call runs the CREATE branch.
    this->Proceed();
}
void ServerImpl::CallData::Proceed()
{
if (status_ == CREATE)
{
// 进入初始化时将实例状态转为PROCESS
status_ = PROCESS;
switch(sType)
{
case ServerImpl::CallData::ST_Req1:
pCallDataServ->RequestService1(&ctx_, &req1, &Responder, cq_, cq_, this);
break;
case ServerImpl::CallData::ST_Req2:
pCallDataServ->RequestService2(&ctx_, &ctfQry, &Responder, cq_, cq_, this);
break;
default:
break;
}
}
else if (status_ == PROCESS)
{
new CallData(pCallDataServ, cq_, this->sType);
int code;
string strMsg;
char sqlStr[MAX_SQL_LENGTH];
memset(sqlStr ,0 ,MAX_SQL_LENGTH);
status_ = FINISH;
switch(sType)
{
case ServerImpl::CallData::ST_Req1:
{
//req1.str();
//req1.k();
//业务处理
code = SERVE_SUCCESS;
strMsg = "OK.";
rsp1.set_str(req1.str());
result.mutable_data()->PackFrom(rsp1);
//填写客户端请求响应
result.set_code(code);
result.set_msg(strMsg);
Responder.Finish(result, Status::OK, this); //Finish后不能再添加任何其他操作
break;
}
case ServerImpl::CallData::ST_Req2:
{
//req2.str();//.后面的都要小写
//req2.k();
ResultVector db_result;
int nRet = db_query(sqlStr, db_result);
code = 1;
strMsg = "ok.";
for(int i = 0; i < db_result.size(); i++)
{
::service::Struct *pstruct = rsp2.add_struct();
pstruct->set_str(db_result[i][0]);
pstruct->set_k(atoi(db_result[i][1].c_str()));
}
result.mutable_data()->PackFrom(rsp2);
//填写客户端请求响应
result.set_code(code);
result.set_msg(strMsg);
Responder.Finish(result, Status::OK, this);
break;
}
default:
break;
}
}
else if (status_ == FINISH)
{
delete this;
}
}
// 服务器的主循环
void ServerImpl::HandleRpcs(int i)
{
//cout<< "启动第" << i+1 << "个线程,thread_pool_id:" << threadPool[i].get_id() << endl;
void* tag; // 唯一标识一个请求
bool ok;
while (true)
{
GPR_ASSERT(cq_->Next(&tag, &ok)); //从队列中获取客户端请求
GPR_ASSERT(ok);
static_cast(tag)->Proceed(); //处理请求
//cout<< "在第" << i+1 << "个线程处理,thread_pool_id:" << threadPool[i].get_id() << endl;
}
}
Config.xml文件
xml_config.h文件
#include
#include
xml_config.cc文件 //编译工程需要mxml.h和libmxml.so(Mini-XML库)
#include "xml_config.h"
// Reads the "servip" attribute of the <system> element into servip.
//
// servip is left untouched when either the element or the attribute is
// missing.
//
// @param root_node  root of the parsed configuration tree.
void xmlConfig::read_system_cfg(mxml_node_t *root_node)
{
    mxml_node_t *systemNode =
        mxmlFindElement(root_node, root_node, "system", NULL, NULL, MXML_DESCEND);
    if (systemNode == NULL)
    {
        return;
    }
    // mxmlElementGetAttr() returns NULL for a missing attribute; assigning
    // a null pointer to std::string is undefined behaviour, so check first.
    const char *ipAttr = mxmlElementGetAttr(systemNode, "servip");
    if (ipAttr != NULL)
    {
        servip = ipAttr;
    }
}
int xmlConfig::load_config(const char *path)
{
FILE *fp;
mxml_node_t *root_node;
fp = fopen(path, "r");
if (fp == NULL)
{
return -1;
}
root_node = mxmlLoadFile(NULL, fp, MXML_OPAQUE_CALLBACK);
if (root_node == NULL)
{
fclose(fp);
return -2;
}
read_system_cfg(root_node);
mxmlRelease(root_node);
fclose(fp);
return 0;
}
// Returns a copy of the server IP stored in servip.
string xmlConfig::Getservip()
{
    return this->servip;
}
main.cc文件
#include
#include "pubLog.h"
#include "AsyncServer.h"
#include "mysqlInterface.h"
#include "xml_config.h"
// Server entry point: loads the configuration, then starts the async server.
//
// Run() returns as soon as the worker threads are spawned; the process keeps
// serving because ServerImpl's destructor, invoked on return, joins those
// threads.
//
// @return 1 if the configuration cannot be loaded, 0 otherwise.
int main(int argc, char** argv)
{
    xmlConfig conf;
    const int rc = conf.load_config(CONFIG_FILE);
    if (rc != 0)
    {
        // Without a configuration there is no address to listen on.
        std::cerr << "Failed to load config file, error " << rc << std::endl;
        return 1;
    }
    // Use the accessor names this project actually defines:
    // xmlConfig::Getservip() and ServerImpl::SetServIp().
    string ip = conf.Getservip();
    //mysqlInit(ip);
    ServerImpl server;
    server.SetServIp(ip);
    server.Run();
    return 0;
}
四、客户端程序(以下程序跟上面的服务端程序属于同一个项目,仅作为参考;注意该示例调用的是官方helloworld示例中的Greeter服务,而不是上文定义的ServiceInterface)
#include
#include
#include
#include
#include
#include
#ifdef BAZEL_BUILD
#include "examples/protos/helloworld.grpc.pb.h"
#else
#include "helloworld.grpc.pb.h"
#endif
using grpc::Channel;
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using helloworld::Greeter;
using helloworld::HelloReply;
using helloworld::HelloRequest;
class GreeterClient {
public:
explicit GreeterClient(std::shared_ptr channel)
: stub_(Greeter::NewStub(channel)) {}
// Assembles the client's payload and sends it to the server.
void SayHello(const std::string& user) {
// Data we are sending to the server.
HelloRequest request;
request.set_name(user);
// Call object to store rpc data
AsyncClientCall* call = new AsyncClientCall;
// stub_->PrepareAsyncSayHello() creates an RPC object, returning
// an instance to store in "call" but does not actually start the RPC
// Because we are using the asynchronous API, we need to hold on to
// the "call" instance in order to get updates on the ongoing RPC.
call->response_reader =
stub_->PrepareAsyncSayHello(&call->context, request, &cq_);
// StartCall initiates the RPC call
call->response_reader->StartCall();
// Request that, upon completion of the RPC, "reply" be updated with the
// server's response; "status" with the indication of whether the operation
// was successful. Tag the request with the memory address of the call
// object.
call->response_reader->Finish(&call->reply, &call->status, (void*)call);
}
// Loop while listening for completed responses.
// Prints out the response from the server.
void AsyncCompleteRpc() {
void* got_tag;
bool ok = false;
// Block until the next result is available in the completion queue "cq".
while (cq_.Next(&got_tag, &ok)) {
// The tag in this example is the memory location of the call object
AsyncClientCall* call = static_cast(got_tag);
// Verify that the request was completed successfully. Note that "ok"
// corresponds solely to the request for updates introduced by Finish().
GPR_ASSERT(ok);
if (call->status.ok())
std::cout << "Greeter received: " << call->reply.message() << std::endl;
else
std::cout << "RPC failed" << std::endl;
// Once we're complete, deallocate the call object.
delete call;
}
}
private:
// struct for keeping state and data information
struct AsyncClientCall {
// Container for the data we expect from the server.
HelloReply reply;
// Context for the client. It could be used to convey extra information to
// the server and/or tweak certain RPC behaviors.
ClientContext context;
// Storage for the status of the RPC upon completion.
Status status;
std::unique_ptr> response_reader;
};
// Out of the passed in Channel comes the stub, stored here, our view of the
// server's exposed services.
std::unique_ptr stub_;
// The producer-consumer queue we use to communicate asynchronously with the
// gRPC runtime.
CompletionQueue cq_;
};
// Client entry point: for every line read from stdin, sends 100000 SayHello
// requests named "<line><index>".
int main(int argc, char** argv) {
  // Instantiate the client. It requires a channel, out of which the actual RPCs
  // are created. This channel models a connection to an endpoint (in this case,
  // localhost at port 50051). We indicate that the channel isn't authenticated
  // (use of InsecureChannelCredentials()).
  GreeterClient greeter(grpc::CreateChannel(
      "localhost:50051", grpc::InsecureChannelCredentials()));

  // Spawn reader thread that loops indefinitely.
  std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, &greeter);

  char sendline[4096] = {0};
  while (fgets(sendline, sizeof(sendline), stdin) != NULL) {
    // fgets() keeps the line terminator; strip it so the request name does
    // not contain a line break in front of the index.
    std::string prefix(sendline);
    while (!prefix.empty() &&
           (prefix.back() == '\n' || prefix.back() == '\r')) {
      prefix.pop_back();
    }

    for (int i = 1; i <= 100000; i++) {
      const std::string user(prefix + std::to_string(i));
      greeter.SayHello(user);  // The actual RPC call!
    }
  }

  std::cout << "Press control-c to quit" << std::endl << std::endl;
  thread_.join();  // blocks forever
  return 0;
}