我终于让它工作起来了。我把代码发布在这里,欢迎大家审阅和评论;如果有人想用 C++ 实现同样的功能,这段代码也许能提供帮助。我的初衷是让 Protobuf 以长度前缀(length-prefixed)的方式工作,代码写得比较粗糙。客户端/服务器的套接字代码取自某个我已经记不清的网站,我对它做了修改以适配 Protobuf。在这里,服务器先窥视(peek)套接字以获取整个数据包的长度,然后再执行真正的套接字读取,把整个数据包读出来。实现方式可能有无数种,但为了快速解决问题,我采用了这种做法。我仍然需要找到更好的办法,避免每个数据包调用两次 recv;但在我的场景中,每条消息的大小各不相同,所以这是我目前能想到的唯一办法。
Proto 文件
// Log record serialized by the client program and parsed by the server
// program in this file. All seven fields are declared `required`.
message log_packet {
  required fixed64 log_time =1;
  required fixed32 log_micro_sec =2;
  required fixed32 sequence_no =3;
  required fixed32 shm_app_id =4;
  required string packet_id =5;
  required string log_level=6;
  required string log_msg=7;
}
协议缓冲区客户端代码
#include <unistd.h>#include "message.pb.h"#include <iostream>#include <google/protobuf/message.h>#include <google/protobuf/descriptor.h>#include <google/protobuf/io/zero_copy_stream_impl.h>#include <google/protobuf/io/pred_stream.h>#include <google/protobuf/io/zero_copy_stream_impl_lite.h>using namespace google::protobuf::io;using namespace std;int main(int argv, char** argc){log_packet payload ;payload.set_log_time(10);payload.set_log_micro_sec(10);payload.set_sequence_no(1);payload.set_shm_app_id(101);payload.set_packet_id("TST");payload.set_log_level("DEBUG");payload.set_log_msg("What shall we say then");cout<<"size after serilizing is "<<payload.ByteSize()<<endl;int siz = payload.ByteSize()+4;char *pkt = new char [siz];google::protobuf::io::ArrayOutputStream aos(pkt,siz);CodedOutputStream *pred_output = new CodedOutputStream(&aos);pred_output->WriteVarint32(payload.ByteSize());payload.SerializeToCodedStream(pred_output); int host_port= 1101; char* host_name="127.0.0.1"; struct sockaddr_in my_addr; char buffer[1024]; int bytecount; int buffer_len=0; int hsock; int * p_int; int err; hsock = socket(AF_INET, SOCK_STREAM, 0); if(hsock == -1){ printf("Error initializing socket %dn",errno); goto FINISH; } p_int = (int*)malloc(sizeof(int)); *p_int = 1; if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )|| (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){ printf("Error setting options %dn",errno); free(p_int); goto FINISH; } free(p_int); my_addr.sin_family = AF_INET ; my_addr.sin_port = htons(host_port); memset(&(my_addr.sin_zero), 0, 8); my_addr.sin_addr.s_addr = inet_addr(host_name); if( connect( hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){ if((err = errno) != EINPROGRESS){ fprintf(stderr, "Error connecting socket %dn", errno); goto FINISH; } } for (int i =0;i<10000;i++){ for (int j = 0 ;j<10;j++) { if( (bytecount=send(hsock, (void *) pkt,siz,0))== -1 ) { fprintf(stderr, "Error sending 
data %dn", errno); goto FINISH; } printf("Sent bytes %dn", bytecount); usleep(1); } } delete pkt;FINISH: close(hsock);}协议缓冲区服务器代码
#include <fcntl.h>#include <string.h>#include <stdlib.h>#include <errno.h>#include <stdio.h>#include <netinet/in.h>#include <resolv.h>#include <sys/socket.h>#include <arpa/inet.h>#include <unistd.h>#include <pthread.h>#include "message.pb.h"#include <iostream>#include <google/protobuf/io/pred_stream.h>#include <google/protobuf/io/zero_copy_stream_impl.h>using namespace std;using namespace google::protobuf::io;void* SocketHandler(void*);int main(int argv, char** argc){ int host_port= 1101; struct sockaddr_in my_addr; int hsock; int * p_int ; int err; socklen_t addr_size = 0; int* csock; sockaddr_in sadr; pthread_t thread_id=0; hsock = socket(AF_INET, SOCK_STREAM, 0); if(hsock == -1){ printf("Error initializing socket %dn", errno); goto FINISH; } p_int = (int*)malloc(sizeof(int)); *p_int = 1; if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )|| (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){ printf("Error setting options %dn", errno); free(p_int); goto FINISH; } free(p_int); my_addr.sin_family = AF_INET ; my_addr.sin_port = htons(host_port); memset(&(my_addr.sin_zero), 0, 8); my_addr.sin_addr.s_addr = INADDR_ANY ; if( bind( hsock, (sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){ fprintf(stderr,"Error binding to socket, make sure nothing else is listening on this port %dn",errno); goto FINISH; } if(listen( hsock, 10) == -1 ){ fprintf(stderr, "Error listening %dn",errno); goto FINISH; } //Now lets do the server stuff addr_size = sizeof(sockaddr_in); while(true){ printf("waiting for a connectionn"); csock = (int*)malloc(sizeof(int)); if((*csock = accept( hsock, (sockaddr*)&sadr, &addr_size))!= -1){ printf("---------------------nReceived connection from %sn",inet_ntoa(sadr.sin_addr)); pthread_create(&thread_id,0,&SocketHandler, (void*)csock ); pthread_detach(thread_id); } else{ fprintf(stderr, "Error accepting %dn", errno); } }FINISH:;//oops}google::protobuf::uint32 readHdr(char *buf){ 
google::protobuf::uint32 size; google::protobuf::io::ArrayInputStream ais(buf,4); CodedInputStream pred_input(&ais); pred_input.ReadVarint32(&size);//Depre the HDR and get the size cout<<"size of payload is "<<size<<endl; return size;}void readBody(int csock,google::protobuf::uint32 siz){ int bytecount; log_packet payload; char buffer [siz+4];//size of the payload and hdr //Read the entire buffer including the hdr if((bytecount = recv(csock, (void *)buffer, 4+siz, MSG_WAITALL))== -1){ fprintf(stderr, "Error receiving data %dn", errno); } cout<<"Second read byte count is "<<bytecount<<endl; //Assign ArrayInputStream with enough memory google::protobuf::io::ArrayInputStream ais(buffer,siz+4); CodedInputStream pred_input(&ais); //Read an unsigned integer with Varint encoding, truncating to 32 bits. pred_input.ReadVarint32(&siz); //After the message's length is read, PushLimit() is used to prevent the CodedInputStream //from reading beyond that length.Limits are used when parsing length-delimited //embedded messages google::protobuf::io::CodedInputStream::Limit msgLimit = pred_input.PushLimit(siz); //De-Serialize payload.ParseFromCodedStream(&pred_input); //once the embedded message has been parsed, PopLimit() is called to undo the limit pred_input.PopLimit(msgLimit); //Print the message cout<<"Message is "<<payload.DebugString();}void* SocketHandler(void* lp){ int *csock = (int*)lp; char buffer[4]; int bytecount=0; string output,pl; log_packet logp; memset(buffer, ' ', 4); while (1) { //Peek into the socket and get the packet size if((bytecount = recv(*csock, buffer,4, MSG_PEEK))== -1){ fprintf(stderr, "Error receiving data %dn", errno); }else if (bytecount == 0) break; cout<<"First read byte count is "<<bytecount<<endl; readBody(*csock,readHdr(buffer)); }FINISH: free(csock); return 0;}


