栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > C/C++/C#

基于epoll的多线程网络服务程序设计——C语言

C/C++/C# 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

基于epoll的多线程网络服务程序设计——C语言

基于epoll的多线程网络服务程序设计——C语言

本文采用C语言设计了一个基于epoll的多线程网络服务程序。每个线程都拥有一个独立的epoll实例,用来捕获归属于该线程的socket事件。当子线程数量为0(即只有主线程)时,监听socket与客户端socket的消息处理共用同一个epoll;当子线程数量大于0时,主线程只负责监听连接,每当有新连接到来,就把它加入到当前活跃socket数量最少的子线程的epoll中。

server.h

#ifndef EPOLL_C_SERVER_H
#define EPOLL_C_SERVER_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>

#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#define RESULT_OK 0
#define RESULT_ERROR -1


/*
 * Message-handler callback invoked when a client socket becomes readable.
 *
 * socket_fd: the client socket that reported EPOLLIN.
 * arg:       opaque user argument; the callers in server.c always pass NULL.
 *
 * The callers interpret the return value as follows:
 *   -1  error on the socket; it is removed from epoll and closed
 *    0  peer closed the connection; it is removed from epoll and closed
 *    1  data handled; the socket is re-armed in epoll for further reads
 * Any other value leaves the socket registered and untouched.
 */
typedef int (*MSG_HANDLE)(int socket_fd,void* arg) ;  

/* Per-worker state: one instance per socket-handling sub-thread. */
typedef struct
{
    int epoll_fd;        // epoll instance that this thread waits on
    pthread_t thd_fd;    // thread handle filled in by pthread_create
    
    // Message handler; each worker thread calls it to process socket data
    MSG_HANDLE data_func;
	
    // Number of active sockets currently owned by this thread
    unsigned int active_conection_cnt; 
    // Mutex guarding updates to the active socket count
    pthread_mutex_t thd_mutex;  
}socket_thd_struct;   // Represents a sub-thread that handles client sockets

/* Top-level server state shared by the listening loop and its workers. */
typedef struct
{
    int epoll_fd;             // epoll instance used by the main (listening) thread
    unsigned short ip_port;   // TCP port to listen on, in host byte order
    
    // Message handler; called by the main thread when there are no sub-threads
    MSG_HANDLE data_func;

    // Number of sub-threads; may be 0, in which case accepting connections
    // and handling client sockets both happen in the same thread
    unsigned int socket_pthread_count; 
    // Array of sub-thread descriptors (NULL when socket_pthread_count is 0)
    socket_thd_struct* socket_thd;   

}server_struct;  // Describes one network server instance


/*
 * Allocate and initialise a server descriptor.
 *
 * param_port:      TCP port the server will listen on.
 * param_thd_count: number of worker sub-threads to start; 0 means client
 *                  sockets are handled in the thread that calls serverRun.
 * param_handle:    message handler invoked for readable client sockets.
 *
 * Returns a heap-allocated server_struct. When param_thd_count > 0 the
 * worker threads are created here, each with its own epoll instance.
 */
server_struct* initServerStruct(unsigned short param_port,unsigned int param_thd_count,MSG_HANDLE param_handle);


/*
 * Create the listening socket for param_server and run the accept/event
 * loop in the calling thread.
 *
 * Returns RESULT_ERROR if the listening socket cannot be created, bound,
 * put into listening state or registered with epoll; otherwise the call
 * stays in its event loop.
 */
int serverRun(server_struct *param_server);

#endif //EPOLL_C_SERVER_H

server.c

#include "server.h"

/*
 * Remove a client socket from a worker's epoll set, close it, and decrement
 * the worker's active-connection counter under the worker's mutex.
 */
static void socketThdDropClient(socket_thd_struct *worker, int client_fd)
{
    epoll_ctl(worker->epoll_fd, EPOLL_CTL_DEL, client_fd, NULL);
    close(client_fd);

    pthread_mutex_lock(&worker->thd_mutex);
    worker->active_conection_cnt--;
    pthread_mutex_unlock(&worker->thd_mutex);
}

/*
 * Worker thread entry point.
 *
 * arg: pointer to the socket_thd_struct describing this worker.
 *
 * Waits on the worker's own epoll instance and, for every ready socket,
 * either tears the connection down (error / peer hang-up) or hands it to
 * the worker's data_func. The handler's return code decides what happens
 * next: -1 or 0 close the socket, 1 re-arms it for further reads.
 *
 * Returns NULL; the loop only ends if epoll_wait fails with something
 * other than EINTR.
 */
