1,非阻塞io
流程图:1,rl-wr-rr-wl 可以变换成 2,rl-wr rr-wl
#include#include #include #include #include #include #include #define TTY1 "/dev/tty11" #define TTY2 "/dev/tty12" #define BUFSIZE 1024 enum { STATE_R=1, STATE_W, STATE_E, STATE_T }; struct fsm_st { int state; int sfd; int dfd; char buf[BUFSIZE]; int len; int pos; char *errstr; }; void fsm_driver(struct fsm_st *fsm) { int ret; switch(fsm->state) { case STATE_R: ret = read(fsm->sfd, fsm->buf, BUFSIZE); if(ret < 0) { if(errno == EAGAIN) fsm->state = STATE_R; else { fsm->errstr = "read()"; fsm->state = STATE_E; } } else if(ret == 0) { fsm->state = STATE_T; } else{ fsm->state = STATE_W; fsm->len = ret; fsm->pos = 0; } break; case STATE_W: ret = write(fsm->dfd, fsm->buf + fsm->pos, fsm->len); if(ret < 0) { if(errno == EAGAIN) fsm->state = STATE_W; else { fsm->errstr = "write()"; fsm->state = STATE_E; } } else { if(ret < fsm->len) { fsm->pos += ret; fsm->len -= ret; fsm->state = STATE_W; } else fsm->state = STATE_R; } break; case STATE_E: perror(fsm->errstr); fsm->state = STATE_T; break; case STATE_T: break; default: abort(); } } void relay(int fd1, int fd2) { int fd1_save, fd2_save; struct fsm_st fsm12, fsm21; fd1_save = fcntl(fd1, F_GETFL); fcntl(fd1, F_SETFL, fd1_save | O_NONBLOCK); fd2_save = fcntl(fd2, F_GETFL); fcntl(fd2, F_SETFL, fd2_save | O_NONBLOCK); fsm12.state = STATE_R; fsm12.sfd = fd1; fsm12.dfd = fd2; fsm21.state = STATE_R; fsm21.sfd = fd2; fsm21.dfd = fd1; while(fsm12.state != STATE_T || fsm21.state != STATE_T) { fsm_driver(&fsm12); fsm_driver(&fsm21); } fcntl(fd1, F_SETFL, fd1_save); fcntl(fd2, F_SETFL, fd2_save); } int main() { int fd1, fd2; fd1 = open(TTY1, O_RDWR); if(fd1 < 0) { perror("open()"); exit(1); } write(fd1, "TTY1n", 5); fd2 = open(TTY2, O_RDWR); if(fd2 < 0) { perror("open()"); exit(1); } write(fd2, "TTY2n", 5); relay(fd1, fd2); close(fd1); close(fd2); exit(0); }
注意运行时要加上sudo权限,切换tty11和tty12要用到ctrl+alt+f11/f12
读写那一块代码可以优化下
case STATE_R:
fsm->len = read(fsm->sfd, fsm->buf, BUFSIZE);
if(fsm->len < 0)
{
if(errno == EAGAIN)
fsm->state = STATE_R;
else
{
fsm->errstr = "read()";
fsm->state = STATE_E;
}
}
else if(fsm->len == 0)
{
fsm->state = STATE_T;
}
else
{
fsm->state = STATE_W;
fsm->pos = 0;
}
break;
case STATE_W:
ret = write(fsm->dfd, fsm->buf + fsm->pos, fsm->len);
if(ret < 0)
{
if(errno == EAGAIN)
fsm->state = STATE_W;
else
{
fsm->errstr = "write()";
fsm->state = STATE_E;
}
}
else
{
fsm->pos += ret;
fsm->len -= ret;
if(fsm->len != 0)
{
fsm->state = STATE_W;
}
else
fsm->state = STATE_R;
}
break;
上述程序由于一直在忙读,读没有数据返回errno(EAGAIN),所以程序所占用cpu核很多。
2 采用select多路复用
以事件为单位,监听文件描述符。
select返回时,监视现场不再是原来的了,会被清空。因为监视现场和监视结果放在同一个空间
#include#include #include #include #include #include #include #include #define TTY1 "/dev/tty2" #define TTY2 "/dev/tty3" #define BUFSIZE 1024 enum { STATE_R=1, STATE_W, STATE_AUTO, //由于select 只能监听读写变化, STATE_E, STATE_T }; struct fsm_st { int state; int sfd; int dfd; char buf[BUFSIZE]; int len; int pos; char *errstr; }; void fsm_driver(struct fsm_st *fsm) { int ret; switch(fsm->state) { case STATE_R: ret = read(fsm->sfd, fsm->buf, BUFSIZE); if(ret < 0) { if(errno == EAGAIN) fsm->state = STATE_R; else { fsm->errstr = "read()"; fsm->state = STATE_E; } } else if(ret == 0) { fsm->state = STATE_T; } else{ fsm->state = STATE_W; fsm->len = ret; fsm->pos = 0; } break; case STATE_W: ret = write(fsm->dfd, fsm->buf + fsm->pos, fsm->len); if(ret < 0) { if(errno == EAGAIN) fsm->state = STATE_W; else { fsm->errstr = "write()"; fsm->state = STATE_E; } } else { if(ret < fsm->len) { fsm->pos += ret; fsm->len -= ret; fsm->state = STATE_W; } else fsm->state = STATE_R; } break; case STATE_E: perror(fsm->errstr); fsm->state = STATE_T; break; case STATE_T: break; default: abort(); } } static int max(int a, int b) { if(a > b) return a; return b; } void relay(int fd1, int fd2) { int fd1_save, fd2_save; struct fsm_st fsm12, fsm21; fd_set rdset, wrset; fd1_save = fcntl(fd1, F_GETFL); fcntl(fd1, F_SETFL, fd1_save | O_NONBLOCK); fd2_save = fcntl(fd2, F_GETFL); fcntl(fd2, F_SETFL, fd2_save | O_NONBLOCK); fsm12.state = STATE_R; fsm12.sfd = fd1; fsm12.dfd = fd2; fsm21.state = STATE_R; fsm21.sfd = fd2; fsm21.dfd = fd1; while(fsm12.state != STATE_T || fsm21.state != STATE_T) { FD_ZERO(&rdset); FD_ZERO(&wrset); if(fsm12.state == STATE_R) FD_SET(fsm12.sfd, &rdset); // 有事情才去监听,不是加上所有的 if(fsm12.state == STATE_W) FD_SET(fsm12.dfd, &wrset); if(fsm21.state == STATE_R) FD_SET(fsm21.sfd, &rdset); if(fsm21.state == STATE_W) FD_SET(fsm21.dfd, &wrset); if(fsm12.state < STATE_AUTO || fsm21.state < STATE_AUTO) { if (select(max(fd1, fd2) +1, &rdset, &wrset, NULL, NULL) < 0) // 最大文件描述符+1 { if(errno == EINTR) continue; perror("select"); exit(1); } } if(FD_ISSET(fsm12.sfd, &rdset)||FD_ISSET(fsm12.dfd, &wrset)||fsm12.state > STATE_AUTO) fsm_driver(&fsm12); if(FD_ISSET(fsm21.sfd, &rdset)||FD_ISSET(fsm21.dfd, &wrset)||fsm21.state > STATE_AUTO) fsm_driver(&fsm21); } fcntl(fd1, F_SETFL, fd1_save); fcntl(fd2, F_SETFL, fd2_save); } int main() { int fd1, fd2; fd1 = open(TTY1, O_RDWR); if(fd1 < 0) { perror("open()"); exit(1); } write(fd1, "TTY1n", 5); fd2 = open(TTY2, O_RDWR); if(fd2 < 0) { perror("open()"); exit(1); } write(fd2, "TTY2n", 5); relay(fd1, fd2); close(fd1); close(fd2); exit(0); }
3,poll多路复用
以文件描述符为单位组织事件
poll里面的文件描述符数组,用来组织事件
事件是一个short长度的位图,所以要按位&发现事件,|来置位事件
#include#include #include #include #include #include #include #include #include #define TTY1 "/dev/tty2" #define TTY2 "/dev/tty3" #define BUFSIZE 1024 enum { STATE_R=1, STATE_W, STATE_AUTO, //由于select 只能监听读写变化, STATE_E, STATE_T }; struct fsm_st { int state; int sfd; int dfd; char buf[BUFSIZE]; int len; int pos; char *errstr; }; void fsm_driver(struct fsm_st *fsm) { int ret; switch(fsm->state) { case STATE_R: ret = read(fsm->sfd, fsm->buf, BUFSIZE); if(ret < 0) { if(errno == EAGAIN) fsm->state = STATE_R; else { fsm->errstr = "read()"; fsm->state = STATE_E; } } else if(ret == 0) { fsm->state = STATE_T; } else{ fsm->state = STATE_W; fsm->len = ret; fsm->pos = 0; } break; case STATE_W: ret = write(fsm->dfd, fsm->buf + fsm->pos, fsm->len); if(ret < 0) { if(errno == EAGAIN) fsm->state = STATE_W; else { fsm->errstr = "write()"; fsm->state = STATE_E; } } else { if(ret < fsm->len) { fsm->pos += ret; fsm->len -= ret; fsm->state = STATE_W; } else fsm->state = STATE_R; } break; case STATE_E: perror(fsm->errstr); fsm->state = STATE_T; break; case STATE_T: break; default: abort(); } } static int max(int a, int b) { if(a > b) return a; return b; } void relay(int fd1, int fd2) { int fd1_save, fd2_save; struct fsm_st fsm12, fsm21; struct pollfd pfd[2]; fd1_save = fcntl(fd1, F_GETFL); fcntl(fd1, F_SETFL, fd1_save | O_NONBLOCK); fd2_save = fcntl(fd2, F_GETFL); fcntl(fd2, F_SETFL, fd2_save | O_NONBLOCK); fsm12.state = STATE_R; fsm12.sfd = fd1; fsm12.dfd = fd2; fsm21.state = STATE_R; fsm21.sfd = fd2; fsm21.dfd = fd1; pfd[0].fd = fd1; pfd[1].fd = fd2; while(fsm12.state != STATE_T || fsm21.state != STATE_T) { pfd[0].events = 0; pfd[1].events = 0; if(fsm12.state == STATE_R) pfd[0].events |= POLLIN; if(fsm12.state == STATE_W) pfd[1].events |= POLLOUT; if(fsm21.state == STATE_R) pfd[1].events |= POLLIN; if(fsm21.state == STATE_W) pfd[0].events |= POLLOUT; if(fsm12.state < STATE_AUTO || fsm21.state < STATE_AUTO) { while(poll(pfd, 2, -1) < 0) // 最大文件描述符+1 { if(errno == EINTR) continue; perror("select"); exit(1); } } if( pfd[0].revents & POLLIN || pfd[1].revents & POLLOUT||fsm12.state > STATE_AUTO) fsm_driver(&fsm12); if( pfd[1].revents & POLLIN || pfd[0].revents & POLLOUT||fsm21.state > STATE_AUTO) fsm_driver(&fsm21); } fcntl(fd1, F_SETFL, fd1_save); fcntl(fd2, F_SETFL, fd2_save); } int main() { int fd1, fd2; fd1 = open(TTY1, O_RDWR); if(fd1 < 0) { perror("open()"); exit(1); } write(fd1, "TTY1n", 5); fd2 = open(TTY2, O_RDWR); if(fd2 < 0) { perror("open()"); exit(1); } write(fd2, "TTY2n", 5); relay(fd1, fd2); close(fd1); close(fd2); exit(0); }
4,epoll多路复用
以文件描述符为单位,监听事件
epoll把poll文件描述符数组封装到了kernel上,并提供epoll方法
epoll_create(int size) size不是数组(poll文件描述符数组)大小,正数就可以了。返回的是epoll文件描述符。
#include#include #include #include #include #include #include #include #include #define TTY1 "/dev/tty12" #define TTY2 "/dev/tty13" #define BUFSIZE 1024 enum { STATE_R=1, STATE_W, STATE_AUTO, //由于select 只能监听读写变化, STATE_E, STATE_T }; struct fsm_st { int state; int sfd; int dfd; char buf[BUFSIZE]; int len; int pos; char *errstr; }; void fsm_driver(struct fsm_st *fsm) { int ret; switch(fsm->state) { case STATE_R: ret = read(fsm->sfd, fsm->buf, BUFSIZE); if(ret < 0) { if(errno == EAGAIN) fsm->state = STATE_R; else { fsm->errstr = "read()"; fsm->state = STATE_E; } } else if(ret == 0) { fsm->state = STATE_T; } else{ fsm->state = STATE_W; fsm->len = ret; fsm->pos = 0; } break; case STATE_W: ret = write(fsm->dfd, fsm->buf + fsm->pos, fsm->len); if(ret < 0) { if(errno == EAGAIN) fsm->state = STATE_W; else { fsm->errstr = "write()"; fsm->state = STATE_E; } } else { if(ret < fsm->len) { fsm->pos += ret; fsm->len -= ret; fsm->state = STATE_W; } else fsm->state = STATE_R; } break; case STATE_E: perror(fsm->errstr); fsm->state = STATE_T; break; case STATE_T: break; default: abort(); } } static int max(int a, int b) { if(a > b) return a; return b; } void relay(int fd1, int fd2) { int fd1_save, fd2_save; struct fsm_st fsm12, fsm21; int epfd; struct epoll_event ev; fd1_save = fcntl(fd1, F_GETFL); fcntl(fd1, F_SETFL, fd1_save | O_NONBLOCK); fd2_save = fcntl(fd2, F_GETFL); fcntl(fd2, F_SETFL, fd2_save | O_NONBLOCK); fsm12.state = STATE_R; fsm12.sfd = fd1; fsm12.dfd = fd2; fsm21.state = STATE_R; fsm21.sfd = fd2; fsm21.dfd = fd1; epfd = epoll_create(10); if(epfd < 0) { perror("epoll_create"); exit(1); } ev.events = 0; ev.data.fd = fd1; epoll_ctl(epfd, EPOLL_CTL_ADD, fd1, &ev); ev.events = 0; ev.data.fd = fd2; epoll_ctl(epfd, EPOLL_CTL_ADD, fd2, &ev); while(fsm12.state != STATE_T || fsm21.state != STATE_T) { ev.data.fd = fd1; ev.events = 0; if(fsm12.state == STATE_R) ev.events |= EPOLLIN; if(fsm21.state == STATE_W) ev.events |= EPOLLOUT; epoll_ctl(epfd, EPOLL_CTL_MOD, fd1, &ev); ev.data.fd = fd2; ev.events = 0; if(fsm21.state == STATE_R) ev.events |= EPOLLIN; if(fsm12.state == STATE_W) ev.events |= EPOLLOUT; epoll_ctl(epfd, EPOLL_CTL_MOD, fd2, &ev); if(fsm12.state < STATE_AUTO || fsm21.state < STATE_AUTO) { while(epoll_wait(epfd, &ev, 2, -1) < 0) // 最大文件描述符+1 { if(errno == EINTR) continue; perror("select"); exit(1); } } if( (ev.data.fd == fd1 && ev.events == EPOLLIN) || (ev.data.fd == fd2 && ev.events == EPOLLOUT)||fsm12.state > STATE_AUTO) fsm_driver(&fsm12); if( (ev.data.fd == fd2 && ev.events == EPOLLIN) || (ev.data.fd == fd1 && ev.events == EPOLLOUT)||fsm21.state > STATE_AUTO) fsm_driver(&fsm21); } fcntl(fd1, F_SETFL, fd1_save); fcntl(fd2, F_SETFL, fd2_save); close(epfd); } int main() { int fd1, fd2; fd1 = open(TTY1, O_RDWR); if(fd1 < 0) { perror("open()"); exit(1); } write(fd1, "TTY1n", 5); fd2 = open(TTY2, O_RDWR); if(fd2 < 0) { perror("open()"); exit(1); } write(fd2, "TTY2n", 5); relay(fd1, fd2); close(fd1); close(fd2); exit(0); }
6,中继引擎
忙等状态,因为一直在轮训,轮序查看每个读写状态机,每个状态机都推动一次。
主程序要pause()停止,否则程序就停止工作了。开启的线程不断在工作,跟前面的用多线程实现mytbf一样,线程工作。
#include#include #include #include #include #include #include #include #include #include "relay.h" #define BUFSIZE 1024 enum { STATE_R=1, STATE_W, STATE_E, STATE_T }; enum { STATE_RUNNING=1, STATE_CANCELED, STATE_OVER }; struct fsm_st { int state; int sfd; int dfd; char buf[BUFSIZE]; int len; int pos; char *errstr; }; struct rel_job_st { int fd1, fd2; int job_state; struct fsm_st fsm12, fsm21; int fd1_save, fd2_save; }; static struct rel_job_st *rel_job[REL_JOBMAX]; static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; static pthread_once_t once_init=PTHREAD_ONCE_INIT; void fsm_driver(struct fsm_st *fsm) { int ret; switch(fsm->state) { case STATE_R: ret = read(fsm->sfd, fsm->buf, BUFSIZE); if(ret < 0) { if(errno == EAGAIN) fsm->state = STATE_R; else { fsm->errstr = "read()"; fsm->state = STATE_E; } } else if(ret == 0) { fsm->state = STATE_T; } else{ fsm->state = STATE_W; fsm->len = ret; fsm->pos = 0; } break; case STATE_W: ret = write(fsm->dfd, fsm->buf + fsm->pos, fsm->len); if(ret < 0) { if(errno == EAGAIN) fsm->state = STATE_W; else { fsm->errstr = "write()"; fsm->state = STATE_E; } } else { if(ret < fsm->len) { fsm->pos += ret; fsm->len -= ret; fsm->state = STATE_W; } else fsm->state = STATE_R; } break; case STATE_E: perror(fsm->errstr); fsm->state = STATE_T; break; case STATE_T: break; default: abort(); } } static int get_pos_unlock() { int i; for(i=0; i job_state == STATE_RUNNING) { fsm_driver(&rel_job[i]->fsm12); fsm_driver(&rel_job[i]->fsm21); if(rel_job[i]->fsm12.state == STATE_T && rel_job[i]->fsm21.state == STATE_T) { rel_job[i]->job_state = STATE_OVER; } } } } pthread_mutex_unlock(&mtx); } } static void module_load(void) { printf("%s:%dn", __FUNCTION__, __LINE__); int err; pthread_t tid_relayer; err = pthread_create(&tid_relayer, NULL, thr_worker, NULL); if(err) { fprintf(stderr, "pthread_create():%sn", strerror(err)); exit(1); } } int rel_addjob(int fd1, int fd2) { struct rel_job_st *me; int pos; pthread_once(&once_init, module_load); printf("%s:%dn", __FUNCTION__, __LINE__); me = malloc(sizeof(*me)); if(me == NULL) return -ENOMEM; me->fd1 = fd1; me->fd2 = fd2; me->job_state = STATE_RUNNING; me->fd1_save = fcntl(me->fd1, F_GETFL); fcntl(me->fd1, F_SETFL, me->fd1_save|O_NONBLOCK); me->fd2_save = fcntl(me->fd2, F_GETFL); fcntl(me->fd2, F_SETFL, me->fd2_save|O_NONBLOCK); me->fsm12.sfd = me->fd1; me->fsm12.dfd = me->fd2; me->fsm12.state = STATE_R; me->fsm21.sfd = me->fd2; me->fsm21.dfd = me->fd1; me->fsm21.state = STATE_R; pthread_mutex_lock(&mtx); pos = get_pos_unlock(); if(pos < 0) { pthread_mutex_unlock(&mtx); fcntl(me->fd1, F_SETFL, me->fd1_save); fcntl(me->fd2, F_SETFL, me->fd2_save); free(me); return -ENOSPC; } rel_job[pos] = me; pthread_mutex_unlock(&mtx); return pos; }
main函数
#include#include #include #include #include #include #include #include #include "relay.h" #define TTY1 "/dev/tty9" #define TTY2 "/dev/tty8" #define TTY3 "/dev/tty10" #define TTY4 "/dev/tty11" int main() { int fd1, fd2, fd3, fd4; int job1, job2; fd1 = open(TTY1, O_RDWR); if(fd1 < 0) { perror("open()"); exit(1); } write(fd1, "TTY1n", 5); fd2 = open(TTY2, O_RDWR); if(fd2 < 0) { perror("open()"); exit(1); } write(fd2, "TTY2n", 5); job1 = rel_addjob(fd1, fd2); if(job1 <0) { fprintf(stderr, "rel_addjob():%sn", strerror(-job1)); exit(1); } //job2 fd3 = open(TTY3, O_RDWR); if(fd3 < 0) { perror("open()"); exit(1); } write(fd3, "TTY3n", 5); fd4 = open(TTY4, O_RDWR); if(fd4 < 0) { perror("open()"); exit(1); } write(fd4, "TTY4n", 5); job2 = rel_addjob(fd3, fd4); if(job2 <0) { fprintf(stderr, "rel_addjob():%sn", strerror(-job2)); exit(1); } while (1) pause(); close(fd1); close(fd2); close(fd3); close(fd4); exit(0); }



