栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

C ++中套接字上的协议缓冲区

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

C ++中套接字上的协议缓冲区

最后我可以使它工作了。我将代码发布在这里,以便人们可以对其进行评论和评论,以及是否有人想要在c
++中实现它,这部分代码可以提供帮助。我的初衷是为了使Protobuf以长度前缀的方式工作,这是一个简陋的代码。我从某个我不记得的站点上获取了客户端服务器的代码,并对其进行了修改以适应protobuf。在这里,服务器首先窥视套接字并获取总包的长度,然后进行实际的套接字读取以读取整个包。可能有无数种方法可以做到,但是为了快速解决,我以这种方式做到了。但是我需要找到一种更好的方法来避免每个数据包2个recv,但是在我的情况下,所有消息的大小都是不同的,所以这是我猜测的唯一方法。

原始文件

  message log_packet {  required fixed64 log_time =1;  required fixed32 log_micro_sec =2;  required fixed32 sequence_no =3;  required fixed32 shm_app_id =4;  required string packet_id =5;  required string log_level=6;  required string log_msg=7;  }

协议缓冲区客户端代码

#include <unistd.h>#include "message.pb.h"#include <iostream>#include <google/protobuf/message.h>#include <google/protobuf/descriptor.h>#include <google/protobuf/io/zero_copy_stream_impl.h>#include <google/protobuf/io/pred_stream.h>#include <google/protobuf/io/zero_copy_stream_impl_lite.h>using namespace google::protobuf::io;using namespace std;int main(int argv, char** argc){log_packet payload ;payload.set_log_time(10);payload.set_log_micro_sec(10);payload.set_sequence_no(1);payload.set_shm_app_id(101);payload.set_packet_id("TST");payload.set_log_level("DEBUG");payload.set_log_msg("What shall we say then");cout<<"size after serilizing is "<<payload.ByteSize()<<endl;int siz = payload.ByteSize()+4;char *pkt = new char [siz];google::protobuf::io::ArrayOutputStream aos(pkt,siz);CodedOutputStream *pred_output = new CodedOutputStream(&aos);pred_output->WriteVarint32(payload.ByteSize());payload.SerializeToCodedStream(pred_output);        int host_port= 1101;        char* host_name="127.0.0.1";        struct sockaddr_in my_addr;        char buffer[1024];        int bytecount;        int buffer_len=0;        int hsock;        int * p_int;        int err;        hsock = socket(AF_INET, SOCK_STREAM, 0);        if(hsock == -1){     printf("Error initializing socket %dn",errno);     goto FINISH;        }        p_int = (int*)malloc(sizeof(int));        *p_int = 1;        if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )||     (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){     printf("Error setting options %dn",errno);     free(p_int);     goto FINISH;        }        free(p_int);        my_addr.sin_family = AF_INET ;        my_addr.sin_port = htons(host_port);        memset(&(my_addr.sin_zero), 0, 8);        my_addr.sin_addr.s_addr = inet_addr(host_name);        if( connect( hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){     if((err = errno) != EINPROGRESS){  fprintf(stderr, "Error connecting socket %dn", errno);  goto FINISH;     }        }       for (int i =0;i<10000;i++){          for (int j = 0 ;j<10;j++) {     if( (bytecount=send(hsock, (void *) pkt,siz,0))== -1 ) {  fprintf(stderr, "Error sending data %dn", errno);  goto FINISH;     }     printf("Sent bytes %dn", bytecount);     usleep(1);         }        }        delete pkt;FINISH:        close(hsock);}

协议缓冲区服务器代码

