栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

使用 epoll 和 boost 库线程池实现并发服务器

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

使用 epoll 和 boost 库线程池实现并发服务器

1. epoll编程相关

1.2 epoll编程流程:

epoll编程流程

    int epoll_create(int size) 创建用于处理accept的专用文件描述符
 int epfd=epoll_create(10000);
    声明事件数组以及事件
 //声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件
    struct epoll_event ev,events[10000];
    监听(int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event))
 epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);

op: 对监听红黑树所做的操作
取值有:

EPOLL_CTL_ADD 向事件表中注册fd上的事件
EPOLL_CTL_MOD 修改fd上的注册事件
EPOLL_CTL_DEL 删除fd上的注册事件
    epoll_wait 监听
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

epfd: epoll_create函数的返回值。

events: 他和ctl的epoll_event结构体不一样,它是一个数组(传出参数),传出满足监听事件的文件描述符

maxevents:数组元素的总个数

timeout:-1 阻塞, 0非阻塞,>0超时时间。

返回值:
>0:满足监听的总个数,可以用作循环上限。

2 修改服务器程序

global.h

#ifndef _GLOBAL_H
#define _GLOBAL_H

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
//以下新增
#include
#include
#include
#include
using namespace std;

#endif

server.h 添加设置非阻塞函数

#ifndef SERVER_H
#define SERVER_H

#include "global.h"

class server{
    private:
        int server_port;
        int server_sockfd;
        string server_ip;
        static vector sock_arr;
        static unordered_map name_sock_map;//名字和套接字描述符
        static unordered_map from_to_map;//记录用户xx要向用户yy发送信息
        static unordered_map > group_map;//记录群号和套接字描述符集合
        static pthread_mutex_t name_sock_mutx;//互斥锁,锁住需要修改name_sock_map的临界区
        static pthread_mutex_t group_mutx;//互斥锁,锁住需要修改group_map的临界区
        static pthread_mutex_t from_mutex;//互斥锁,锁住修改from_to_map的临界区
    public:
        server(int port,string ip);
        ~server();
        void run();
        static void RecvMsg(int epollfd,int conn);
        static void HandleRequest(int epollfd,int conn,string str,tuple &info);
        static void setnonblocking(int conn); //将套接字设为非阻塞
};
#endif

setnonblocking(int conn)

//将参数的文件描述符设为非阻塞
void server::setnonblocking(int sock)
{
    int opts;
    opts=fcntl(sock,F_GETFL);
    if(opts<0)
    {
        perror("fcntl(sock,GETFL)");
        exit(1);
    }
    opts = opts|O_NONBLOCK;
    if(fcntl(sock,F_SETFL,opts)<0)
    {
        perror("fcntl(sock,SETFL,opts)");
        exit(1);
    }
}

sever.cpp

#include "server.h"

vector server::sock_arr(10000,false);
unordered_map server::name_sock_map;//名字和套接字描述符
unordered_map server::from_to_map;//记录用户xx要向用户yy发送信息
unordered_map > server::group_map;//记录群号和套接字描述符集合
pthread_mutex_t server::name_sock_mutx;//互斥锁,锁住需要修改name_sock_map的临界区
pthread_mutex_t server::group_mutx;//互斥锁,锁住需要修改group_map的临界区
pthread_mutex_t server::from_mutex;//自旋锁,锁住修改from_to_map的临界区

server::server(int port,string ip):server_port(port),server_ip(ip){
    pthread_mutex_init(&name_sock_mutx, NULL); //创建互斥锁
    pthread_mutex_init(&group_mutx, NULL); //创建互斥锁
    pthread_mutex_init(&from_mutex, NULL); //创建互斥锁
}

将run函数的流程进行修改;
先epoll进行监听,之后调用线程池中的线程进行接收请求;

