客户端基于 poll 同时监听标准输入和 socket;服务端基于 poll 监听新的连接请求,以及各个已连接 socket 上传来的信息。
// Client
//
// Chat-room client. It polls standard input and the server socket at the same
// time: whatever arrives on stdin is forwarded to the server, and whatever the
// server sends is printed to stdout.
//
// NOTE: the header names were missing from the original listing; the set below
// is what this program actually needs.
#ifndef _GNU_SOURCE
#define _GNU_SOURCE 1  // exposes POLLRDHUP and splice()
#endif

#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

const int BUF_SIZE = 1024;

// Fills one pollfd entry: descriptor, requested events, returned events.
void set_pfd(pollfd& pfd, int fd, int event, int revent) {
    pfd.fd = fd;
    pfd.events = event;
    pfd.revents = revent;
}

// Moves whatever is readable on stdin to the socket through the pipe
// (stdin -> pipe -> socket) without copying it through user space.
// Returns false on end of input or on a splice error, true otherwise.
static bool forward_stdin(int sockfd, int pipe_rd, int pipe_wr) {
    const unsigned int flags = SPLICE_F_MORE | SPLICE_F_MOVE;

    ssize_t pending = splice(STDIN_FILENO, NULL, pipe_wr, NULL, 32768, flags);
    if (pending < 0) {
        printf("splice failure\n");
        return false;
    }
    if (pending == 0) {
        return false;  // end of file on stdin
    }

    // The second splice may move fewer bytes than requested, so keep going
    // until the pipe has been drained into the socket.
    while (pending > 0) {
        ssize_t moved = splice(pipe_rd, NULL, sockfd, NULL, (size_t)pending, flags);
        if (moved <= 0) {
            printf("splice failure\n");
            return false;
        }
        pending -= moved;
    }
    return true;
}

// Reads "<port> <ip>" from stdin, connects to that server and then relays
// traffic in both directions until either side closes.
// Returns 0 on a normal shutdown and -1 when setup fails.
int main() {
    int port;
    char ip[40];
    printf("Input port num and ip address !\n");
    // %39s keeps the address inside ip[40].
    if (scanf("%d %39s", &port, ip) != 2 || port <= 0 || port > 65535) {
        printf("Invalid port or ip address\n");
        return -1;
    }

    sockaddr_in server_address;
    memset(&server_address, 0, sizeof(server_address));
    server_address.sin_family = AF_INET;
    server_address.sin_port = htons((uint16_t)port);
    if (inet_pton(AF_INET, ip, &server_address.sin_addr) != 1) {
        printf("Invalid ip address\n");
        return -1;
    }

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket");
        return -1;
    }

    int re = connect(sockfd, (sockaddr*)&server_address, sizeof(server_address));
    if (re < 0) {
        printf("Connect failure\n");
        close(sockfd);
        return -1;
    }

    pollfd fds[2];
    set_pfd(fds[0], STDIN_FILENO, POLLIN, 0);
    set_pfd(fds[1], sockfd, POLLIN | POLLRDHUP, 0);

    int pipefd[2];
    if (pipe(pipefd) < 0) {
        perror("pipe");
        close(sockfd);
        return -1;
    }

    char read_buf[BUF_SIZE];
    printf("Begin !\n");
    while (true) {
        re = poll(fds, 2, -1);
        if (re < 0) {
            if (errno == EINTR) {
                continue;  // interrupted by a signal, just poll again
            }
            printf("poll failure\n");
            break;
        }

        if (fds[1].revents & POLLIN) {
            // Drain readable data first so nothing sent just before a close is lost.
            ssize_t got = recv(sockfd, read_buf, BUF_SIZE - 1, 0);
            if (got > 0) {
                read_buf[got] = '\0';
                printf("recv msg : %s\n", read_buf);
            } else if (got == 0) {
                // Orderly shutdown by the peer.
                printf("Server close the connection\n");
                break;
            } else if (errno != EINTR && errno != EAGAIN) {
                printf("recv failure\n");
                break;
            }
        } else if (fds[1].revents & (POLLRDHUP | POLLHUP | POLLERR)) {
            printf("Server close the connection\n");
            break;
        }

        if (fds[0].revents & POLLIN) {
            // Zero-copy: push stdin input to the server through the pipe.
            if (!forward_stdin(sockfd, pipefd[0], pipefd[1])) {
                break;
            }
        } else if (fds[0].revents & (POLLHUP | POLLERR)) {
            break;  // stdin is gone; nothing more to send
        }
    }

    close(pipefd[0]);
    close(pipefd[1]);
    close(sockfd);
    return 0;
}

