IO 模型


IO模型介绍

Stevens在文章中一共比较了五种IO Model:

1
2
3
4
5
# 1、blocking IO 阻塞IO
# 2、nonblocking IO 非阻塞IO
# 3、IO multiplexing IO多路复用
# 4、signal driven IO 信号驱动IO
# 5、asynchronous IO 异步IO

由于signal driven IO(信号驱动IO)在实际中并不常用,所以主要介绍其余四种IO Model

IO发生时涉及的对象和步骤:
对于一个network IO (这里我们以read举例),它会涉及到 两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。
当一个read操作发生时,该操作会经历两个阶段:

1
2
#1)等待数据准备 (Waiting for the data to be ready)
#2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)

阻塞 IO(blocking IO)

  1. 当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。
    kernel要等待足够的数据到来,用户进程这边,整个进程会被阻塞。
  2. 有数据到来后,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。
  3. 所以,blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)
    都被block了。

几乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。这给网络编程带来了一个很大的问题,如在调用recv(1024)的同时,线程将被阻塞,
在此期间,线程将无法执行任何运算或响应任何的网络请求。

1
2
3
# 使用多线程或多进程解决阻塞?
# 在服务器端使用多线程(或多进程)。
# 多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。

1
2
3
# 多线程或多进程的问题:
# 开启多进程多线程的方式,在遇到要同时响应成百上千路的连接请求,
# 则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态。
1
2
3
4
# 使用线程池或进程池的问题:
# 线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。
# 而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。
# 所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。

对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。
总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,
可以用非阻塞接口来尝试解决这个问题。

非阻塞 IO(non-blocking IO)

  1. 当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。
    从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。
  2. 用户进程判断结果是一个error时,它就知道数据还没有准备好,于是用户就可以在本次到下次再发起read询问的时间间隔内做其他事情,
    或者直接再次发送read操作。
  3. 一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的),
    然后返回。

所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。
但是非阻塞IO模型绝不被推荐。

1
2
3
4
5
#1. 循环调用recv()将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况
#2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。
# 这会导致整体数据吞吐量的降低。
#3. recv()更多的是起到检测“操作是否完成”的作用,实际操作系统提供了更为高效的检测“操作是否完成“作用的接口,
# 例如select()多路复用模式,可以一次检测多个连接是否活跃。

多路复用 IO(IO multiplexing)

IO multiplexing这个词可能有点陌生,但是如果我说select/epoll,大概就都能明白了。
有些地方也称这种IO方式为事件驱动IO(event driven IO)。
select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。
它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

  1. 如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。
  2. 在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。
    结论: select的优势在于可以处理多个连接,不适用于单个连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# IO 多路复用
# 代理帮忙去查看链接和接收发送消息,代理可以监听多个对象
# windows 拥有select模块
import select
import socket

sk = socket.socket()
sk.bind(('127.0.0.1',8000))
sk.setblocking(False) # 非阻塞模式
sk.listen()

read_lst = [sk] # 监听谁就放谁

# print(ret) # ([<socket.socket fd=204, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8000)>], [], [])
# print(r_lst) # [<socket.socket fd=204, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8000)>]
# print(sk) # <socket.socket fd=204, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8000)>
# r_lst 里面存着的 sk对象

while True:
# 当有链接来连,获取了socket对象并链接
# 将对象再放入列表中继续监听 [sk,conn]
r_lst, w_lst, x_lst = select.select(read_lst, [], []) # r_list和其他空列表
# print('******',r_lst)
for i in r_lst:
if i is sk:
conn,addr = i.accept()
# print(conn) # <socket.socket fd=200, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8000), raddr=('127.0.0.1', 61832)>
read_lst.append(conn)
else:
# 否则是conn
ret = i.recv(1024)
if ret == b'':
i.close()
read_lst.remove(i)
continue
print(ret)
i.send(b'bye')

# IO多路复用
# select Windows Linux 都是操作系统轮询每一个被监听的项,看是否有读才做
# poll Linux 可以监听的对象比select对象多
# 随着监听项的增多,导致效率降低
# epoll Linux 给每一个监听对象都绑定一个回调函数

# selectors 可以自动帮我们选适合操作系统的模块
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# client
import socket
import time
import threading

def func():
sk = socket.socket()
sk.connect(('127.0.0.1',8000))
sk.send(b'hello')

time.sleep(3)
print(sk.recv(1024))
sk.close()

for i in range(20):
threading.Thread(target=func).start()

异步IO (Asynchronous I/O)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
### 需要记住
# 同步
# 提交一个任务之后,要等待这个任务执行完毕

# 异步
# 只管提交任务,不等待这个任务执行完毕,就可以做其他事情

# 阻塞
# recv、recvfrom、accept

# 非阻塞
# 都可以正常执行

# 阻塞和非阻塞
# 线程遇到阻塞 : 运行 --> 阻塞 --> 就绪 --> 运行
# 线程遇到非阻塞:运行 <--> 就绪
1
2
3
4
5
6
7
8
9
### 需要理解
# 1、异步IO 没有数据 直接返回
# 2、操作系统会告诉我有新数据进来,并直接将数据交给用户

# 异步IO 没有wait data 和 copy data
# 其他模型只是对wait data 进行优化
# 异步IO 都是操作系统来完成 wait data 和 copy data,Python并不能完成,C语言可以直接实现
# Python的异步模块和框架 很多都是用底层的C语言实现 框架包括Twisted和Tornado
# 异步框架 可以相应更多的需求

IO模型比较分析

1
2
# blocking和non-blocking的区别
# 调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还准备数据的情况下会立刻返回

selectors 模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#服务端
from socket import *
import selectors

sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
conn,addr=server_fileobj.accept() # accept 建立后 注册conn
sel.register(conn,selectors.EVENT_READ,read) # 监听conn的读时间 , 回调函数 read

def read(conn,mask):
try:
data=conn.recv(1024)
if not data:
print('closing',conn)
sel.unregister(conn)
conn.close()
return
conn.send(data.upper()+b'_SB')
except Exception:
print('closing', conn)
sel.unregister(conn)
conn.close()


# 建立socket对象
sk=socket(AF_INET,SOCK_STREAM) # socket对象
sk.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
sk.bind(('127.0.0.1',8088))
sk.listen(5)
sk.setblocking(False) #设置socket的接口为非阻塞

# 选择适合的IO多路复用机制
# sel=selectors.DefaultSelector() selectors模块
sel.register(sk,selectors.EVENT_READ,accept)
# selectors.EVENT_READ 监听的是一个读事件
# 相当于网select的读列表里append了一个socket对象,并且绑定了一个回调函数 accept
# 有人请求链接socket了 就调用 accept方法 accept(server_fileobj,mask)

while True:
events=sel.select() # 检测所有的fileobj(socket|conn),是否有完成wait data
for sel_obj,mask in events:
callback=sel_obj.data # callback=accpet | read
callback(sel_obj.fileobj,mask) # accpet(server_fileobj,1) | read(conn,l)

#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))

while True:
msg=input('>>: ')
if not msg:continue
c.send(msg.encode('utf-8'))
data=c.recv(1024)
print(data.decode('utf-8'))


# selectors 模块 可以帮助我们选择适合操作系统的IO多路复用模块
# 非阻塞IO一直在询问,会耗费CPU

# 1、异步IO 没有数据 直接返回
# 2、操作系统会告诉我有新数据进来,并直接将数据交给用户