static void *socketPthreadRun(void *arg)
{
    socket_thd_struct *pa_sock_st = (socket_thd_struct *)arg;
    struct epoll_event events[5];
    const int max_events = (int)(sizeof events / sizeof events[0]);

    while (1) {
        /* Block until at least one socket owned by this worker is ready. */
        int active_counts = epoll_wait(pa_sock_st->epoll_fd, events, max_events, -1);
        if (active_counts < 0) {
            if (errno == EINTR) {
                continue; /* interrupted by a signal: just wait again */
            }
            perror("epoll_wait");
            break;
        }
        fprintf(stdout, "active count:%d\n", active_counts);

        for (int index = 0; index < active_counts; index++) {
            int client_fd = events[index].data.fd;
            unsigned int flags = events[index].events;

            if (flags & (EPOLLERR | EPOLLRDHUP)) {
                /* Socket error or the peer closed its end. */
                fprintf(stdout, "client close this socket\n");
                socketThdDropClient(pa_sock_st, client_fd);
            } else if (flags & EPOLLIN) {
                /* Data is readable: let the user handler consume it. */
                fprintf(stdout, "handle recv client data\n");
                int ret = pa_sock_st->data_func(client_fd, NULL);

                if (ret == -1) {
                    fprintf(stderr, "client socket exception happened\n");
                    socketThdDropClient(pa_sock_st, client_fd);
                } else if (ret == 0) {
                    fprintf(stdout, "client close this socket\n");
                    socketThdDropClient(pa_sock_st, client_fd);
                } else if (ret == 1) {
                    /*
                     * Re-arm the socket. The mask matches the one used when
                     * the socket was registered, so peer hang-up keeps being
                     * reported after the first read.
                     */
                    struct epoll_event ev;
                    memset(&ev, 0, sizeof ev);
                    ev.data.fd = client_fd;
                    ev.events = EPOLLIN | EPOLLET | EPOLLERR | EPOLLRDHUP;
                    if (epoll_ctl(pa_sock_st->epoll_fd, EPOLL_CTL_MOD, client_fd, &ev) != 0) {
                        perror("epoll_ctl(EPOLL_CTL_MOD)");
                    }
                }
            }
        }
    }

    return NULL;
}

/*
 * Undo a partially completed initServerStruct: stop and join the first
 * `started` workers, release their resources, then free the server itself.
 */
static void initServerAbort(server_struct *serv_st, unsigned int started)
{
    for (unsigned int i = 0; i < started; i++) {
        socket_thd_struct *worker = &serv_st->socket_thd[i];
        /* Workers are idle in epoll_wait, which is a cancellation point. */
        pthread_cancel(worker->thd_fd);
        pthread_join(worker->thd_fd, NULL);
        pthread_mutex_destroy(&worker->thd_mutex);
        close(worker->epoll_fd);
    }
    free(serv_st->socket_thd);
    if (serv_st->epoll_fd >= 0) {
        close(serv_st->epoll_fd);
    }
    free(serv_st);
}

/*
 * Allocate and initialise a server descriptor.
 *
 * param_port:      TCP port the server will listen on.
 * param_thd_count: number of worker sub-threads to start; 0 means client
 *                  sockets are handled by the thread that calls serverRun.
 * param_handle:    message handler invoked for readable client sockets.
 *
 * Returns a heap-allocated server_struct owned by the caller, or NULL if
 * memory, an epoll instance, a mutex or a thread could not be created (in
 * which case everything set up so far has been released).
 */
server_struct *initServerStruct(unsigned short param_port, unsigned int param_thd_count, MSG_HANDLE param_handle)
{
    server_struct *serv_st = malloc(sizeof *serv_st);
    if (serv_st == NULL) {
        fprintf(stderr, "allocate server struct failed\n");
        return NULL;
    }

    serv_st->ip_port = param_port;
    serv_st->data_func = param_handle;
    serv_st->socket_pthread_count = param_thd_count;
    serv_st->socket_thd = NULL;
    serv_st->epoll_fd = epoll_create(256);
    if (serv_st->epoll_fd < 0) {
        perror("epoll_create");
        free(serv_st);
        return NULL;
    }

    if (param_thd_count == 0) {
        return serv_st;
    }

    fprintf(stdout, "create client socket sub thread\n");
    /* calloc zero-fills the array and checks the size multiplication. */
    serv_st->socket_thd = calloc(param_thd_count, sizeof *serv_st->socket_thd);
    if (serv_st->socket_thd == NULL) {
        fprintf(stderr, "allocate sub thread structs failed\n");
        initServerAbort(serv_st, 0);
        return NULL;
    }

    for (unsigned int index = 0; index < param_thd_count; index++) {
        socket_thd_struct *worker = &serv_st->socket_thd[index];

        worker->data_func = param_handle;
        worker->active_conection_cnt = 0;
        worker->epoll_fd = epoll_create(256);
        if (worker->epoll_fd < 0) {
            perror("epoll_create");
            initServerAbort(serv_st, index);
            return NULL;
        }

        /* The mutex must exist before the thread that locks it is started. */
        if (pthread_mutex_init(&worker->thd_mutex, NULL) != 0) {
            fprintf(stderr, "init sub thread mutex failed\n");
            close(worker->epoll_fd);
            initServerAbort(serv_st, index);
            return NULL;
        }

        if (pthread_create(&worker->thd_fd, NULL, socketPthreadRun, worker) != 0) {
            fprintf(stderr, "create sub thread failed\n");
            pthread_mutex_destroy(&worker->thd_mutex);
            close(worker->epoll_fd);
            initServerAbort(serv_st, index);
            return NULL;
        }
    }

    return serv_st;
}