//将参数的文件描述符设为非阻塞
void server::setnonblocking(int sock)  
{  
    int opts;  
    opts=fcntl(sock,F_GETFL);  
    if(opts<0)  
    {  
        perror("fcntl(sock,GETFL)");  
        exit(1);  
    }  
    opts = opts|O_NONBLOCK;  
    if(fcntl(sock,F_SETFL,opts)<0)  
    {  
        perror("fcntl(sock,SETFL,opts)");  
        exit(1);  
    }      
}  
void server::run(){
    //listen的backlog大小
    int LISTENQ=200;
    int i, maxi, listenfd, connfd, sockfd,epfd,nfds;
    ssize_t n;
    //char line[MAXLINE];
    socklen_t clilen;
    //声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件
    struct epoll_event ev,events[10000];
    //生成用于处理accept的epoll专用的文件描述符
    epfd=epoll_create(10000);
    struct sockaddr_in clientaddr;
    struct sockaddr_in serveraddr;
    listenfd = socket(PF_INET, SOCK_STREAM, 0);
    //把socket设置为非阻塞方式
    setnonblocking(listenfd);
    //设置与要处理的事件相关的文件描述符
    ev.data.fd=listenfd;
    //设置要处理的事件类型
    ev.events=EPOLLIN|EPOLLET;
    //注册epoll事件
    epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
    //设置serveraddr
    bzero(&serveraddr, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = inet_addr("127.0.0.1");//此处设为服务器的ip
    serveraddr.sin_port=htons(8023);
    bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr));
    listen(listenfd, LISTENQ);
    clilen=sizeof(clientaddr);
    maxi = 0;

    
    boost::asio::thread_pool tp(10);

    while(1){
        cout<<"--------------------------"< 

接收函数

void server::RecvMsg(int epollfd,int conn){
    tuple info;//元组类型,四个成员分别为if_login、login_name、target_name、target_conn
    
    get<0>(info)=false;
    get<3>(info)=-1;

    string recv_str;
    while(1){
        char buf[10];
        memset(buf, 0, sizeof(buf));
        int ret  = recv(conn, buf, sizeof(buf), 0);
        if(ret < 0){
            cout<<"recv返回值小于0"< 

HandleRequest 函数,由于我们采用的是epolloneshot所以之后我们在处理完请求后需要重新注册epoll事件:

void server::HandleRequest(int epollfd,int conn,string str,tuple &info){
    char buffer[1000];
    string name,pass;
    //把参数提出来,方便操作
    bool if_login=get<0>(info);//记录当前服务对象是否成功登录
    string login_name=get<1>(info);//记录当前服务对象的名字
    string target_name=get<2>(info);//记录目标对象的名字
    int target_conn=get<3>(info);//目标对象的套接字描述符
    int group_num=get<4>(info);//记录所处群号

    //连接MYSQL数据库
    MYSQL *con=mysql_init(NULL);
    mysql_real_connect(con,"127.0.0.1","root","","ChatProject",0,NULL,CLIENT_MULTI_STATEMENTS);

    //连接redis数据库
    redisContext *redis_target = redisConnect("127.0.0.1",6379);
    if(redis_target->err){
        redisFree(redis_target);
        cout<<"连接redis失败"<str){
            cout<<"查询redis结果:"<str<str;
        }
        //不存在
        else
            send_res="NULL";
        send(conn,send_res.c_str(),send_res.length()+1,0);
    }

    //注册
    else if(str.find("name:")!=str.npos){
        cout<<"注册方法n";
        int p1=str.find("name:"),p2=str.find("pass:");
        name=str.substr(p1+5,p2-5);
        pass=str.substr(p2+5,str.length()-p2-4);
        string search="INSERT INTO USER VALUES ("";
        search+=name;
        search+="","";
        search+=pass;
        search+="");";
        cout<<"sql语句:"<err)
        redisFree(redis_target);

    //更新实参
    get<0>(info)=if_login;//记录当前服务对象是否成功登录
    get<1>(info)=login_name;//记录当前服务对象的名字
    get<2>(info)=target_name;//记录目标对象的名字
    get<3>(info)=target_conn;//目标对象的套接字描述符
    get<4>(info)=group_num;//记录所处群号
}
3 资源

代码

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/752424.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号