一:引入
Unix/Linux上有五种IO模型:阻塞(blocking)、非阻塞(non-blocking)、IO复用(IO multiplexing)、信号驱动(signal-driven)、异步(asynchronous)
当IO复用搭配阻塞IO可能会出现什么问题呢?应该如何尽量避免呢?
二:测试工具
chargen:服务端accept连接之后,不停地发送测试数据。
linux的nc及python程序netcat_nonblock.py,netcat.py
三:测试实验
(1)开启服务端chargen,用不同的nc连接观察异同
(2)使用linux自带nc连接。
1)输入缓冲区无输入时
2)有输入时
(3)netcat.py连接chargen时
1)输入缓冲区无输入时
2)有输入时
使用strace追踪代码
strace python netcat.py localhost 5001 /dev/null
发现程序阻塞在发送上
sendto(3, " "..., 8192, 0, NULL, 0
使用netstate -tpn观察tcp读写缓冲区接收与发送数据大小
netstate -tpn | grep 5001
(4)netcat_nonblock.py连接chargen时
1)输入缓冲区无输入时
2)有输入时
四:实验分析
(1)netcat.py代码如下:
#!/usr/bin/python
import os
import select
import socket
import sys
#发送方与接收方数据传输不是独立的,阻塞io可能导致阻塞
#io复用,检查那个描述符准备好
def relay(sock):
poll = select.poll()
#注册两个读事件,读sock和stdin
poll.register(sock, select.POLLIN)
poll.register(sys.stdin, select.POLLIN)
done = False
while not done:
events = poll.poll(10000) # 10 seconds
for fileno, event in events:
if event & select.POLLIN:
if fileno == sock.fileno():
data = sock.recv(8192)
if data:
sys.stdout.write(data)
else:
done = True
else:
assert fileno == sys.stdin.fileno()
data = os.read(fileno, 8192)
if data:
sock.sendall(data)
else:
sock.shutdown(socket.SHUT_WR) #将socket数据读完再关闭连接,防止tcp发生rst分解,使数据接收不完整
poll.unregister(sys.stdin)
def main(argv):
if len(argv) < 3:
binary = argv[0]
print "Usage:n %s -l portn %s host port" % (argv[0], argv[0])
return
port = int(argv[2])
if argv[1] == "-l":
# server
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('', port))
server_socket.listen(5)
(client_socket, client_address) = server_socket.accept()
server_socket.close()
relay(client_socket)
else:
# client
sock = socket.create_connection((argv[1], port))
relay(sock)
if __name__ == "__main__":
main(sys.argv)
结合实验(3)第(2)个实验看到,阻塞io下当从输入缓冲区输入数据时,tcp缓冲区chargen及python都在写数据,进而导致两者都不能读数据而永久阻塞在写数据上。故说明发送方与接收方数据传输不是独立的,阻塞io可能会阻塞。
注意点:
sock.shutdown(socket.SHUT_WR)
当发送完数据之后再进行关闭写端,否则会导致数据接收不完整。
netcat_nonblock.py
#!/usr/bin/python
import errno
import fcntl
import os
import select
import socket
import sys
def setNonBlocking(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
def nonBlockingWrite(fd, data):
try:
nw = os.write(fd, data)
return nw
except OSError as e:
if e.errno == errno.EWOULDBLOCK:
return -1
def relay(sock):
socketEvents = select.POLLIN
poll = select.poll()
poll.register(sock, socketEvents)
poll.register(sys.stdin, select.POLLIN)
setNonBlocking(sock) //sock用非阻塞 //stdout用阻塞
# setNonBlocking(sys.stdin)
# setNonBlocking(sys.stdout)
done = False
socketOutputBuffer = ''
while not done:
events = poll.poll(10000) # 10 seconds
for fileno, event in events:
if event & select.POLLIN:
if fileno == sock.fileno():
data = sock.recv(8192)
if data:
nw = sys.stdout.write(data) # stdout does support non-blocking write, though
else:
done = True
else:
assert fileno == sys.stdin.fileno()
data = os.read(fileno, 8192)
if data:
assert len(socketOutputBuffer) == 0
nw = nonBlockingWrite(sock.fileno(), data)
if nw < len(data): //发生shortWrite
if nw < 0:
nw = 0
socketOutputBuffer = data[nw:] //用socketOutputBuffer保存剩余数据
socketEvents |= select.POLLOUT
poll.register(sock, socketEvents) //sock登记写事件 前三行通用
poll.unregister(sys.stdin) //因netcat与stdin故采用此行,不通用
else://数据读完
sock.shutdown(socket.SHUT_WR)
poll.unregister(sys.stdin)
if event & select.POLLOUT:
if fileno == sock.fileno():
assert len(socketOutputBuffer) > 0
nw = nonBlockingWrite(sock.fileno(), socketOutputBuffer)
if nw < len(socketOutputBuffer): //也可能发生shortWrite
assert nw > 0
socketOutputBuffer = socketOutputBuffer[nw:] //保留未发送数据
else:
socketOutputBuffer = ''
socketEvents &= ~select.POLLOUT //停止观察pollOut事件,否则缓冲区无数据写但sock可写引起电平触发使cpu占满
poll.register(sock, socketEvents) //与前面对应
poll.register(sys.stdin, select.POLLIN)
def main(argv):
if len(argv) < 3:
binary = argv[0]
print "Usage:n %s -l portn %s host port" % (argv[0], argv[0])
print (sys.stdout.write)
return
port = int(argv[2])
if argv[1] == "-l":
0 # server
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('', port))
server_socket.listen(5)
(client_socket, client_address) = server_socket.accept()
server_socket.close()
relay(client_socket)
else:
# client
sock = socket.create_connection((argv[1], port))
relay(sock)
if __name__ == "__main__":
main(sys.argv)
根据实验(4)可知使用非阻塞IO可解决阻塞问题。
关于写事件的通用形式:
if nw < len(data): //发生shortWrite
if nw < 0:
nw = 0
socketOutputBuffer = data[nw:] //用socketOutputBuffer保存剩余数据
socketEvents |= select.POLLOUT
poll.register(sock, socketEvents)
当发生shortWrite时,应将剩余数据保存在outBuffer中,然后注册POLLOUT事件,一旦socket变得可写就立刻发送数据。下次写时如果还有剩余,应该继续关注POLLOUT事件。
socketOutputBuffer = '' socketEvents &= ~select.POLLOUT poll.register(sock, socketEvents)
数据写完后停止观察pollOut事件,否则缓冲区无数据写但sock可写引起电平触发使cpu占满
四:实验结论
IO multiplexing和blocking IO用在一起可能会阻塞,在Linux多线程服务端编程中因为blocking IO中read()/write()/accept()/connect()都有可能阻塞当前线程,这样线程就没办法处理其他socket上的IO事件了,故一般不将IO multiplexing和blocking IO用在一起。
五:参考
1《Linux多线程服务端编程使用muduo C++网络库》 --陈硕
2陈硕老师的程序chargen,netcat.py 及netcat_nonblock.py源于https://github.com/chenshuo/recipes.git