/*
 * Create a TCP socket bound to INADDR_ANY:port and put it into listening
 * state. Returns the socket descriptor, or -1 after logging the failure.
 */
static int serverOpenListener(unsigned short port)
{
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof server_addr);
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY); /* 32-bit address: htonl, not htons */
    server_addr.sin_port = htons(port);

    int listen_fd = socket(PF_INET, SOCK_STREAM, 0);
    if (listen_fd < 0) {
        fprintf(stderr, "create socket error\n");
        return -1;
    }
    fprintf(stdout, "create server socket successful\n");

    if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof server_addr) != 0) {
        fprintf(stderr, "server bind failed:%d\n", port);
        close(listen_fd);
        return -1;
    }
    fprintf(stdout, "server socket bind successful\n");

    /* The backlog is a queue length, unrelated to the port number. */
    if (listen(listen_fd, SOMAXCONN) != 0) {
        fprintf(stderr, "server listen failed:%d\n", port);
        close(listen_fd);
        return -1;
    }
    fprintf(stdout, "server socket listen successful\n");

    return listen_fd;
}

/*
 * Register a freshly accepted client socket for read events. With no
 * sub-threads it goes into the main epoll set; otherwise it is handed to
 * the sub-thread that currently owns the fewest active sockets.
 * Returns 0 on success, -1 if epoll_ctl failed.
 */
static int serverRegisterClient(server_struct *srv, int client_fd)
{
    struct epoll_event ev;
    memset(&ev, 0, sizeof ev);
    ev.data.fd = client_fd;
    ev.events = EPOLLIN | EPOLLET | EPOLLERR | EPOLLRDHUP;

    if (srv->socket_pthread_count == 0) {
        return epoll_ctl(srv->epoll_fd, EPOLL_CTL_ADD, client_fd, &ev);
    }

    /* Find the worker with the smallest active-connection count. */
    unsigned int best_idx = 0;
    unsigned int best_cnt = 0;
    for (unsigned int i = 0; i < srv->socket_pthread_count; i++) {
        socket_thd_struct *worker = &srv->socket_thd[i];

        pthread_mutex_lock(&worker->thd_mutex);
        unsigned int cnt = worker->active_conection_cnt;
        pthread_mutex_unlock(&worker->thd_mutex);

        if (i == 0 || cnt < best_cnt) {
            best_idx = i;
            best_cnt = cnt;
        }
    }

    /*
     * Hold the worker's mutex across both the registration and the counter
     * update, so the worker cannot decrement the counter for this socket
     * before it has been incremented.
     */
    socket_thd_struct *target = &srv->socket_thd[best_idx];
    pthread_mutex_lock(&target->thd_mutex);
    int rc = epoll_ctl(target->epoll_fd, EPOLL_CTL_ADD, client_fd, &ev);
    if (rc == 0) {
        target->active_conection_cnt++;
    }
    pthread_mutex_unlock(&target->thd_mutex);

    return rc;
}

/* Accept one pending connection on listen_fd and register it with epoll. */
static void serverAcceptClient(server_struct *srv, int listen_fd)
{
    struct sockaddr_in client_info;
    /* accept() only fills the address when told how large the buffer is. */
    socklen_t client_info_len = sizeof client_info;

    memset(&client_info, 0, sizeof client_info);
    int new_socket = accept(listen_fd, (struct sockaddr *)&client_info, &client_info_len);
    if (new_socket < 0) {
        perror("accept");
        return;
    }

    const unsigned char *ip = (const unsigned char *)&client_info.sin_addr;
    fprintf(stdout, "client %d.%d.%d.%d:%d-->connected\n",
            ip[0], ip[1], ip[2], ip[3], ntohs(client_info.sin_port));

    if (serverRegisterClient(srv, new_socket) != 0) {
        perror("epoll_ctl(EPOLL_CTL_ADD)");
        close(new_socket);
        return;
    }
    fprintf(stdout, "add new client socket to epoll\n");
}