#include <fcntl.h>#include <string.h>#include <stdlib.h>#include <errno.h>#include <stdio.h>#include <netinet/in.h>#include <resolv.h>#include <sys/socket.h>#include <arpa/inet.h>#include <unistd.h>#include <pthread.h>#include "message.pb.h"#include <iostream>#include <google/protobuf/io/pred_stream.h>#include <google/protobuf/io/zero_copy_stream_impl.h>using namespace std;using namespace google::protobuf::io;void* SocketHandler(void*);int main(int argv, char** argc){        int host_port= 1101;        struct sockaddr_in my_addr;        int hsock;        int * p_int ;        int err;        socklen_t addr_size = 0;        int* csock;        sockaddr_in sadr;        pthread_t thread_id=0;        hsock = socket(AF_INET, SOCK_STREAM, 0);        if(hsock == -1){     printf("Error initializing socket %dn", errno);     goto FINISH;        }        p_int = (int*)malloc(sizeof(int));        *p_int = 1;        if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )||     (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){     printf("Error setting options %dn", errno);     free(p_int);     goto FINISH;        }        free(p_int);        my_addr.sin_family = AF_INET ;        my_addr.sin_port = htons(host_port);        memset(&(my_addr.sin_zero), 0, 8);        my_addr.sin_addr.s_addr = INADDR_ANY ;        if( bind( hsock, (sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){     fprintf(stderr,"Error binding to socket, make sure nothing else is listening on this port %dn",errno);     goto FINISH;        }        if(listen( hsock, 10) == -1 ){     fprintf(stderr, "Error listening %dn",errno);     goto FINISH;        }        //Now lets do the server stuff        addr_size = sizeof(sockaddr_in);        while(true){     printf("waiting for a connectionn");     csock = (int*)malloc(sizeof(int));     if((*csock = accept( hsock, (sockaddr*)&sadr, &addr_size))!= -1){  printf("---------------------nReceived connection from %sn",inet_ntoa(sadr.sin_addr));  pthread_create(&thread_id,0,&SocketHandler, (void*)csock );  pthread_detach(thread_id);     }     else{  fprintf(stderr, "Error accepting %dn", errno);     }        }FINISH:;//oops}google::protobuf::uint32 readHdr(char *buf){  google::protobuf::uint32 size;  google::protobuf::io::ArrayInputStream ais(buf,4);  CodedInputStream pred_input(&ais);  pred_input.ReadVarint32(&size);//Depre the HDR and get the size  cout<<"size of payload is "<<size<<endl;  return size;}void readBody(int csock,google::protobuf::uint32 siz){  int bytecount;  log_packet payload;  char buffer [siz+4];//size of the payload and hdr  //Read the entire buffer including the hdr  if((bytecount = recv(csock, (void *)buffer, 4+siz, MSG_WAITALL))== -1){     fprintf(stderr, "Error receiving data %dn", errno);        }  cout<<"Second read byte count is "<<bytecount<<endl;  //Assign ArrayInputStream with enough memory   google::protobuf::io::ArrayInputStream ais(buffer,siz+4);  CodedInputStream pred_input(&ais);  //Read an unsigned integer with Varint encoding, truncating to 32 bits.  pred_input.ReadVarint32(&siz);  //After the message's length is read, PushLimit() is used to prevent the CodedInputStream   //from reading beyond that length.Limits are used when parsing length-delimited   //embedded messages  google::protobuf::io::CodedInputStream::Limit msgLimit = pred_input.PushLimit(siz);  //De-Serialize  payload.ParseFromCodedStream(&pred_input);  //once the embedded message has been parsed, PopLimit() is called to undo the limit  pred_input.PopLimit(msgLimit);  //Print the message  cout<<"Message is "<<payload.DebugString();}void* SocketHandler(void* lp){    int *csock = (int*)lp;        char buffer[4];        int bytecount=0;        string output,pl;        log_packet logp;        memset(buffer, '', 4);        while (1) {        //Peek into the socket and get the packet size        if((bytecount = recv(*csock,   buffer,4, MSG_PEEK))== -1){     fprintf(stderr, "Error receiving data %dn", errno);        }else if (bytecount == 0)     break;        cout<<"First read byte count is "<<bytecount<<endl;        readBody(*csock,readHdr(buffer));        }FINISH:        free(csock);    return 0;}


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/412352.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号