- 前引
- Linux Kernel Select/Poll/Epoll源码深度历险(一)---- 探索Select底层运行原理
- 1、先得到Linux Kernel Source Code
- 2、开始历险 顺藤摸瓜探索Select
- 1、初得用户态 Select调用代码表层
- 2、 欲求无果用户态调用 直接转向内核调用Select入口
- 3、 仔细查看core_sys_select select核心实现
- 1、先从fd_set结构体入手
- 2、看看fd_set 相关的宏定义
- 3、转回core_sys_select 开始分析(上半部分)
- 4、转回core_sys_select 开始分析(下半部分)
- 4、原来do_select才是真正实现功能的大魔王
- 1、仔细分析do_select 核心代码(上半部分)
- 1、仔细分析do_select 核心代码(下半部分)
- 3、最后浅浅总结一下
前引
鉴于之前我自己看LevelDB源码中间看了半天未果 而且自己也看不动了
这段时间自己又在准备 在刷大量面经 看到面试题里面经常会问Select/Poll/Epoll的实现原理和区别
我一直认为 源码之前 了无秘密
在没有看到源码和自己去深度探索过一些东西的运行流程的时候 所有的东西都只不过是粗略了解概念罢了
秉着所有东西好好准备的原则 以及 赎罪我之前看LevelDB源码没有看完的那段时间 我们的Select/Poll/Epoll就以源码剖析的角度来看看 究竟是内核是怎么支持实现的IO复用
这几篇我是无论如何都要写完的 不会就中途弃稿删除的
那我们下面就一步步来吧
Linux Kernel Select/Poll/Epoll源码深度历险(一)---- 探索Select底层运行原理
1、先得到Linux Kernel Source Code
首先就是我们需要得到Linux Kernel Soucr Code 没有源码我们怎么来看底层实现呢
有的人就要说了 啊 你不是之前安了虚拟机的吗 直接看你安装的Ubuntu找找源码就可以了吗 对的 刚开始我也是这样想的
但是之后我认真的搜索了一下 发现找半天都没有找到Select.c 搜索了半天也只找到了其头文件select.h
也罢 我们直接去下载一个内核源码就好了 下面给出两个连接 一个是目前让仍然保持迭代 Linux之父以及Linux官方所维持的最新版本链接
第二个链接就是我们之后要用的 较早版本的Linux Kernel 2.6.28版本的
因为在Linux kernel 2.6版本以上 内核就支持了Epoll 我所写的High-Performance-WebServer也是必须要求内核版本在2.6以上才能支持(主要是由于IO复用函数我默认就选择的是Epoll 内核必须支持Epoll)
第二个链接直接就是下载链接 直接下载即可
torvalds linux kernel soucr code Github链接
linux-2.6.28.6.tar.bz2 下载链接
在下载好之后 我们在linux上面解压 最后就得到了我们的源码了
那我们的源码历险就要开始了
2、开始历险 顺藤摸瓜探索Select
1、初得用户态 Select调用代码表层
这里其实有点头疼的就是 我没找到Select的用户层代码
这是最让人头疼的 但其实man中也告诉了我们用户层select代码用法 这也算天无绝人之路了
看看man select
定义的文件我找了半天 终于找到了 应该用户态调用的文件是在GNU C Library里面
这个这是定义文件
函数实现的部分我还是想再找找看 至少让我看下用户态到内核态的那个调用语句嘛
找了半天没找到 算了 我们还是直接去看内核调用SELECT的入口就好了
2、 欲求无果用户态调用 直接转向内核调用Select入口
这个时候 我们直接来到内核调用SELECT入口就好了
毕竟用户态的函数没找到 反正是需要转到这里来的
文件是在fs/select.c中
搜索了一下select得到了 我们把它拿出来看看
代码如下 我们初步看一看
SYSCALL_DEFINE5这个5是五个参数的意思 目前这个代码是在内核态运行的 也应该是由int 0x80软中断进入的
首先先判断了 是否有tvp指针 这里的tvp我们判断一下就是用户态的第五个参数struct timeval*参数 看看是否放入了 如果设置为NULL就直接跳过了 没设置的话则会调用copy_from_user 这个函数调用不用去看 都知道是讲用户态的这个结构体 拷贝到目前的tv这个目前内核太态的局部变量里面
下面这个就是设置 超时时间timeout 我们的主要目的不是来看这个的 这个就暂时略过 反正源函数也在这个相同的文件中
到下面了 这个两个函数应该是主要部分 先调用了core_sys_select 我猜测这个应该是select的核心实现函数 最后的poll_select_copy_remaining是和我们的第五个参数超时有关的 那我们就直接来到core_sys_select
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
fd_set __user *, exp, struct timeval __user *, tvp)
{
struct timespec end_time, *to = NULL;
struct timeval tv;
int ret;
if (tvp) {
if (copy_from_user(&tv, tvp, sizeof(tv)))
return -EFAULT;
to = &end_time;
if (poll_select_set_timeout(to,
tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),
(tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))
return -EINVAL;
}
ret = core_sys_select(n, inp, outp, exp, to);
ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);
return ret;
}
3、 仔细查看core_sys_select select核心实现
直到刚刚 我才基本上理清楚了Select的系统调用是怎么样的
当然 过程中我打开了相当多的文件 有些函数的话 我们看名称也就明白了什么意思 刚刚觉得 如果我没有理清楚代码就来写的话 则是一种不负责任的表现 从头开始理清了 这样子自己写博客的时候 也知道该怎么下手 该怎么写才合理 上面其实也都算是小打小闹 那我们下面也算是正式开始了
1、先从fd_set结构体入手
既然要搞懂Select系统调用 至少要把最关键的FD_SET 也就是我们传入的参数结构给搞懂吧 这也是我找了半天才找到的文件
那我们下面走起
linux/types.h中的文件 找了半天结果发现这里只是改了一下名字 fd_set在内部定义是__kerenl_fd_set 那我们再找找__kerenl_fd_set
我们根据上面包含的头文件 找到了linux/posix_types.h
fd_set的庐山真面目出来了
这里我们就好好的来分析出来东西了
如果面试题里面有 为什么Select的文件描述符默认最大只有1024的话 这里我们根据源码就可以做出回答了
但是我们不急着来回答 先来分析一下吧
这里 其实fd_set本质就是以unsigned long的数组 以数据结构的角度来看 其实也就是位图 这个位图我们根据上面的宏定义来看 其实也就发现 这里的1024 其实在位图中 也就是1024位而已 换算为字节数 不过也就是128 再换算为32个Unsigned long
为什么只有1024 :由于Linux内核宏定义设置FD_SET位图最多1024位 但可以修改宏定义使其突破监控文件描述符最大只有1024
ok 在看完FD_SET后 也许有的人也很好奇 FD_CLR FD_SET FD_ZERO 这里我也去找了找相应的头文件
2、看看fd_set 相关的宏定义
由于我在我的Ubuntu里面找到了 所以就看看我虚拟机里面的关于这个定义 肯定是一样的
x86_64-linux-gnu/sys/select.h
x86_64-linux-gnu/bits/select.h
代码很简单 其实也就是位图操作而已 没什么大不了的 很容易就看懂了
为了方便看代码 我这里就再贴一份 就不多写了 很简单
# define __FD_ZERO(set)
do {
unsigned int __i;
fd_set *__arr = (set);
for (__i = 0; __i < sizeof (fd_set) / sizeof (__fd_mask); ++__i)
__FDS_BITS (__arr)[__i] = 0;
} while (0)
#define __FD_SET(d, set)
((void) (__FDS_BITS (set)[__FD_ELT (d)] |= __FD_MASK (d)))
#define __FD_CLR(d, set)
((void) (__FDS_BITS (set)[__FD_ELT (d)] &= ~__FD_MASK (d)))
#define __FD_ISSET(d, set)
((__FDS_BITS (set)[__FD_ELT (d)] & __FD_MASK (d)) != 0)
3、转回core_sys_select 开始分析(上半部分)
上面没有贴出来这个函数的源代码 这里就贴一下
下面我们就开始一部分一部分开始分析了
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
fd_set __user *exp, struct timespec *end_time)
{
fd_set_bits fds;
void *bits;
int ret, max_fds;
unsigned int size;
struct fdtable *fdt;
long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];
ret = -EINVAL;
if (n < 0)
goto out_nofds;
rcu_read_lock();
fdt = files_fdtable(current->files);
max_fds = fdt->max_fds;
rcu_read_unlock();
if (n > max_fds)
n = max_fds;
size = FDS_BYTES(n);
bits = stack_fds;
if (size > sizeof(stack_fds) / 6) {
ret = -ENOMEM;
bits = kmalloc(6 * size, GFP_KERNEL);
if (!bits)
goto out_nofds;
}
fds.in = bits;
fds.out = bits + size;
fds.ex = bits + 2*size;
fds.res_in = bits + 3*size;
fds.res_out = bits + 4*size;
fds.res_ex = bits + 5*size;
if ((ret = get_fd_set(n, inp, fds.in)) ||
(ret = get_fd_set(n, outp, fds.out)) ||
(ret = get_fd_set(n, exp, fds.ex)))
goto out;
zero_fd_set(n, fds.res_in);
zero_fd_set(n, fds.res_out);
zero_fd_set(n, fds.res_ex);
ret = do_select(n, &fds, end_time);
if (ret < 0)
goto out;
if (!ret) {
ret = -ERESTARTNOHAND;
if (signal_pending(current))
goto out;
ret = 0;
}
if (set_fd_set(n, inp, fds.res_in) ||
set_fd_set(n, outp, fds.res_out) ||
set_fd_set(n, exp, fds.res_ex))
ret = -EFAULT;
out:
if (bits != stack_fds)
kfree(bits);
out_nofds:
return ret;
}
下面截取了上半部分
首先说说上面的变量吧 fd_set_bits 是位图的结合体 因为我们传入了3个位图(fd_set) 它在遍历的时候 也需要三个位图来记录哪些是有相应的 所以这个结构体中 就有6个变量指针 分别是3个我们的inp outp exp 还有3个是我们之后要返回的rinp routp rexp 结构如下
typedef struct {
unsigned long *in, *out, *ex;
unsigned long *res_in, *res_out, *res_ex;
} fd_set_bits;
然后就是bits 我们的位图存储需要从用户态存回内核态 我们这里为了高效一点 就采用的开栈的方式stack_fds 来存储我们的位图(包括复制用户态的位图 和 我们需要返回的位图) 后面就可以看到我们的位图存储了
struct fdtable的话 就是我们对于文件描述符相关的结构体了 我们这里不用了解其他的 只需要直到可以得到目前进程打开的最大文件描述符是多少即可
上面基本上就介绍完了 到后面的rcu_read_lock 这个函数我去看了看 在这里的意义 应该就是给我们的进程拍了个快照 我们得到了目前进程最大打开的文件描述符 作用就是与我们传进来的监视最大文件描述符做比较而已
那我们对于这部分代码也就介绍完了
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
fd_set __user *exp, struct timespec *end_time)
{
fd_set_bits fds;
void *bits;
int ret, max_fds;
unsigned int size;
struct fdtable *fdt;
long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];
ret = -EINVAL;
if (n < 0)
goto out_nofds;
rcu_read_lock();
fdt = files_fdtable(current->files);
max_fds = fdt->max_fds;
rcu_read_unlock();
if (n > max_fds)
n = max_fds;
......
4、转回core_sys_select 开始分析(下半部分)
那我们书接上回 继续来分析
这里的话 其实就是在为我们的位图存储做工作了
一是检测我们上面的开栈空间是否足够 不足的话 就要调用内核的kmalloc来分配空间 二是讲之前的6个位图指针指向分配好的位置 三是复制我们的用户态的位图 内存从用户态分配到内核态 get_fd_set就是在做这个工作 成功了返回0 把后面的3个我们之后要返回 也就是要用的存储位图清零后 就要进入真正的核心函数do_select了
其实 do_select才是真正的核心工作函数 这里只不过是在为后面做准备工作而已 我们可以提前看一下后面的函数 do_select返回后 我们又调用了set_fd_set 其实也就是把我们的 准备ret的位图 又重新复制回了我们输入参数的用户态所在内存而已
最后再返回 我们目前有多少文件描述符引起响应的个数 结束了调用
那我们最后来看看do_select Select函数的深度历险 我们基本上也就走完了 走着
.........
size = FDS_BYTES(n);
bits = stack_fds;
if (size > sizeof(stack_fds) / 6) {
ret = -ENOMEM;
bits = kmalloc(6 * size, GFP_KERNEL);
if (!bits)
goto out_nofds;
}
fds.in = bits;
fds.out = bits + size;
fds.ex = bits + 2*size;
fds.res_in = bits + 3*size;
fds.res_out = bits + 4*size;
fds.res_ex = bits + 5*size;
if ((ret = get_fd_set(n, inp, fds.in)) ||
(ret = get_fd_set(n, outp, fds.out)) ||
(ret = get_fd_set(n, exp, fds.ex)))
goto out;
zero_fd_set(n, fds.res_in);
zero_fd_set(n, fds.res_out);
zero_fd_set(n, fds.res_ex);
ret = do_select(n, &fds, end_time);
if (ret < 0)
goto out;
if (!ret) {
ret = -ERESTARTNOHAND;
if (signal_pending(current))
goto out;
ret = 0;
}
if (set_fd_set(n, inp, fds.res_in) ||
set_fd_set(n, outp, fds.res_out) ||
set_fd_set(n, exp, fds.res_ex))
ret = -EFAULT;
out:
if (bits != stack_fds)
kfree(bits);
out_nofds:
return ret;
}
上面代码用到的函数的补充
static inline
int get_fd_set(unsigned long nr, void __user *ufdset, unsigned long *fdset)
{
nr = FDS_BYTES(nr);
if (ufdset)
return copy_from_user(fdset, ufdset, nr) ? -EFAULT : 0;
memset(fdset, 0, nr);
return 0;
}
static inline unsigned long __must_check
set_fd_set(unsigned long nr, void __user *ufdset, unsigned long *fdset)
{
if (ufdset)
return __copy_to_user(ufdset, fdset, FDS_BYTES(nr));
return 0;
}
4、原来do_select才是真正实现功能的大魔王
1、仔细分析do_select 核心代码(上半部分)
还是老样子 这里先把代码全列出来了 后面一部分一部分来
int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
ktime_t expire, *to = NULL;
struct poll_wqueues table;
poll_table *wait;
int retval, i, timed_out = 0;
unsigned long slack = 0;
rcu_read_lock();
retval = max_select_fd(n, fds);
rcu_read_unlock();
if (retval < 0)
return retval;
n = retval;
poll_initwait(&table);
wait = &table.pt;
if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
wait = NULL;
timed_out = 1;
}
if (end_time && !timed_out)
slack = estimate_accuracy(end_time);
retval = 0;
for (;;) {
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
set_current_state(TASK_INTERRUPTIBLE);
inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
unsigned long in, out, ex, all_bits, bit = 1, mask, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;
const struct file_operations *f_op = NULL;
struct file *file = NULL;
in = *inp++; out = *outp++; ex = *exp++;
all_bits = in | out | ex;
if (all_bits == 0) {
i += __NFDBITS;
continue;
}
for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {
int fput_needed;
if (i >= n)
break;
if (!(bit & all_bits))
continue;
file = fget_light(i, &fput_needed);
if (file) {
f_op = file->f_op;
mask = DEFAULT_POLLMASK;
if (f_op && f_op->poll)
mask = (*f_op->poll)(file, retval ? NULL : wait);
fput_light(file, fput_needed);
if ((mask & POLLIN_SET) && (in & bit)) {
res_in |= bit;
retval++;
}
if ((mask & POLLOUT_SET) && (out & bit)) {
res_out |= bit;
retval++;
}
if ((mask & POLLEX_SET) && (ex & bit)) {
res_ex |= bit;
retval++;
}
}
}
if (res_in)
*rinp = res_in;
if (res_out)
*routp = res_out;
if (res_ex)
*rexp = res_ex;
cond_resched();
}
wait = NULL;
if (retval || timed_out || signal_pending(current))
break;
if (table.error) {
retval = table.error;
break;
}
if (end_time && !to) {
expire = timespec_to_ktime(*end_time);
to = &expire;
}
if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
timed_out = 1;
}
__set_current_state(TASK_RUNNING);
poll_freewait(&table);
return retval;
}
先看一下下面的变量 poll_wqueues 我看了看 这个是Poll才用得到的 我们这里就先不说了 Poll_table也是 对于max_select_fd 其实就是对位图操作 找到我们里面最大的fd 我们之后的循环 也就是按照最大需要遍历的文件描述符 从0开始遍历
至于endtime 这个是我们需要填超时时间才用得到的 这里不是关键 我们主要是想看关键循环 这里就略过把
int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
ktime_t expire, *to = NULL;
struct poll_wqueues table;
poll_table *wait;
int retval, i, timed_out = 0;
unsigned long slack = 0;
rcu_read_lock();
retval = max_select_fd(n, fds);
rcu_read_unlock();
if (retval < 0)
return retval;
n = retval;
poll_initwait(&table);
wait = &table.pt;
if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
wait = NULL;
timed_out = 1;
}
if (end_time && !timed_out)
slack = estimate_accuracy(end_time);
.......
1、仔细分析do_select 核心代码(下半部分)
精彩的来了嗷
这里其实我觉得分析起来的难度不大 很容易就看懂了
首先呢 我们把返回的数置零 然后进入循环for(; ; )
把自己的状态设为睡眠 但可以被唤醒 这里我猜测唤醒条件 应该就是有信号 有文件描述符准备好了 给这个线程信号 然后唤醒 接着就可以开始下面的工作了 我猜测是这样的
然后就给局部变量赋值
到了正式的遍历循环了 这里的n也就是之前所说的 需要遍历的最大文件描述符 后面的条件则是++我们的要返回的位图地址 因为我们一次循环 内循环是对位图的一个unsigned long字节所表示的文件描述符这么多来循环 那么说明 其实最多我们也就外循环32次 上面我们已经算出来了
进入循环 我们对于我们的三个寻址数列read write err求一个| 从而得到一个unsigned long我们需要检查的文件描述符 之后对于 某些并集都没有1的位 我们就直接跳过 如果发现三个寻址数列 对于目前的一个unsigned long 并集都还是全部为0 直接跳到下一格unsigned long
进入内循环 其实核心就是从1开始遍历一个unsigned long中的所有位 一个在得到file的状态后 我们调用系统调用poll 这个系统调用我看了看 返回了unsigned long 其实也就是mask mask里面就记录了 POLLING/POLLOUT/POLLERR这些位状态 我们对这些位状态进行取交集 就可以得知文件描述符状态了
基本上 上面就是最核心的分析部分了 后面基本上都是dirty work了 像什么设置位图 恢复进程状态 讲这三个位图复制到用户态 就是这些后续处理工作了
retval = 0;
for (;;) {
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
set_current_state(TASK_INTERRUPTIBLE);
inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
unsigned long in, out, ex, all_bits, bit = 1, mask, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;
const struct file_operations *f_op = NULL;
struct file *file = NULL;
in = *inp++; out = *outp++; ex = *exp++;
all_bits = in | out | ex;
if (all_bits == 0) {
i += __NFDBITS;
continue;
}
for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {
int fput_needed;
if (i >= n)
break;
if (!(bit & all_bits))
continue;
file = fget_light(i, &fput_needed);
if (file) {
f_op = file->f_op;
mask = DEFAULT_POLLMASK;
if (f_op && f_op->poll)
mask = (*f_op->poll)(file, retval ? NULL : wait);
fput_light(file, fput_needed);
if ((mask & POLLIN_SET) && (in & bit)) {
res_in |= bit;
retval++;
}
if ((mask & POLLOUT_SET) && (out & bit)) {
res_out |= bit;
retval++;
}
if ((mask & POLLEX_SET) && (ex & bit)) {
res_ex |= bit;
retval++;
}
}
}
if (res_in)
*rinp = res_in;
if (res_out)
*routp = res_out;
if (res_ex)
*rexp = res_ex;
cond_resched();
}
wait = NULL;
if (retval || timed_out || signal_pending(current))
break;
if (table.error) {
retval = table.error;
break;
}
if (end_time && !to) {
expire = timespec_to_ktime(*end_time);
to = &expire;
}
if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
timed_out = 1;
}
__set_current_state(TASK_RUNNING);
poll_freewait(&table);
return retval;
}
3、最后浅浅总结一下
这里就简单的总结一下吧
select->int 0x80(SYSCALL_DEFINE5) -> core_sys_select -> do_select
运行流程就是这样子的
通过位图的控制与最大监视文件描述符 我们对每一个放入描述符都进行遍历扫描 看看是否存在读/写/异常状态 在内核态临时分配的栈空间充当临时位图 从而记录结果 最后再遍历一遍描述符 把结果从内核态再复制到用户态所传入的参数内存处 返回值为有多少描述符是有活跃活动的 最后结束调用
整个过程 个人认为是特别缓慢的 首先就是O(n) 传入的最大描述符数字 我们就真的需要遍历一遍这么多个数 尽管其中很多描述符我们可以跳过 而且对于三种不同的状态 我们都分别要开位图来记录…
总结的很浅 但分析过程已经很浅显易懂的写在上面了 这篇就写到这里吧
本来今天的目标是 打算写完Select/Poll/Epoll的调用的 写三篇博客来分别分析 结果现在晚上7点了 吃个饭回来看看Poll了