/* Remove a client socket from the main epoll set and close it. */
static void serverCloseClient(server_struct *srv, int client_fd)
{
    epoll_ctl(srv->epoll_fd, EPOLL_CTL_DEL, client_fd, NULL);
    close(client_fd);
}

/*
 * Run the user handler for a readable client socket owned by the main
 * thread and act on its return code (-1 / 0: close, 1: re-arm).
 */
static void serverHandleClientData(server_struct *srv, int client_fd)
{
    fprintf(stdout, "begin recv client data\n");
    int ret = srv->data_func(client_fd, NULL);

    if (ret == -1) {
        fprintf(stderr, "client socket exception happened\n");
        serverCloseClient(srv, client_fd);
    } else if (ret == 0) {
        fprintf(stdout, "client close this socket\n");
        serverCloseClient(srv, client_fd);
    } else if (ret == 1) {
        /* Re-arm with the same mask that was used at registration. */
        struct epoll_event ev;
        memset(&ev, 0, sizeof ev);
        ev.data.fd = client_fd;
        ev.events = EPOLLIN | EPOLLET | EPOLLERR | EPOLLRDHUP;
        if (epoll_ctl(srv->epoll_fd, EPOLL_CTL_MOD, client_fd, &ev) != 0) {
            perror("epoll_ctl(EPOLL_CTL_MOD)");
        }
    }
}

/*
 * Create the listening socket for param_server and run the accept/event
 * loop in the calling thread.
 *
 * New connections are registered either with the main epoll instance (no
 * sub-threads) or with the least-loaded sub-thread. Client sockets owned
 * by the main thread are serviced here through param_server->data_func.
 *
 * Returns RESULT_ERROR if param_server is NULL, if the listening socket
 * cannot be set up or registered, or if epoll_wait fails with an error
 * other than EINTR. Under normal operation the function does not return.
 */
int serverRun(server_struct *param_server)
{
    if (param_server == NULL) {
        return RESULT_ERROR;
    }

    int server_socket = serverOpenListener(param_server->ip_port);
    if (server_socket < 0) {
        return RESULT_ERROR;
    }

    /* Reuse the epoll instance created by initServerStruct instead of
     * overwriting (and leaking) it; create one only if none is available. */
    if (param_server->epoll_fd < 0) {
        param_server->epoll_fd = epoll_create(256);
        if (param_server->epoll_fd < 0) {
            perror("epoll_create");
            close(server_socket);
            return RESULT_ERROR;
        }
    }

    /*
     * The listening socket is level-triggered: one accept() is done per
     * wakeup, and epoll keeps reporting it while connections are pending.
     */
    struct epoll_event ev;
    memset(&ev, 0, sizeof ev);
    ev.data.fd = server_socket;
    ev.events = EPOLLIN;
    if (epoll_ctl(param_server->epoll_fd, EPOLL_CTL_ADD, server_socket, &ev) != 0) {
        fprintf(stderr, "server socket add to epoll error\n");
        close(server_socket);
        return RESULT_ERROR;
    }
    fprintf(stdout, "server socket add to epoll successful\n");

    struct epoll_event events[5];
    const int max_events = (int)(sizeof events / sizeof events[0]);
    int ret = RESULT_OK;

    while (1) {
        int active_count = epoll_wait(param_server->epoll_fd, events, max_events, -1);
        if (active_count < 0) {
            if (errno == EINTR) {
                continue; /* interrupted by a signal: just wait again */
            }
            perror("epoll_wait");
            ret = RESULT_ERROR;
            break;
        }
        fprintf(stdout, "active count:%d\n", active_count);

        for (int index = 0; index < active_count; index++) {
            int fd = events[index].data.fd;
            unsigned int flags = events[index].events;

            if (fd == server_socket) {
                /* A new connection is waiting on the listening socket. */
                serverAcceptClient(param_server, server_socket);
            } else if (flags & (EPOLLERR | EPOLLRDHUP)) {
                /* Socket error or the peer closed its end. */
                fprintf(stdout, "client close this socket\n");
                serverCloseClient(param_server, fd);
            } else if (flags & EPOLLIN) {
                serverHandleClientData(param_server, fd);
            }
        }
    }

    epoll_ctl(param_server->epoll_fd, EPOLL_CTL_DEL, server_socket, NULL);
    close(server_socket);
    return ret;
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/352368.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号