// ---------------- Server ----------------
// Chat-room server. A single poll() loop watches the listening socket for new
// connections and every connected client for incoming data; each message
// received from one client is relayed to all the others.
//
// NOTE: the header names were missing from the original listing; the set below
// is what this program actually needs.
#ifndef _GNU_SOURCE
#define _GNU_SOURCE 1  // exposes POLLRDHUP
#endif

#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

const int BUF_SIZE = 1024;
const int MAX_USER_NUM = 10;
const int MAX_FDS = 65535;
// Number of currently connected users. Active clients always occupy poll
// slots 1..user_count; slot 0 is the listening socket.
int user_count = 0;

// Per-connection state, stored in an array indexed by the socket descriptor.
struct user {
    sockaddr_in addr;     // peer address reported by accept()
    char* write;          // pending outgoing message, or NULL when there is none
    char read[BUF_SIZE];  // last message received from this client (NUL-terminated)
};

void set_pfd(pollfd& pfd, int fd, int event, int revent);
int setnonblocking(int fd);

// Closes the client in poll slot `idx` and moves the last active slot into the
// hole. The vacated slot is reset to fd -1 so poll() ignores it; this also
// makes removing the last client safe, because the caller re-examines `idx`
// and must not see the stale revents again.
static void remove_client(pollfd* fds, user* users, int idx) {
    int connfd = fds[idx].fd;
    close(connfd);
    users[connfd].write = NULL;
    fds[idx] = fds[user_count];
    set_pfd(fds[user_count], -1, 0, 0);
    user_count--;
}

// Accepts one pending connection on `listenfd` and registers it in the next
// free poll slot, or rejects it when the server is full.
static void accept_client(int listenfd, pollfd* fds, user* users) {
    sockaddr_in client_addr;
    socklen_t client_addr_len = sizeof(client_addr);
    int connfd = accept(listenfd, (sockaddr*)&client_addr, &client_addr_len);
    if (connfd < 0) {
        printf("Accept failure, errno is %d", errno);
        return;
    }

    // Too many users (>= so that fds[MAX_USER_NUM + 1] is never written), or a
    // descriptor that does not fit the users[] table.
    if (user_count >= MAX_USER_NUM || connfd >= MAX_FDS) {
        const char* info = "max connect num\n";
        printf("%s", info);
        send(connfd, info, strlen(info), MSG_NOSIGNAL);
        close(connfd);
        return;
    }

    user_count++;
    users[connfd].addr = client_addr;
    users[connfd].write = NULL;
    users[connfd].read[0] = '\0';
    setnonblocking(connfd);
    // Watch the new connection for input, errors and peer hang-up.
    set_pfd(fds[user_count], connfd, POLLIN | POLLERR | POLLRDHUP, 0);
    printf("新的连接到达,connfd号是%d, 现有%d个用户\n", connfd, user_count);
}

// Reads one message from the client in slot `idx` and schedules it for
// delivery to every other client.
// Returns false when the client was removed (peer closed or fatal error).
static bool read_and_relay(pollfd* fds, user* users, int idx) {
    int connfd = fds[idx].fd;

    memset(users[connfd].read, '\0', BUF_SIZE);
    ssize_t ret = recv(connfd, users[connfd].read, BUF_SIZE - 1, 0);
    if (ret < 0) {
        // Check errno right away, before any other call can overwrite it.
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            return true;  // nothing to read right now
        }
        remove_client(fds, users, idx);
        printf("%d left\n", connfd);
        return false;
    }
    if (ret == 0) {
        // recv() returning 0 means the peer closed the connection.
        remove_client(fds, users, idx);
        printf("%d left\n", connfd);
        return false;
    }

    printf("Get %d bytes info : %s from %d\n", (int)ret, users[connfd].read, connfd);

    // Hand the message to every other client: stop reading from them and wait
    // until their socket is writable. They share the sender's read buffer, so
    // a message still pending is replaced if the sender speaks again first.
    for (int j = 1; j <= user_count; ++j) {
        if (fds[j].fd == connfd) {
            continue;  // do not echo the message back to its sender
        }
        fds[j].events &= ~POLLIN;
        fds[j].events |= POLLOUT;
        users[fds[j].fd].write = users[connfd].read;
    }
    return true;
}

