- 前言
- 服务器端源码
- 客户端源码
- 自定义库 helper.c 和 helper.h
- helper.c
- helper.h
- Makefile文件
- 使用
由于作者本人最近在忙期末复习(KPI压力,害),目前暂时不对该代码的各类结构体、函数单独拎出来做分析了,这项工作等到这个月考完试之后在完成!
代码的大部分内容都有比较详细的注释,这一个月内如果有读者有看不懂源码的地方可以在评论区提问,我看到会回复!如有误也欢迎批评指正!
这个基于线程池和epoll反应堆的服务器大概有以下几个特点吧,简要说一下:
- 综合了线程池和epoll反应堆的优点,这个一个月后做详细说明。
- 线程池实现了根据客户端的数目(任务数量)实现自动扩容和瘦身。
- epoll反应堆额外实现了检测沉寂用户的功能,若一个客户端连接在60秒内没有发送任何消息,服务器会主动断开连接。
服务器端源码注意:该源码的线程回调函数只是提供了最基础的大小写转换以测试C/S双端的联通性,可以在线程回调函数do_rw()上做自定义的拓展。
转发必须注明出处!
文件名:server.c
#include#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "helper.h" #define DEFAULT_TIME 1 #define MIN_WAIT_TASK_NUM 2 #define MAX_EVENTS 4096 //epoll监听上限数 #define BUFLEN 4096 #define DEFAULT_THREAD_VARY 5 #define true 1 #define false 0 #define MAXLINE2 4096 #define SERV_PORT2 7777 void recvdata(int fd, int events, void *arg); void senddata(int fd, int events, void *arg); typedef struct { void *(*function)(void *); void *arg; } threadpool_task_t; typedef struct threadpool_t { pthread_mutex_t lock; pthread_mutex_t thread_counter; pthread_cond_t queue_not_full; pthread_cond_t queue_not_empty; pthread_t *threads; pthread_t adjust_tid; threadpool_task_t *task_queue; int min_thr_num; int max_thr_num; int live_thr_num; int busy_thr_num; int wait_exit_thr_num; int queue_front; int queue_rear; int queue_size; int queue_max_size; int shutdown; }threadpool_t; void *threadpool_thread(void *threadpool); int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg); void *adjust_thread(void *threadpool); int is_thread_alive(pthread_t tid); int threadpool_free(threadpool_t *pool); struct myevent_s { int fd; //要监听的文件描述符 int events; //对应的监听事件 void *arg; //泛型参数 void (*call_back)(int fd, int events, void *arg); //回调函数 int status; //是否在监听:1->在红黑树上(监听), 0->不在(不监听) char buf[BUFLEN]; int len; long last_active; //记录每次加入红黑树 g_efd 的时间值 int thread_pos; //记录全局数组 struct s_info ts[256] 的下标供线程使用 threadpool_t *thp; }; struct s_info { struct sockaddr_in cliaddr; int connfd; int fd; int events; // 这三个参数用于将epoll监听红黑树上的回调函数和线程池的回调函数连接起来 void *arg; }; int g_efd; //全局变量, 保存epoll_create返回的文件描述符 struct myevent_s g_events[MAX_EVENTS+1]; //自定义结构体类型数组. +1-->listen fd struct s_info ts[4096]; //用于记录线程 int k = 0; pthread_mutex_t klock; void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg) { ev->fd = fd; ev->call_back = call_back; ev->events = 0; ev->arg = arg; ev->status = 0; memset(ev->buf, 0, sizeof(ev->buf)); ev->len = 0; ev->last_active = time(NULL); //调用eventset函数的时间 return; } void eventadd(int efd, int events, struct myevent_s *ev) { struct epoll_event epv = {0, {0}}; int op; epv.data.ptr = ev; // epv.events = ev->events = events; //EPOLLIN 或 EPOLLOUT epv.events = EPOLLIN | EPOLLET; if (ev->status == 0) { //已经在红黑树 g_efd 里 op = EPOLL_CTL_ADD; //将其加入红黑树 g_efd, 并将status置1 ev->status = 1; } if (epoll_ctl(efd, op, ev->fd, &epv) < 0) //实际添加/修改 printf("event add failed [fd=%d], events[%d]n", ev->fd, events); else printf("event add OK [fd=%d], op=%d, events[%0X]n", ev->fd, op, events); return ; } void eventdel(int efd, struct myevent_s *ev) { struct epoll_event epv = {0, {0}}; if (ev->status != 1) //不在红黑树上 return ; //epv.data.ptr = ev; epv.data.ptr = NULL; //抹去指针 ev->status = 0; //修改状态 epoll_ctl(efd, EPOLL_CTL_DEL, ev->fd, &epv); //从红黑树 efd 上将 ev->fd 摘除 return ; } void acceptconn(int lfd, int events, void *arg) { puts("acception running...n"); struct sockaddr_in cin; socklen_t len = sizeof(cin); int cfd, i; if ((cfd = accept(lfd, (struct sockaddr *)&cin, &len)) == -1) { if (errno != EAGAIN && errno != EINTR) { } printf("%s: accept, %sn", __func__, strerror(errno)); return ; } do { for (i = 0; i < MAX_EVENTS; i++) //从全局数组g_events中找一个空闲元素 if (g_events[i].status == 0) //类似于select中找值为-1的元素 break; //跳出 for if (i == MAX_EVENTS) { printf("%s: max connect limit[%d]n", __func__, MAX_EVENTS); break; //跳出do while(0) 不执行后续代码 } int flag = 0; if ((flag = fcntl(cfd, F_SETFL, O_NONBLOCK)) < 0) { //将cfd也设置为非阻塞 printf("%s: fcntl nonblocking failed, %sn", __func__, strerror(errno)); break; } pthread_mutex_lock(&klock); g_events[i].thread_pos = k; k = (k+1)%4096; pthread_mutex_unlock(&klock); ts[g_events[i].thread_pos].cliaddr = cin; ts[g_events[i].thread_pos].connfd = cfd; eventset(&g_events[i], cfd, recvdata, &g_events[i]); eventadd(g_efd, EPOLLIN, &g_events[i]); //将cfd添加到红黑树g_efd中,监听读事件 //思路:把任务添加函数threadpool_add封装进结构体的回调函数 recvdata 中 } while(0); printf("new connect [%s:%d][time:%ld], pos[%d]n", inet_ntoa(cin.sin_addr), ntohs(cin.sin_port), g_events[i].last_active, i); return ; } void *do_rw(void *arg); void recvdata(int fd, int events, void *arg){ struct myevent_s *ev = (struct myevent_s *)arg; int position = ev->thread_pos; ts[position].arg = arg; ts[position].events = events; ts[position].fd = fd; threadpool_t *thp = ev->thp; threadpool_add(thp, do_rw, (void*)&ts[position]); } void *do_rw(void *arg){ puts("do_rw is trigger!n"); int i; struct s_info *ts = (struct s_info *)arg; int fd = ts->fd; int events = ts->events; void *argg = ts->arg; struct myevent_s *ev = (struct myevent_s *)argg; char buf[MAXLINE2]; char str[INET_ADDRSTRLEN]; pthread_detach(pthread_self()); printf("(rw)thread %d is recieving from PORT %dn", (unsigned int)pthread_self(), ntohs((*ts).cliaddr.sin_port)); while (1) { int n = Read(fd, buf, MAXLINE2); if (n == 0) { //因为在socket中的读也是阻塞的 printf("the other side has been closed.n"); break; }else if(n == -1){ continue; } printf("(rw)thread %d received from %s at PORT %dn", (unsigned int)pthread_self(), inet_ntop(AF_INET, &(*ts).cliaddr.sin_addr, str, sizeof(str)), ntohs((*ts).cliaddr.sin_port)); for (i = 0; i < n; i++) buf[i] = toupper(buf[i]); Write(ts->connfd, buf, n); } close(ts->connfd); //做完任务之后要把事件从树上摘下 eventdel(fd, ev); } //threadpool_create(2,4096,4096); threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size) { int i; threadpool_t *pool = NULL; do { if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) { printf("malloc threadpool fail"); break; } pool->min_thr_num = min_thr_num; pool->max_thr_num = max_thr_num; pool->busy_thr_num = 0; pool->live_thr_num = min_thr_num; pool->wait_exit_thr_num = 0; pool->queue_size = 0; pool->queue_max_size = queue_max_size; pool->queue_front = 0; pool->queue_rear = 0; pool->shutdown = false; pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); if (pool->threads == NULL) { printf("malloc threads fail"); break; } memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num); pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size); if (pool->task_queue == NULL) { printf("malloc task_queue fail"); break; } if (pthread_mutex_init(&(pool->lock), NULL) != 0 || pthread_mutex_init(&(pool->thread_counter), NULL) != 0 || pthread_cond_init(&(pool->queue_not_empty), NULL) != 0 || pthread_cond_init(&(pool->queue_not_full), NULL) != 0) { printf("init the lock or cond fail"); break; } for (i = 0; i < min_thr_num; i++) { pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool); printf("start thread %d...n", (unsigned int)pool->threads[i]); } pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool); return pool; } while (0); threadpool_free(pool); return NULL; } //threadpool_add(thp, process, (void*)&num[i]); int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg) { pthread_mutex_lock(&(pool->lock)); while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) { pthread_cond_wait(&(pool->queue_not_full), &(pool->lock)); } if (pool->shutdown) { pthread_cond_broadcast(&(pool->queue_not_empty)); pthread_mutex_unlock(&(pool->lock)); return 0; } if (pool->task_queue[pool->queue_rear].arg != NULL) { pool->task_queue[pool->queue_rear].arg = NULL; } pool->task_queue[pool->queue_rear].function = function; pool->task_queue[pool->queue_rear].arg = arg; pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; pool->queue_size++; pthread_cond_signal(&(pool->queue_not_empty)); pthread_mutex_unlock(&(pool->lock)); return 0; } void *threadpool_thread(void *threadpool) { threadpool_t *pool = (threadpool_t *)threadpool; threadpool_task_t task; while (true) { pthread_mutex_lock(&(pool->lock)); while ((pool->queue_size == 0) && (!pool->shutdown)) { printf("thread %d is waitingn", (unsigned int)pthread_self()); pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock)); if (pool->wait_exit_thr_num > 0) { pool->wait_exit_thr_num--; if (pool->live_thr_num > pool->min_thr_num) { printf("thread %d is exitingn", (unsigned int)pthread_self()); pool->live_thr_num--; pthread_mutex_unlock(&(pool->lock)); pthread_exit(NULL); } } } if (pool->shutdown) { pthread_mutex_unlock(&(pool->lock)); // printf("thread 0x%x is exitingn", (unsigned int)pthread_self()); pthread_detach(pthread_self()); pthread_exit(NULL); } task.function = pool->task_queue[pool->queue_front].function; //取回调函数 task.arg = pool->task_queue[pool->queue_front].arg; //取回调函数的参数 pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; pool->queue_size--; printf("thread %d toke one task, new task can be added!n", (unsigned int)pthread_self()); pthread_cond_broadcast(&(pool->queue_not_full)); pthread_mutex_unlock(&(pool->lock)); printf("thread %d start workingn", (unsigned int)pthread_self()); pthread_mutex_lock(&(pool->thread_counter)); pool->busy_thr_num++; pthread_mutex_unlock(&(pool->thread_counter)); (*(task.function))(task.arg); //task.function(task.arg); printf("thread %d end workingn", (unsigned int)pthread_self()); pthread_mutex_lock(&(pool->thread_counter)); pool->busy_thr_num--; pthread_mutex_unlock(&(pool->thread_counter)); } pthread_exit(NULL); } void *adjust_thread(void *threadpool) { int i; threadpool_t *pool = (threadpool_t *)threadpool; while (!pool->shutdown) { sleep(1); pthread_mutex_lock(&(pool->lock)); int queue_size = pool->queue_size; int live_thr_num = pool->live_thr_num; pthread_mutex_unlock(&(pool->lock)); pthread_mutex_lock(&(pool->thread_counter)); int busy_thr_num = pool->busy_thr_num; pthread_mutex_unlock(&(pool->thread_counter)); if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) { pthread_mutex_lock(&(pool->lock)); int add = 0; for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY && pool->live_thr_num < pool->max_thr_num; i++) { if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) { pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool); add++; pool->live_thr_num++; } } pthread_mutex_unlock(&(pool->lock)); } if ((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num) { pthread_mutex_lock(&(pool->lock)); pool->wait_exit_thr_num = DEFAULT_THREAD_VARY; pthread_mutex_unlock(&(pool->lock)); for (i = 0; i < DEFAULT_THREAD_VARY; i++) { pthread_cond_signal(&(pool->queue_not_empty)); } } } return NULL; } int threadpool_destroy(threadpool_t *pool) { int i; if (pool == NULL) { return -1; } pool->shutdown = true; pthread_join(pool->adjust_tid, NULL); for (i = 0; i < pool->live_thr_num; i++) { pthread_cond_broadcast(&(pool->queue_not_empty)); } for (i = 0; i < pool->live_thr_num; i++) { pthread_join(pool->threads[i], NULL); } threadpool_free(pool); return 0; } int threadpool_free(threadpool_t *pool) { if (pool == NULL) { return -1; } if (pool->task_queue) { free(pool->task_queue); } if (pool->threads) { free(pool->threads); pthread_mutex_lock(&(pool->lock)); pthread_mutex_destroy(&(pool->lock)); pthread_mutex_lock(&(pool->thread_counter)); pthread_mutex_destroy(&(pool->thread_counter)); pthread_cond_destroy(&(pool->queue_not_empty)); pthread_cond_destroy(&(pool->queue_not_full)); } free(pool); pool = NULL; return 0; } int threadpool_all_threadnum(threadpool_t *pool) { int all_threadnum = -1; // 总线程数 pthread_mutex_lock(&(pool->lock)); all_threadnum = pool->live_thr_num; // 存活线程数 pthread_mutex_unlock(&(pool->lock)); return all_threadnum; } int threadpool_busy_threadnum(threadpool_t *pool) { int busy_threadnum = -1; // 忙线程数 pthread_mutex_lock(&(pool->thread_counter)); busy_threadnum = pool->busy_thr_num; pthread_mutex_unlock(&(pool->thread_counter)); return busy_threadnum; } int is_thread_alive(pthread_t tid) { int kill_rc = pthread_kill(tid, 0); //发0号信号,测试线程是否存活 if (kill_rc == ESRCH) { return false; } return true; } int main(void){ pthread_mutex_init(&klock,NULL); threadpool_t *thp = threadpool_create(2,4096,4096); int j = 0; for(; j = 60) { close(g_events[checkpos].fd); //关闭与该客户端链接 printf("[fd=%d] timeoutn", g_events[checkpos].fd); eventdel(g_efd, &g_events[checkpos]); //将该客户端 从红黑树 g_efd移除 } } int nfd = epoll_wait(g_efd, events, MAX_EVENTS+1, -1); if (nfd < 0) { printf("epoll_wait error, exitn"); break; } for (i = 0; i < nfd; i++) { struct myevent_s *ev = (struct myevent_s *)events[i].data.ptr; if (events[i].events & (EPOLLIN | EPOLLET)) { //读就绪事件 if(events[i].data.fd == listenfd) continue; ev->call_back(ev->fd, events[i].events, ev->arg); } } } puts("about exit in 100 Seconds..."); sleep(100); threadpool_destroy(thp); return 0; }
客户端源码
文件名:client.c
#include#include #include #include #include #include #include "helper.h" #define MAXLINE2 4096 #define SERV_PORT2 7777 int main(int argc, char *argv[]) { struct sockaddr_in servaddr; char buf[MAXLINE2]; int sockfd, n; sockfd = Socket(AF_INET, SOCK_STREAM, 0); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr); servaddr.sin_port = htons(SERV_PORT2); if(Connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr))==-1){ printf("connection failedn"); } while (fgets(buf, MAXLINE2, stdin) != NULL) { Write(sockfd, buf, strlen(buf)); n = Read(sockfd, buf, MAXLINE2); if (n == 0) printf("the other side has been closed.n"); else Write(STDOUT_FILENO, buf, n); } Close(sockfd); return 0; }
自定义库 helper.c 和 helper.h helper.c
#includehelper.h#include #include #include #include void perr_exit(const char *s) { perror(s); exit(1); } int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr) { int n; again: if ( (n = accept(fd, sa, salenptr)) < 0) { if ((errno == ECONNABORTED) || (errno == EINTR)) goto again; else perr_exit("accept error"); } return n; } int Bind(int fd, const struct sockaddr *sa, socklen_t salen) { int n; if ((n = bind(fd, sa, salen)) < 0) perr_exit("bind error"); return n; } int Connect(int fd, const struct sockaddr *sa, socklen_t salen) { int n; if ((n = connect(fd, sa, salen)) < 0) perr_exit("connect error"); return n; } int Listen(int fd, int backlog) { int n; if ((n = listen(fd, backlog)) < 0) perr_exit("listen error"); return n; } int Socket(int family, int type, int protocol) { int n; if ( (n = socket(family, type, protocol)) < 0) perr_exit("socket error"); return n; } ssize_t Read(int fd, void *ptr, size_t nbytes) { ssize_t n; again: if ( (n = read(fd, ptr, nbytes)) == -1) { if (errno == EINTR) goto again; else return -1; } return n; } ssize_t Write(int fd, const void *ptr, size_t nbytes) { ssize_t n; again: if ( (n = write(fd, ptr, nbytes)) == -1) { if (errno == EINTR) goto again; else return -1; } return n; } int Close(int fd) { int n; if ((n = close(fd)) == -1) perr_exit("close error"); return n; } ssize_t Readn(int fd, void *vptr, size_t n) { size_t nleft; ssize_t nread; char *ptr; ptr = vptr; nleft = n; while (nleft > 0) { if ( (nread = read(fd, ptr, nleft)) < 0) { if (errno == EINTR) nread = 0; else return -1; } else if (nread == 0) break; nleft -= nread; ptr += nread; } return n - nleft; } ssize_t Writen(int fd, const void *vptr, size_t n) { size_t nleft; ssize_t nwritten; const char *ptr; ptr = vptr; nleft = n; while (nleft > 0) { if ( (nwritten = write(fd, ptr, nleft)) <= 0) { if (nwritten < 0 && errno == EINTR) nwritten = 0; else return -1; } nleft -= nwritten; ptr += nwritten; } return n; } static ssize_t my_read(int fd, char *ptr) { static int read_cnt; static char *read_ptr; static char read_buf[100]; if (read_cnt <= 0) { again: if ((read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0) { if (errno == EINTR) goto again; return -1; } else if (read_cnt == 0) return 0; read_ptr = read_buf; } read_cnt--; *ptr = *read_ptr++; return 1; } ssize_t Readline(int fd, void *vptr, size_t maxlen) { ssize_t n, rc; char c, *ptr; ptr = vptr; for (n = 1; n < maxlen; n++) { if ( (rc = my_read(fd, &c)) == 1) { *ptr++ = c; if (c == 'n') break; } else if (rc == 0) { *ptr = 0; return n - 1; } else return -1; } *ptr = 0; return n; }
#ifndef __WRAP_H_ #define __WRAP_H_ void perr_exit(const char *s); int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr); int Bind(int fd, const struct sockaddr *sa, socklen_t salen); int Connect(int fd, const struct sockaddr *sa, socklen_t salen); int Listen(int fd, int backlog); int Socket(int family, int type, int protocol); ssize_t Read(int fd, void *ptr, size_t nbytes); ssize_t Write(int fd, const void *ptr, size_t nbytes); int Close(int fd); ssize_t Readn(int fd, void *vptr, size_t n); ssize_t Writen(int fd, const void *vptr, size_t n); ssize_t my_read(int fd, char *ptr); ssize_t Readline(int fd, void *vptr, size_t maxlen); #endif
Makefile文件
all: s c
.PHONY: all
s: server.c helper.c
gcc server.c helper.c -o s -lpthread
c: client.c helper.c
gcc client.c helper.c -o c -lpthread
使用
make之后运行./s就好:
然后打开多个终端运行./c
注意:该服务器框架模拟的是高用户并发的情况,所以你在多开客户机连接的时候可能会出现某个连接没有响应的情况,这是因为线程池的自动扩容瘦身算法,这个算法需要基于不同的常见去定制才能达到比较好的效果,所以如果遇到上述情况,继续开客户端连接就好了。



