本文分析的state-threads的版本是1.9。
srs源码分析1-搭建环境
srs源码分析2-浅析state_threads
srs源码分析3-srs的启动
srs源码分析4-客户端的连接
srs源码分析5-handshake
srs源码分析6-connect
以下正在写作中。。。
srs源码分析7-create stream
srs源码分析8-推流-publish
srs源码分析9-推流-unpublish
srs源码分析10-拉流-play
srs源码分析11-拉流-pause
srs源码分析12-转发-forward
srs是基于协程开发的,底层使用了state_threads协程库。为了更好的理解srs,所以需要先熟悉state_threads。这里并不会介绍协程的相关概念,只是简单的介绍一下state_threads的核心逻辑。
以下state_thread会被简称为st。
使用示例-echo server使用st实现了一个简单的echo服务器,以下代码写的很简单,重点是理解st的使用。
#include #include#include #include #include #include #include #include #define LISTEN_PORT 9000 #define ERR_EXIT(m) do { perror(m); exit(-1); } while (0) void *client_thread(void *arg) { st_netfd_t client_st_fd = (st_netfd_t)arg; int client_fd = st_netfd_fileno(client_st_fd); sockaddr_in client_addr; socklen_t client_addr_len = sizeof(client_addr); int ret = getpeername(client_fd, (sockaddr *)&client_addr, &client_addr_len); if (ret == -1) { printf("[WARN] Failed to get client ip: %sn", strerror(ret)); } char ip_buf[INET_ADDRSTRLEN]; bzero(ip_buf, sizeof(ip_buf)); inet_ntop(client_addr.sin_family, &client_addr.sin_addr, ip_buf, sizeof(ip_buf)); while (1) { char buf[1024] = {0}; ssize_t ret = st_read(client_st_fd, buf, sizeof(buf), ST_UTIME_NO_TIMEOUT); if (ret == -1) { printf("client st_read errorn"); break; } else if (ret == 0) { printf("client quit, ip = %sn", ip_buf); break; } printf("recv from %s, data = %s", ip_buf, buf); ret = st_write(client_st_fd, buf, ret, ST_UTIME_NO_TIMEOUT); if (ret == -1) { printf("client st_write errorn"); } } } void *listen_thread(void *arg) { while (1) { st_netfd_t client_st_fd = st_accept((st_netfd_t)arg, NULL, NULL, ST_UTIME_NO_TIMEOUT); if (client_st_fd == NULL) { continue; } printf("get a new client, fd = %dn", st_netfd_fileno(client_st_fd)); st_thread_t client_tid = st_thread_create(client_thread, (void *)client_st_fd, 0, 0); if (client_tid == NULL) { printf("Failed to st create client threadn"); } } } int main() { int ret = st_set_eventsys(ST_EVENTSYS_ALT); if (ret == -1) { printf("st_set_eventsys use linux epoll failedn"); } ret = st_init(); if (ret != 0) { printf("st_init failed. ret = %dn", ret); return -1; } int listen_fd = socket(AF_INET, SOCK_STREAM, 0); if (listen_fd == -1) { ERR_EXIT("socket"); } int reuse_socket = 1; ret = setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)); if (ret == -1) { ERR_EXIT("setsockopt"); } struct sockaddr_in server_addr; server_addr.sin_family = AF_INET; server_addr.sin_port = htons(LISTEN_PORT); server_addr.sin_addr.s_addr = INADDR_ANY; ret = bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)); if (ret == -1) { ERR_EXIT("bind"); } ret = listen(listen_fd, 128); if (ret == -1) { ERR_EXIT("listen"); } st_netfd_t st_listen_fd = st_netfd_open_socket(listen_fd); if (!st_listen_fd) { printf("st_netfd_open_socket open socket failed.n"); return -1; } st_thread_t listen_tid = st_thread_create(listen_thread, (void *)st_listen_fd, 1, 0); if (listen_tid == NULL) { printf("Failed to st create listen threadn"); } while (1) { st_sleep(1); } return 0; }
root@learner:~/tmp/st# gcc main.cpp -lst root@learner-Lenovo:~/tmp/st# ./a.out get a new client, fd = 4 recv from 192.168.30.17, data = hello world client quit, ip = 192.168.30.17 ^C
root@learner:~# nc 192.168.30.17 9000 hello world hello world ^C
创建一个listen协程,用于监听客户端的连接,当客户端连接服务后,会为此客户端创建一个client协程,用于处理此客户端的所有请求。
协程的切换st中协程的切换提供了两种方式:一种是使用系统提供的setjmp和longjmp接口,另一种是使用汇编实现的_st_md_cxt_save和_st_md_cxt_restore接口,这两个函数从用法上同setjmp和longjmp。
这两种方式的切换本质上都是栈帧的切换。
setjmp和longjmpC语言中的goto语句只能在当前函数内跳转,而不能在函数间跳转。setjmp()和longjmp()可以执行非局部跳转,即跳转的目标为当前执行函数之外的某个位置。
setjmp()函数为后续由longjmp()调用执行的跳转确立了跳转目标,该目标正是程序发起setjmp()调用的位置。从编程角度看来,调用longjmp()函数后,看起来就和从第二次调用setjmp()返回时完全一样。通过setjmp()的返回值,可以区分setjmp()调用是初始返回还是第二次返回。初始调用返回值为0,后续“伪返回”的返回值为longjmp()调用中val参数所指定的任意值。通过对val参数使用不同值,能够区分程序中跳转至同一目标的不同起跳位置。更多相关setjmp()、longjmp()的介绍,可以参考《Linux/UNIX系统编程手册》上册第106页。
以下是从《Linux/UNIX系统编程手册》摘抄的示例:
#include#include #include jmp_buf env; void f2(int num) { longjmp(env, num); } void f1(int num) { if(num == 1){ longjmp(env, num); } f2(num); } int main(int argc, char** argv) { if(argc != 2){ printf("Usage: %s [1|2]n", argv[0]); return -1; } switch(setjmp(env)){ case 0: printf("Calling f1() after initial setjmp()n"); f1(atoi(argv[1])); break; case 1: printf("We jumped back from f1()n"); break; case 2: printf("We jumped back from f2()n"); break; } return 0; }
这个示例我稍微做了一些修改,运行结果及分析如下:
root@learner:~/tmp# ./a.out 1 Calling f1() after initial setjmp() We jumped back from f1()
root@learner:~/tmp# ./a.out 2 Calling f1() after initial setjmp() We jumped back from f2()_st_md_cxt_save和_st_md_cxt_restore
这两个函数是通过汇编实现的,代码如下:
#define JB_BX 0
#define JB_SI 1
#define JB_DI 2
#define JB_BP 3
#define JB_SP 4
#define JB_PC 5
.file "md.S"
.text
.globl _st_md_cxt_save
.type _st_md_cxt_save, @function
.align 16
_st_md_cxt_save:
movl 4(%esp), %eax
movl %ebx, (JB_BX*4)(%eax)
movl %esi, (JB_SI*4)(%eax)
movl %edi, (JB_DI*4)(%eax)
leal 4(%esp), %ecx /
movl %ecx, (JB_SP*4)(%eax)
movl 0(%esp), %ecx
movl %ecx, (JB_PC*4)(%eax)
movl %ebp, (JB_BP*4)(%eax)
xorl %eax, %eax
ret
.size _st_md_cxt_save, .-_st_md_cxt_save
.globl _st_md_cxt_restore
.type _st_md_cxt_restore, @function
.align 16
_st_md_cxt_restore:
movl 4(%esp), %ecx
movl 8(%esp), %eax
movl (JB_PC*4)(%ecx), %edx
movl (JB_BX*4)(%ecx), %ebx
movl (JB_SI*4)(%ecx), %esi
movl (JB_DI*4)(%ecx), %edi
movl (JB_BP*4)(%ecx), %ebp
movl (JB_SP*4)(%ecx), %esp
testl %eax, %eax
jnz 1f
incl %eax
1: jmp *%edx
.size _st_md_cxt_restore, .-_st_md_cxt_restore
_st_md_cxt_save(__jmp_buf env)用于保存栈帧,_st_md_cxt_restore(__jmp_buf env, int val)用于恢复栈帧。
st中协程的切换宏#if defined(MD_USE_BUILTIN_SETJMP) && !defined(USE_LIBC_SETJMP)
#define MD_SETJMP(env) _st_md_cxt_save(env)
#define MD_LONGJMP(env, val) _st_md_cxt_restore(env, val)
extern int _st_md_cxt_save(jmp_buf env);
extern void _st_md_cxt_restore(jmp_buf env, int val);
#else
#define MD_SETJMP(env) setjmp(env)
#define MD_LONGJMP(env, val) longjmp(env, val)
#endif
如果定义了MD_USE_BUILTIN_SETJMP宏,且没有定义USE_LIBC_SETJMP宏,则使用自定义的栈帧存取函数。否则使用系统提供的setjmp和longjmp切换栈帧。
#define _ST_SWITCH_CONTEXT(_thread)
ST_BEGIN_MACRO
ST_SWITCH_OUT_CB(_thread);
if (!MD_SETJMP((_thread)->context)) {
_st_vp_schedule();
}
ST_DEBUG_ITERATE_THREADS();
ST_SWITCH_IN_CB(_thread);
ST_END_MACRO
_ST_SWITCH_CONTEXT用于将协程的CPU执行权让出去,重新调度一个新的协程。
当协程调用_ST_SWITCH_CONTEXT时,此时MD_SETJMP会返回0,则进入协程调度函数_st_vp_schedule(),CPU的执行权转移到其他协程。此时相当于在本协程中打上了一个切换点。当本协程将再次获得CPU执行权时,在_st_vp_schedule()中调用_ST_RESTORE_CONTEXT宏函数,会通过MD_SETJMP再次返回,此时返回值为1,跳过if语句返回到本协程调用_ST_SWITCH_CONTEXT的位置,继续往下执行。
#define _ST_RESTORE_CONTEXT(_thread)
ST_BEGIN_MACRO
_ST_SET_CURRENT_THREAD(_thread);
MD_LONGJMP((_thread)->context, 1);
ST_END_MACRO
_ST_RESTORE_CONTEXT用于恢复指定的协程,通过MD_LONGJMP宏,返回到MD_SETJMP打的断点处,从MD_SETJMP再次返回,从而再次获取到CPU的执行权。
void _st_vp_schedule(void)
{
_st_thread_t *thread;
if (_ST_RUNQ.next != &_ST_RUNQ) {
thread = _ST_THREAD_PTR(_ST_RUNQ.next);
_ST_DEL_RUNQ(thread);
} else {
thread = _st_this_vp.idle_thread;
}
ST_ASSERT(thread->state == _ST_ST_RUNNABLE);
thread->state = _ST_ST_RUNNING;
_ST_RESTORE_CONTEXT(thread);
}
在切换协程时,会从就绪的协程队列中取出一个协程,然后切换至该协程。如果就绪队列中没有可切换的协程,则说明没有协程需要处理,此时会切换至idle协程。返回idle协程后,会重新进入epoll_wait,重新开始监听待发生的事件和处理定时事件。
调度器所有的协程都是在一个单线程中执行的,所以需要有一个调度器来调度所有的协程,以便需要执行权限的协程能够获取到CPU。通常协程在发生读事件、写事件、定时器事件时才需要执行权限,也就是发生这些事件后,需要将协程调度到CPU上,让其获得CPU的执行权,处理对应的事情。
st中对读写事件的监控是通过epoll实现的,而定时器事件通过最小堆配合epoll的超时实现的。
typedef struct _st_eventsys_ops {
const char *name;
int val;
int (*init)(void);
void (*dispatch)(void);
int (*pollset_add)(struct pollfd *, int);
void (*pollset_del)(struct pollfd *, int);
int (*fd_new)(int);
int (*fd_close)(int);
int (*fd_getlimit)(void);
} _st_eventsys_t;
这是调度器的接口,可以使用epoll实现,也可以使用select和poll实现。
static _st_eventsys_t _st_epoll_eventsys = {
"epoll",
ST_EVENTSYS_ALT,
_st_epoll_init,
_st_epoll_dispatch,
_st_epoll_pollset_add,
_st_epoll_pollset_del,
_st_epoll_fd_new,
_st_epoll_fd_close,
_st_epoll_fd_getlimit
};
st中通过epoll实现了调度器,实现的这些函数作为回调函数封装到了结构体中。
ST_HIDDEN void _st_epoll_dispatch(void)
{
...
if (_ST_SLEEPQ == NULL) {
timeout = -1;
} else {
min_timeout = (_ST_SLEEPQ->due <= _ST_LAST_CLOCK) ? 0 : (_ST_SLEEPQ->due - _ST_LAST_CLOCK);
timeout = (int) (min_timeout / 1000);
...
}
nfd = epoll_wait(..., ..., ..., timeout);
...
pq->thread->state = _ST_ST_RUNNABLE;
_ST_ADD_RUNQ(pq->thread);
...
}
在进入epoll_wait之前,先从最小堆中获取最近一个定时器触发的时间,将此时间作为epoll_wait的超时时间,如果在这个超时时间之内发生了读写事件,则epoll_wait返回处理读写事件;如果段超时时间之内没有发生读写事件,epoll_wait会因为超时而退出,此时返回正好处理定时事件。
若不是因为超时而从epoll_wait返回,说明有的协程读写事件触发了,此时需要将触发事件的协程保存到可运行队列中,等待新一轮的调度。
创建协程_st_thread_t *st_thread_create(void *(*start)(void *arg), void *arg, int joinable, int stk_size)
{
_st_thread_t *thread;
_st_stack_t *stack;
void **ptds;
char *sp;
if (stk_size == 0)
stk_size = ST_DEFAULT_STACK_SIZE;
stk_size = ((stk_size + _ST_PAGE_SIZE - 1) / _ST_PAGE_SIZE) * _ST_PAGE_SIZE;
stack = _st_stack_new(stk_size);
if (!stack)
return NULL;
sp = stack->stk_top;
sp = sp - (ST_KEYS_MAX * sizeof(void *));
ptds = (void **) sp;
sp = sp - sizeof(_st_thread_t);
thread = (_st_thread_t *) sp;
if ((unsigned long)sp & 0x3f)
sp = sp - ((unsigned long)sp & 0x3f);
stack->sp = sp - _ST_STACK_PAD_SIZE;
memset(thread, 0, sizeof(_st_thread_t));
memset(ptds, 0, ST_KEYS_MAX * sizeof(void *));
thread->private_data = ptds;
thread->stack = stack;
thread->start = start;
thread->arg = arg;
_ST_INIT_CONTEXT(thread, stack->sp, _st_thread_main);
if (joinable) {
thread->term = st_cond_new();
if (thread->term == NULL) {
_st_stack_free(thread->stack);
return NULL;
}
}
thread->state = _ST_ST_RUNNABLE;
_st_active_count++;
_ST_ADD_RUNQ(thread);
return thread;
}
创建一个新的协程,在创建的过程中,会将这个协程放到可运行队列,等待着调度。在调度到这个新的协程时,就可以获得CPU的执行权。
除了主协程外,其他协程的栈都是在堆上申请的空间,默认大小时128KB。
#define _ST_INIT_CONTEXT MD_INIT_CONTEXT #define MD_INIT_CONTEXT(_thread, _sp, _main) ST_BEGIN_MACRO if (MD_SETJMP((_thread)->context)) _main(); MD_GET_SP(_thread) = (long) (_sp); ST_END_MACRO
在创建新协程时,会通过上面的宏函数设置还原点,当执行到MD_SETJMP时,会返回0,此时_main()函数不会被执行。当协程再次获取执行权时,会再次从MD_SETJMP返回,此时返回值为1,则进入_main()函数,也就是_st_thread_main()函数。
void _st_thread_main(void)
{
_st_thread_t *thread = _ST_CURRENT_THREAD();
MD_CAP_STACK(&thread);
thread->retval = (*thread->start)(thread->arg);
st_thread_exit(thread->retval);
}
新的协程创建后,并不会立即被执行,需要先打上还原点,然后放入可执行队列中。当调度器调度到这个新线程后才会真正获取到CPU的执行权,在MD_SETJMP返回后,进入这个函数,在此函数中才会进入协程的入口函数。协程入口函数处理完毕后,会进入协程退出函数,这个稍后分析。
st的初始化int st_init(void)
{
_st_thread_t *thread;
if (_st_active_count) {
return 0;
}
st_set_eventsys(ST_EVENTSYS_DEFAULT);
if (_st_io_init() < 0)
return -1;
memset(&_st_this_vp, 0, sizeof(_st_vp_t));
ST_INIT_CLIST(&_ST_RUNQ);
ST_INIT_CLIST(&_ST_IOQ);
ST_INIT_CLIST(&_ST_ZOMBIEQ);
if ((*_st_eventsys->init)() < 0)
return -1;
_st_this_vp.pagesize = getpagesize();
_st_this_vp.last_clock = st_utime();
_st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start, NULL, 0, 0);
if (!_st_this_vp.idle_thread)
return -1;
_st_this_vp.idle_thread->flags = _ST_FL_IDLE_THREAD;
_st_active_count--;
_ST_DEL_RUNQ(_st_this_vp.idle_thread);
thread = (_st_thread_t *) calloc(1, sizeof(_st_thread_t) + (ST_KEYS_MAX * sizeof(void *)));
if (!thread)
return -1;
thread->private_data = (void **) (thread + 1);
thread->state = _ST_ST_RUNNING;
thread->flags = _ST_FL_PRIMORDIAL;
_ST_SET_CURRENT_THREAD(thread);
_st_active_count++;
return 0;
}
在使用st时,首先需要调用st_init()函数对st进行初始化。这个函数有三个作用:1、做一些初始化工作 2、创建idle协程 3、将主线程封装为主协程
主线程也是一条可执行流,需要将主线程封装成主协程,以便能够在调度器中进行调度。
idle协程是非常核心的,当就绪队列中没有可运行的协程时,会将CPU的执行权限调度到idle协程。在idle协程中重新开始监听读、写、定时器事件。
void *_st_idle_thread_start(void *arg)
{
_st_thread_t *me = _ST_CURRENT_THREAD();
while (_st_active_count > 0) {
_ST_VP_IDLE();
_st_vp_check_clock();
me->state = _ST_ST_RUNNABLE;
_ST_SWITCH_CONTEXT(me);
}
exit(0);
return NULL;
}
当就绪队列为空时,调度会进入idle线程,在idle线程中,会进入epoll_wait监听读写事件,有读写事件触发时,会将协程保存到就绪队列中;从epoll_wait返回后,查看是否有定时器触发,若有定时器触发,则将协程保存到就绪队列中。处理完读写事件和定时器事件后,idle协程让出CPU执行权,开始依次调度所有的就绪协程,所有的就绪协程处理完毕后,会再次进入idle协程,之后都是这样循环往复。
#define _ST_VP_IDLE() (*_st_eventsys->dispatch)()
_st_eventsys->dispatch是回调函数,这个函数指针实际指向_st_epoll_dispatch。
void _st_vp_check_clock(void)
{
_st_thread_t *thread;
st_utime_t now;
now = st_utime();
_ST_LAST_CLOCK = now;
if (_st_curr_time && now - _st_last_tset > 999000) {
_st_curr_time = time(NULL);
_st_last_tset = now;
}
while (_ST_SLEEPQ != NULL) {
thread = _ST_SLEEPQ;
ST_ASSERT(thread->flags & _ST_FL_ON_SLEEPQ);
if (thread->due > now)
break;
_ST_DEL_SLEEPQ(thread);
if (thread->state == _ST_ST_COND_WAIT)
thread->flags |= _ST_FL_TIMEDOUT;
ST_ASSERT(!(thread->flags & _ST_FL_IDLE_THREAD));
thread->state = _ST_ST_RUNNABLE;
_ST_INSERT_RUNQ(thread);
}
}
从epoll_wait返回后,检查睡眠队列中的协程,当其定时器到了,则将协程送至就绪队列,等待新一轮的调度。
所有的定时器都放在最小堆中,从最小堆中获取到的是所有定时器的最小值。如果当前时间超过了最小堆中的定时器,说明定时器触发了。通过while循环将最小堆中的所有该触发的定时器全部都保存到就绪队列中。
协程的exit、join和yield_st_thread_t *st_thread_create(void *(*start)(void *arg), void *arg, int joinable, int stk_size)
{
...
if (joinable) {
thread->term = st_cond_new();
if (thread->term == NULL) {
_st_stack_free(thread->stack);
return NULL;
}
}
...
}
在创建协程的时候,需要指明是否会主动回收协程。如果需要主动回收协程,则需要为协程创建一个条件变量,以便其他协程阻塞的回收该协程。
void _st_thread_main(void)
{
_st_thread_t *thread = _ST_CURRENT_THREAD();
MD_CAP_STACK(&thread);
thread->retval = (*thread->start)(thread->arg);
st_thread_exit(thread->retval);
}
当协程的主体函数执行完毕后,会进入st_thread_exit函数,用于退出协程。
void st_thread_exit(void *retval)
{
_st_thread_t *thread = _ST_CURRENT_THREAD();
thread->retval = retval;
_st_thread_cleanup(thread);
_st_active_count--;
if (thread->term) {
thread->state = _ST_ST_ZOMBIE;
_ST_ADD_ZOMBIEQ(thread);
st_cond_signal(thread->term);
_ST_SWITCH_CONTEXT(thread);
st_cond_destroy(thread->term);
thread->term = NULL;
}
if (!(thread->flags & _ST_FL_PRIMORDIAL))
_st_stack_free(thread->stack);
_ST_SWITCH_CONTEXT(thread);
}
若协程是主协程,则无需释放堆空间,否则需要释放在堆上申请的用于栈的空间。thread->term不为NULL,说明这个协程需要主动的回收,此时需要将协程设置为僵尸态,并加入僵尸态队列。同时通知阻塞等待回收的协程。
int st_thread_join(_st_thread_t *thread, void **retvalp)
{
_st_cond_t *term = thread->term;
if (term == NULL) {
errno = EINVAL;
return -1;
}
if (_ST_CURRENT_THREAD() == thread) {
errno = EDEADLK;
return -1;
}
if (term->wait_q.next != &term->wait_q) {
errno = EINVAL;
return -1;
}
while (thread->state != _ST_ST_ZOMBIE) {
if (st_cond_timedwait(term, ST_UTIME_NO_TIMEOUT) != 0)
return -1;
}
if (retvalp)
*retvalp = thread->retval;
thread->state = _ST_ST_RUNNABLE;
_ST_DEL_ZOMBIEQ(thread);
_ST_ADD_RUNQ(thread);
return 0;
}
协程在回收其他协程,此时待回收的协程还没有退出,主动回收的协程将进入条件变量等待。当待回收的协程退出时,会激活条件变量上的协程。
主动回收的协程从条件变量返回后,此时待回收的协程处于僵尸态,获取返回值后,此时需要再次将待回收的协程置为可运行状态,并加入就绪运行队列。待回收协程会再次进入st_thread_exit()函数,从_ST_SWITCH_CONTEXT返回,主动销毁条件变量和栈空间,最后通过_ST_SWITCH_CONTEXT让出执行权,这时协程才算退出。
void st_thread_yield()
{
_st_thread_t *me = _ST_CURRENT_THREAD();
_st_vp_check_clock();
if (_ST_RUNQ.next == &_ST_RUNQ) {
return;
}
me->state = _ST_ST_RUNNABLE;
_ST_ADD_RUNQ(me);
_ST_SWITCH_CONTEXT(me);
}
协程在运行的过程中,可以主动的让出执行权。在让出执行权的时候,需要将自己主动加入到就绪队列中,等待再次被调度。
socket的处理int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
{
struct pollfd *pd;
struct pollfd *epd = pds + npds;
_st_pollq_t pq;
_st_thread_t *me = _ST_CURRENT_THREAD();
int n;
if (me->flags & _ST_FL_INTERRUPT) {
me->flags &= ~_ST_FL_INTERRUPT;
errno = EINTR;
return -1;
}
if ((*_st_eventsys->pollset_add)(pds, npds) < 0)
return -1;
pq.pds = pds;
pq.npds = npds;
pq.thread = me;
pq.on_ioq = 1;
_ST_ADD_IOQ(pq);
if (timeout != ST_UTIME_NO_TIMEOUT)
_ST_ADD_SLEEPQ(me, timeout);
me->state = _ST_ST_IO_WAIT;
_ST_SWITCH_CONTEXT(me);
n = 0;
if (pq.on_ioq) {
_ST_DEL_IOQ(pq);
(*_st_eventsys->pollset_del)(pds, npds);
} else {
for (pd = pds; pd < epd; pd++) {
if (pd->revents)
n++;
}
}
if (me->flags & _ST_FL_INTERRUPT) {
me->flags &= ~_ST_FL_INTERRUPT;
errno = EINTR;
return -1;
}
return n;
}
注册需要监听的事件,然后让出CPU执行权,当事件触发后再次从_ST_SWITCH_CONTEXT返回继续处理。
int st_netfd_poll(_st_netfd_t *fd, int how, st_utime_t timeout)
{
struct pollfd pd;
int n;
pd.fd = fd->osfd;
pd.events = (short) how;
pd.revents = 0;
if ((n = st_poll(&pd, 1, timeout)) < 0)
return -1;
if (n == 0) {
errno = ETIME;
return -1;
}
if (pd.revents & POLLNVAL) {
errno = EBADF;
return -1;
}
return 0;
}
对监听一个文件描述符的封装
_st_netfd_t *st_accept(_st_netfd_t *fd, struct sockaddr *addr, int *addrlen, st_utime_t timeout)
{
int osfd, err;
_st_netfd_t *newfd;
while ((osfd = accept(fd->osfd, addr, (socklen_t *)addrlen)) < 0) {
if (errno == EINTR)
continue;
if (!_IO_NOT_READY_ERROR)
return NULL;
if (st_netfd_poll(fd, POLLIN, timeout) < 0)
return NULL;
}
newfd = _st_netfd_new(osfd, 1, 1);
if (!newfd) {
err = errno;
close(osfd);
errno = err;
}
return newfd;
}
fd被设置为了非阻塞,调用accept()函数后,若没有客户端请求连接,则立即从accept返回,若errno为EAGAIN或EWOULDBLOCK,说明没有客户端连接,然后执行st_netfd_poll()函数,在此函数内会为fd注册读事件,同时会让出CPU的执行权。当fd的读事件触发后,本协程会再次被调度从而获得CPU执行权,接着往下执行。
ssize_t st_read(_st_netfd_t *fd, void *buf, size_t nbyte, st_utime_t timeout)
{
ssize_t n;
while ((n = read(fd->osfd, buf, nbyte)) < 0) {
if (errno == EINTR)
continue;
if (!_IO_NOT_READY_ERROR)
return -1;
if (st_netfd_poll(fd, POLLIN, timeout) < 0)
return -1;
}
return n;
}
read的原理同accept,不再赘述。