// Sends the pending message (if any) to the client in slot `idx`, then goes
// back to waiting for input from it.
static void flush_pending(pollfd* fds, user* users, int idx) {
    int connfd = fds[idx].fd;
    if (users[connfd].write) {
        // MSG_NOSIGNAL: a peer that already went away must not raise SIGPIPE.
        // A failed send is dropped here; a broken connection is detected and
        // removed by the next read.
        send(connfd, users[connfd].write, strlen(users[connfd].write), MSG_NOSIGNAL);
        users[connfd].write = NULL;
    }
    // Re-register for input.
    fds[idx].events &= ~POLLOUT;
    fds[idx].events |= POLLIN;
}

// Reports a POLLERR condition on `fd`. Reading SO_ERROR also clears the
// pending error on the socket.
static void report_socket_error(int fd) {
    printf("Get an error from %d", fd);
    int error = 0;
    socklen_t len = sizeof(error);
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
        printf("Get socket option failure\n");
    }
}

// Usage: <program> <port>. Listens on all interfaces and serves up to
// MAX_USER_NUM clients until poll() fails. Returns 0 after the loop ends and
// -1 when setup fails.
int main(int argc, char** argv) {
    if (argc <= 1) {
        printf("请输入端口号!\n");
        return -1;
    }
    int port = atoi(argv[1]);
    if (port <= 0 || port > 65535) {
        printf("Invalid port: %s\n", argv[1]);
        return -1;
    }

    sockaddr_in address;
    memset(&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    address.sin_port = htons((uint16_t)port);
    address.sin_addr.s_addr = htonl(INADDR_ANY);

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket");
        return -1;
    }
    if (bind(sockfd, (sockaddr*)&address, sizeof(address)) < 0) {
        perror("bind");
        close(sockfd);
        return -1;
    }
    if (listen(sockfd, MAX_USER_NUM) < 0) {
        perror("listen");
        close(sockfd);
        return -1;
    }

    user* users = new user[MAX_FDS];
    pollfd fds[MAX_USER_NUM + 1];
    // Slot 0 is the listening socket; every other slot starts out unused.
    set_pfd(fds[0], sockfd, POLLIN | POLLERR, 0);
    for (int i = 1; i <= MAX_USER_NUM; ++i) {
        set_pfd(fds[i], -1, 0, 0);
    }

    while (true) {
        // Block until something happens.
        int ret = poll(fds, MAX_USER_NUM + 1, -1);
        if (ret < 0) {
            if (errno == EINTR) {
                continue;  // interrupted by a signal, just poll again
            }
            printf("poll failure\n");
            break;
        }

        for (int i = 0; i <= MAX_USER_NUM; ++i) {
            if (fds[i].fd < 0 || fds[i].revents == 0) {
                continue;  // unused slot, or nothing happened on it
            }

            if (fds[i].fd == sockfd) {
                if (fds[i].revents & POLLIN) {
                    // New connection request.
                    accept_client(sockfd, fds, users);
                } else if (fds[i].revents & POLLERR) {
                    report_socket_error(sockfd);
                }
                continue;
            }

            if (fds[i].revents & POLLIN) {
                if (!read_and_relay(fds, users, i)) {
                    // Slot i now holds the entry moved from the end; look at it again.
                    i--;
                }
            } else if (fds[i].revents & POLLOUT) {
                flush_pending(fds, users, i);
            } else if (fds[i].revents & POLLRDHUP) {
                // Peer closed its end of the connection.
                int connfd = fds[i].fd;
                remove_client(fds, users, i);
                i--;
                printf("%d left\n", connfd);
            } else if (fds[i].revents & POLLERR) {
                report_socket_error(fds[i].fd);
            }
        }
    }

    // Release every client that is still connected.
    for (int i = 1; i <= user_count; ++i) {
        close(fds[i].fd);
    }
    delete[] users;
    close(sockfd);
    return 0;
}

// Fills one pollfd entry: descriptor, requested events, returned events.
void set_pfd(pollfd& pfd, int fd, int event, int revent) {
    pfd.fd = fd;
    pfd.events = event;
    pfd.revents = revent;
}

// Switches `fd` to non-blocking mode.
// Returns the previous file status flags, or -1 if they could not be read.
int setnonblocking(int fd) {
    int oldopt = fcntl(fd, F_GETFL);
    if (oldopt < 0) {
        return -1;
    }
    int newopt = oldopt | O_NONBLOCK;
    fcntl(fd, F_SETFL, newopt);
    return oldopt;
}



