进程


进程

理论基础

一、操作系统的作用:

  1. 隐藏丑陋复杂的硬件接口,提供良好的抽象接口
  2. 管理、调度进程,并且将多个进程对硬件的竞争变得有序

二、多道技术:

  1. 产生背景:针对单核,实现并发
    ps:
    现在的主机一般是多核,那么每个核都会利用多道技术
    有4个cpu,运行于cpu1的某个程序遇到io阻塞,会等到io结束再重新调度,会被调度到4个
    cpu中的任意一个,具体由操作系统调度算法决定。

  2. 空间上的复用:如内存中同时有多道程序

  3. 时间上的复用:复用一个cpu的时间片
    强调:遇到io切,占用cpu时间过长也切,核心在于切之前将进程的状态保存下来,这样才能保证下次切换回来时,能基于上次切走的位置继续运行

三、进程:

  • 进程即正在执行的一个过程。进程是对正在运行程序的一个抽象。

  • 进程的概念起源于操作系统,是操作系统最核心的概念,也是操作系统提供的最古老也是最重要的抽象概念之一。操作系统的其他所有内容都是围绕进程的概念展开的。
    PS:即使可以利用的cpu只有一个(早期的计算机确实如此),也能保证支持(伪)并发的能力。将一个单独的cpu变成多个虚拟的cpu(多道技术:时间多路复用和空间多路复用+硬件上支持隔离),没有进程的抽象,现代计算机将不复存在。

  • 内存越大,多个程序占用的空间越大;CPU核数越多,同一时间处理的任务越多。

什么是进程

进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。

狭义定义:进程是正在运行的程序的实例(an instance of a computer program that is being executed)。
广义定义:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元,也是基本的执行单元。

  1. 进程是一个实体。每一个进程都有它自己的地址空间,一般情况下,包括文本区域(text region)、数据区域(data region)和堆栈(stack region)。文本区域存储处理器执行的代码;数据区域存储变量和进程执行期间使用的动态分配的内存;堆栈区域存储着活动过程调用的指令和本地变量。
  2. 进程是一个“执行中的程序”。程序是一个没有生命的实体,只有处理器赋予程序生命时(操作系统执行之),它才能成为一个活动的实体,我们称其为进程。[3]
    进程是操作系统中最基本、重要的概念。是多道程序系统出现后,为了刻画系统内部出现的动态情况,描述系统内部各道程序的活动规律引进的一个概念,所有多道程序设计操作系统都建立在进程的基础上。

进程的并行与并发

  1. 并行 : 并行是指两者同时执行,比如赛跑,两个人都在不停的往前跑;(资源够用,比如三个线程,四核的CPU )

  2. 并发 : 并发是指资源有限的情况下,两者交替轮流使用资源,比如一段路(单核CPU资源)同时只能过一个人,A走一段后,让给B,B用完继续给A ,交替使用,目的是提高效率。

  3. 区别:
    并行是从微观上,也就是在一个精确的时间片刻,有不同的程序在执行,这就要求必须有多个处理器。
    并发是从宏观上,在一个时间段上可以看出是同时执行的,比如一个服务器同时处理多个session。

进程的三状态

在程序运行的过程中,由于被操作系统的调度算法控制,程序会进入几个状态:就绪,运行和阻塞。
  (1)就绪(Ready)状态
  当进程已分配到除CPU以外的所有必要的资源,只要获得处理机便可立即执行,这时的进程状态称为就绪状态。

  (2)执行/运行(Running)状态当进程已获得处理机,其程序正在处理机上执行,此时的进程状态称为执行状态。

  (3)阻塞(Blocked)状态正在执行的进程,由于等待某个事件发生而无法执行时,便放弃处理机而处于阻塞状态。引起进程阻塞的事件可有多种,例如,等待I/O完成、申请缓冲区不能满足、等待信件(信号)等。

同步与异步

  1. 同步: 串行处理一件事物
  2. 异步: 同时处理多件不同的事物
  • 所谓同步就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态可以保持一致。
  • 所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列。

阻塞与非阻塞

  1. 阻塞: input 读写文件 产生IO操作
  2. 非阻塞: 不产生IO操作
    阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来说的

同步/异步与阻塞/非阻塞

  1. 同步阻塞形式 效率低 专心排队,什么别的事都不做
  2. 异步阻塞形式 领一张排队号码,不用排队等着叫号,但是等的过程中不能做其他事情。
    异步操作是可以被阻塞住的,只不过它不是在处理消息时阻塞,而是在等待消息通知时被阻塞。
  3. 同步非阻塞形式 一边排队,一边打电话,来回切换两种不同的行为,效率低
  4. 异步非阻塞形式 效率高,等着柜台(消息触发机制)通知,去外面抽烟,做自己的事

很多人会把同步和阻塞混淆,是因为很多时候同步操作会以阻塞的形式表现出来,同样的,很多人也会把异步和非阻塞混淆,因为异步操作一般都不会在真正的IO操作处被阻塞。

进程的创建与结束

但凡是硬件,都需要有操作系统去管理,只要有操作系统,就有进程的概念,就需要有创建进程的方式,一些操作系统只为一个应用程序设计,比如微波炉中的控制器,一旦启动微波炉,所有的进程都已经存在。而对于通用系统(跑很多应用程序),需要有系统运行过程中创建或撤销进程的能力,主要分为4中形式创建新的进程:

  1. 系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关,运行在后台并且只在需要时才唤醒的进程,称为守护进程,如电子邮件、web页面、新闻、打印)
  2. 一个进程在运行过程中开启了子进程(如nginx开启多进程,os.fork,subprocess.Popen等)
  3. 用户的交互式请求,而创建一个新进程(如用户双击暴风影音)
  4. 一个批处理作业的初始化(只在大型机的批处理系统中应用)
    无论哪一种,新进程的创建都是由一个已经存在的进程执行了一个用于创建进程的系统调用而创建的。  

进程的结束:

  1. 正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)
  2. 出错退出(自愿,python a.py中a.py不存在)
  3. 严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,可以捕捉异常,try…except…)
  4. 被其他进程杀死(非自愿,如kill -9)

在python程序中的进程操作

创建进程共四个部分:创建进程部分,进程同步部分,进程池部分,进程之间数据共享。

multiprocess.process模块

process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

强调:

  1. 需要使用关键字的方式来指定参数
  2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

参数介绍:
1 group参数未使用,值始终为None
2 target表示调用对象,即子进程要执行的任务
3 args表示调用对象的位置参数元组,args=(1,2,’egon’,)
4 kwargs表示调用对象的字典,kwargs={‘name’:’egon’,’age’:18}
5 name为子进程的名称

在windows中使用process模块的注意事项:必须把创建子进程的部分使用if __name__ ==‘__main__’判断保护起来

使用process模块创建进程

1
2
3
4
5
6
# Process([group [, target [, name [, args [, kwargs]]]]])
# target表示调用对象,你可以传入方法的名字
# args表示被调用对象的位置参数元组,比如target是函数a,他有两个参数m,n,那么args就传入(m, n)即可
# kwargs表示调用对象的字典
# name是别名,相当于给这个进程取一个名字
# group分组,实际上不使用
1
2
3
4
5
6
7
8
9
10
11
12
import time
from multiprocessing import Process

def f(name):
print('hello', name)
print('我是子进程')

if __name__ == '__main__':
p = Process(target=f, args=('leo',))
p.start()
time.sleep(1)
print('执行主进程的内容了')

查看进程的执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import time
import os
from multiprocessing import Process
def func(args,args2):
print(args,args2)
# print(54321)
time.sleep(1)
print('子进程:', os.getpid())
print('子进程的父进程:', os.getppid()) # 查看当前进程的父进程号
print(12345)

if __name__ == '__main__':
# 主进程(父进程)执行的
p = Process(target=func,args=('参数','参数2')) # 注册 args传参必须传元祖,里面放着参数
# p 是进程对象,此时还没有启动进程
p.start() # 开启子进程
print('*'*10)
print('父进程:',os.getpid()) # 查看当前进程的进程号
print('父进程的父进程:', os.getppid()) # 查看当前进程的父进程号 在pycharm中启动就是pycharm 可以通过任务管理器查看到


# 如果是同步的函数执行,会先执行完func函数在执行打印
# 异步虽然要先启动子进程,但是不影响后面的代码

# 进程的生命周期:
# 主进程 : 从程序开启开始,直到程序执行完成
# 子进程 : 从start开始,到子进程中的代码执行完成
# 开启了子进程的主进程:
# 主进程自己的代码如果长,等待自己的代码执行结束
# 子进程的执行时间长,主进程会在主进程代码执行完毕后,等待子进程执行完毕,主进程才结束

# python chrom.py & 终端没有关闭 进程都在后台运行
# 父进程在 自己进程就在 不一定,要看怎么启动的
# 子进程不一定要依赖父进程运行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from multiprocessing import Process
import time
import os

def func(args,args2):
print('大家好,我是子进程')
print(args,args2)
time.sleep(3)
print('子进程执行完成')
print('子进程:',os.getpid())
print('子进程的父进程: ',os.getppid())

if __name__ == '__main__':
p = Process(target=func,args=(10,20)) # 注册
p.start() # 启动紫禁城
print('当前进程:',os.getpid())
print('当前进程的父进程: ',os.getppid())
print('*'*10)

join方法

p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# join()
import time
from multiprocessing import Process

def func(arg1,arg2):
print('*'*arg1)
time.sleep(5)
print('*'*arg2)

if __name__ == '__main__':
p = Process(target=func, args=(10, 20))
p.start()
print('这个时候还是异步的')

p.join() # 作用:感知一个子进程的结束 ,异步的程序就变成同步了
print('主进程运行完成')


# join 会将异步的程序变成同步

开启多个子进程,并写入文件

通过join方法,让写文件的操作编程异步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# 多进程写文件
# for 循环500个文件,没处理1个文件需要0.1秒,500个是50秒
# 同步,只有一个进程处理: 0.1 * 500 = 50
# 异步,存在500个进程: 500 * 0.1 = 0.1
# 1. 先往文件夹中写文件
# 2. 展示写入后文件件的所有文件名
# join 会将异步的程序变成同步

import time
import os
from multiprocessing import Process

def func(filename,context):
print('子进程%s开始' %(os.getpid()))
with open(filename,'w') as f:
f.write(str(context))

# 单过在循环外只有1个join无法确定所有子进程都结束,所以需要控制

if __name__ == '__main__':
p_lst = []
for i in range(1,6):
p = Process(target=func,args=('inod%s'%i,i))
p_lst.append(p) # 每创建出来一个进程都加入进程列表
p.start()
# p.join() # 如果在循环里面join,则每循环一个进程都要等待进程的结束,会变成同步
# 如果不用join 开启进程无法保障运行的时间,所以后面的代码一起异步执行

[p.join() for p in p_lst] # 之前的所有进程必须在这里都执行完,才能执行后面的代码
# 列表推导式,先启动所有的进程,按顺序执行,在最后之前保障所有的进程对象执行完成

print('主进程%s执行完成' %(os.getpid()))
print([i for i in os.walk(r'D:\PycharmProjects\Notes\08 并发编程')])

# 场景:
# 同时开启多个子进程,异步执行,当我需要同步执行的时候,设置一个阻拦的手段,让所有的进程在这话话都执行完成

# 结果:
# 子进程8332开始
# 子进程4936开始
# 子进程2728开始
# 子进程9832开始
# 子进程4428开始
# 主进程9340执行完成
# [('inod1', 'inod2', 'inod3', 'inod4', 'inod5'])]

开启多进程方法2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 自定义类继承Process,实现多进程
import os
from multiprocessing import Process

class MyProcess(Process): # 自定义类继承Process
def __init__(self,arg1,arg2): # 传参需要继承父类的__init__
super().__init__() # 继承父类的__init__
self.arg1 = arg1
self.arg2 = arg2

def run(self): # 实现run方法
print(self.pid)
print(self.name)
print(self.arg1)
print(self.arg2)
print('开始一个新进程%s' %os.getpid())

if __name__ == '__main__':

print('主进程:', os.getpid())

p1 = MyProcess(1,2)
p1.start() # start 调用 run方法
p2 = MyProcess(3,4)
p2.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import os
from multiprocessing import Process

class MyProcess(Process): # 自定义类继承Process
def __init__(self,arg1,arg2): # 传参需要继承父类的__init__
super().__init__() # 继承父类的__init__
self.arg1 = arg1
self.arg2 = arg2

def run(self): # 实现run方法
print(self.pid)
print(self.name)
print(self.arg1)
print(self.arg2)
print('开始一个新进程%s' %os.getpid())

if __name__ == '__main__':

print('主进程:', os.getpid())

p1 = MyProcess(1,2)
p1.start() # start 调用 run方法
p2 = MyProcess(3,4)
p2.start()

多进程之间的数据隔离

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 进程 与 进程之间数据是隔离的?  是隔离的
# 微信 与 QQ之间的数据能共享么?
import os
from multiprocessing import Process

def func():
global n # 全局变量n
n = 0 # 重新定义n
print('子进程 : %s' %os.getpid(),n)

if __name__ == '__main__':
print('主进程',os.getpid())
n = 100
p = Process(target=func)
p.start()
p.join() # 等待子进程 结束完
print('主进程',n) # 主进程打印的n 和 子进程不一样 ,多进程之间数据有隔离

# 运行结果
# 主进程pid: 5748
# 子进程pid: 6048 0
# 主进程 100

守护进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 特点: 会随着主进程的结束而结束。

# 主进程创建守护进程

  # 其一:守护进程会在主进程代码执行结束后就终止

  # 其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

# 注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
import time
from multiprocessing import Process

def func():
while True:
time.sleep(0.2)
print('我还活着') # 每隔0.5秒 说下进程状态

if __name__ == '__main__':

p = Process(target=func)
p.daemon = True # 设置子进程为守护进程
p.start()
i = 0
while i < 5:
print('我是socket server')
time.sleep(1)
i += 1

# 守护进程 会随着 主进程的代码执行完毕 而结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 守护进程
# 子进程 转换成 守护进程
# 守护进程 子进程随着主进程 一起结束

import time
from multiprocessing import Process

def func():
while True:
time.sleep(0.2)
print('我还活着') # 每隔0.5秒 说下进程状态

def func2():
print('in func2 开始')
time.sleep(8)
print('in func2 结束')

if __name__ == '__main__':

p = Process(target=func)
p.daemon = True # 设置子进程为守护进程
p.start()

p2 = Process(target=func2) # 另外一个子进程
p2.start()
p2.terminate() # 结束一个进程

print(p2.is_alive()) # 检查进程是否还活着
time.sleep(2)
print(p2.is_alive())
print(p2.name)

# i = 0
# while i < 5:
# print('我是socket server')
# time.sleep(1)
# i += 1

# 守护进程 会随着 主进程的代码执行完毕 而结束
# 在主进程内结束进程 p2.terminate()
# 结束一个进程不是在执行方法之后立即生效,需要一个操作系统响应的过程
# 检验进程是否活着的状态 p2.is_alive()
# p.name p.pid 进程名 和 进程号

多进程中的方法和属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import random
import time
from multiprocessing import Process

class MyProcess(Process):
def __init__(self,user):
super().__init__()
self.user = user

def run(self):
print('子进程 %s 的 pid %s' %(self.name,self.pid))
print('%s 开始执行子进程'%self.user)
time.sleep(random.randrange(1,5))
print('%s 子进程完成' %(self.user))

if __name__ == '__main__':

p1 = MyProcess(user='leo')
p1.start()
p1.terminate() # 关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
print(p1.is_alive()) # True
time.sleep(1)
print(p1.is_alive()) # False

# self.name 属性是Process中的属性,标示进程的名字
# super().__init__() # 执行父类的初始化方法会覆盖name属性
# self.pid 查看id
# p1.terminate() 关闭进程
# p1.is_alive() 查看进程是否存活

socket聊天并发实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# server
import socket
from multiprocessing import Process

def server(conn):
# ret = input('>>>').encode('utf-8')
ret = '你好'.encode('utf-8')
conn.send(ret)
msg = conn.recv(1024).decode('utf-8')
print(msg)
conn.close()

if __name__ == '__main__':

sk = socket.socket()
sk.bind(('127.0.0.1',8090))
sk.listen()


while True:
conn,addr = sk.accept() # 拿到链接 放到多进程 每个链接都执行
p = Process(target=server,args=(conn,))
p.start()
sk.close()
1
2
3
4
5
6
7
8
9
10
11
# client
import socket

sk = socket.socket()
sk.connect(('127.0.0.1',8090))

msg = sk.recv(1024).decode('utf-8')
print(msg)
msg2 = input('>>>').encode('utf-8')
sk.send(msg2)
sk.close()

进程同步 —— 锁、信号量和事件

重要程度:Lock(加锁,同一时间1个进程执行)

锁 —— multiprocess.Lock

加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.需要自己加锁处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from multiprocessing import Process
from multiprocessing import Lock # 进程锁
import json
import os
import time

def show(i): # 查票读取ticket文件
with open('ticket',mode='r') as f:
dic = json.load(f)
print('子进程%s,余票:%s'%(os.getpid(),dic['ticket']))

def buy_ticket(i,lock):
lock.acquire() # 拿钥匙进门 被拿走后 别的进程会在这里阻塞,直到钥匙被还
with open('ticket') as f:
dic = json.load(f)
time.sleep(0.1)

if dic['ticket'] > 0:
dic['ticket'] -= 1
print('\033[32m%s 买到票了\033[0m' %i)
else:
print('\033[31m%s 没买到票\033[0m' %i)

time.sleep(0.1)
with open('ticket',mode='w') as f: # 修改票结果
json.dump(dic,f)

lock.release() # 还钥匙,已经买完票了

if __name__ == '__main__':
for i in range(10): # 假装有10个人抢票
p = Process(target=show,args=(i,))
p.start()

lock = Lock()

for i in range(10):
p = Process(target=buy_ticket,args=(i,lock))
p.start()

# 给某一段代码加上锁 这段代码在这一段时间内只能让一个进程执行
# 只要多人同时操作一个数据 就会出现数据安全问题,需要牺牲效率 保证数据安全
# {"ticket": 1}

信号量 —— multiprocess.Semaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 多进程中的组件
# 同步控制
# 进程间通信
# 进程间的数据共享
# 进程池

# ktv 有4个门 20个人同时进出,现在要控制4个人先进
# 一套资源 同一时间 只能被N个人访问
# 某一段代码 在同一时间 只能被n个进程执行

import time
import random
from multiprocessing import Process
from multiprocessing import Semaphore


def ktv(i,sem):
sem.acquire() # 获取钥匙
print('%i 走进KTV' %i)
time.sleep(random.randint(10,20)) # 模拟唱歌 1-5秒
print('%i 走出KTV' %i)
sem.release() # 还钥匙

if __name__ == '__main__':
sem = Semaphore(4) # 实例化4把钥匙
for i in range(20):
p = Process(target=ktv,args=(i,sem))
p.start()

# 限定进程访问的代码 同一时间只能有几个进程来访问
# 开一个们 有4把要是 前4个进程进去后 门就观赏了 直到某一个进程出来还钥匙,第5个进程获取进入
# lock只有一把钥匙
# 信号量有N把钥匙

# 结果:
# 3 走进KTV
# 0 走进KTV
# 8 走进KTV
# 7 走进KTV
# ...同一时间只有4个人
# 8 走出KTV
# 4 走进KTV
# 3 走出KTV
# 12 走进KTV

事件 —— multiprocess.Event

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 通过一个信号 来控制 多个进程 同时执行或者阻塞
# 事件

from multiprocessing import Event
# 一个信号 可以使所有的进程都进入阻塞状态
# 也可以控制 所有的进程 解除阻塞
# 一个事件 被创建之后,默认是阻塞状态
e = Event() # 创建了一个事件
print(e.is_set() ) # 查看一个事件的状态 # False
e.set() # 将这个时间的状态改为True
print(e.is_set() ) # True

e.wait() # 是依据e.is_set()的值,来决定是否阻塞,如果是False就阻塞,True就是不阻塞
print(123456) # 正常打印

e.clear() # 将事件状态改成false
print(e.is_set()) # False
e.wait() # 阻塞
print('123456') # 不打印

# set 和 clear
# 分别用来修改一个事件的状态:True / False
# is_set
# 用来查看一个事件的状态
# wait
# 依据事件的状态来决定自己是否阻塞 True:不阻塞,False:阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 红绿灯事件
# 两个进程,车怎么才能感知到灯?
import time
import random
from multiprocessing import Event
from multiprocessing import Process

def car(e,i):
if not e.is_set():
print('car%s 等红灯'%i)
e.wait() # 阻塞,直到得到一个事件状态改变,编程True的信号
print('\033[0;32;40mcar%s 通过\033[0m' %i)

def light(e):
while True:
if e.is_set(): # Ture
e.clear() # False
print('\033[31m红灯亮了\033[0m')
else:
e.set() # True
print('\033[32m绿灯亮了\033[0m')
time.sleep(2)

if __name__ == '__main__':
e = Event()
p = Process(target=light,args=(e,))
p.start()

for i in range(20):
cars = Process(target=car,args=(e,i))
cars.start()
time.sleep(random.random()) # 0 - 1秒

进程间的通信 —— 队列和管道

进程间通信:IPC(Inter-Process Communication)

队列 multiprocessing.Queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 队列 先进先出
import time
from multiprocessing import Queue
q = Queue(5)
# for i in range(6): # 0 - 5 超过队列大小了 会阻塞
# q.put(i)
q.put(1)
q.put(2)
q.put(3)
q.put(4)
q.put(5)
print(q.full()) # True 队列是否满了
print(q.get()) # 1
print(q.get()) # 2
print(q.get()) # 3
print(q.get()) # 4
print(q.get()) # 5
print(q.empty()) # True 队列是否空
while True:
try:
q.get_nowait()
except:
print('队列已空')
time.sleep(1)

# 放到满 和 空了再取值 都会阻塞

生产者与消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# 队列
# 生产者与消费者模型:
# 买包子

# 蒸包子 # 买包子
# 生产数据 # 消费数据
# 爬虫 爬500个网页 生产数据快 ——> 放到内存里 # 处理爬虫 处理数据 慢

# 1、容器满了就不允许再放
# 2、增加处理进程
# 3、如果爬取数据慢,比如网络延迟或者需要验证破解,那么增加生产者,解决数据供需不平衡

# 生产者 进程
# 消费者 进程
import time
import random
import os
from multiprocessing import Queue
from multiprocessing import Process

# 消费者
def consumer(q,name):
# 数据一直处理
while True:
food = q.get()
if food is None: # 当获取到空了说明生产者完成了所有生产
print('%s 获取到空了' %name)
break

print('\033[31m%s消费了%s\033[0m' % (name,food))
time.sleep(random.randint(1,3))

# 生产者
def producer(name,food,q):
for i in range(5):
time.sleep(random.randint(1,3))
f = '%s生产了%s%s' %(name,food,i)
print(f,os.getpid())
q.put(f) # 放到队列中

if __name__ == '__main__':
q = Queue(20)

pro1 = Process(target=producer,args=('小红','包子',q))
pro2 = Process(target=producer,args=('小兰','汽水',q))
c1 = Process(target=consumer,args=(q,'小黑'))
c2 = Process(target=consumer,args=(q,'小金'))

pro1.start()
pro2.start()
c1.start()
c2.start()

pro1.join()
pro2.join()
q.put(None) # 等生产者都生产完毕后 放入一个空值
q.put(None) # 等生产者都生产完毕后 放入一个空值

# Queue 是进程安全的,在队列里的数据只能被一个进程取走
# 三个不同的进程 有可能在同一个时间去队列取值,这样是不安全的
# None被其中一个进程取走,另外一个并没有拿到,所以会阻塞

JoinableQueue队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import time
import os
import random
from multiprocessing import Process
from multiprocessing import JoinableQueue

# 消费者
def consumer(q,name):
# 数据一直处理
while True:
food = q.get()
if food is None: # 当获取到空了说明生产者完成了所有生产
print('%s 获取到空了' %name)
break

print('\033[31m%s消费了%s\033[0m' % (name,food))
time.sleep(random.randint(1,3))
q.task_done() # count - 1 ,直到队列中的所有数据都执行了task_done


# 生产者
def producer(name,food,q):
for i in range(5):
time.sleep(random.randint(1,3))
f = '%s生产了%s%s' %(name,food,i)
print(f,os.getpid())
q.put(f) # 放到队列中 count + 1 1.,20

q.join() # 阻塞,直到一听歌队列中的所有数据 全部被处理完毕,这个进程才结束

if __name__ == '__main__':
q = JoinableQueue(20)

pro1 = Process(target=producer,args=('小红','包子',q))
pro2 = Process(target=producer,args=('小兰','汽水',q))
c1 = Process(target=consumer,args=(q,'小黑'))
c2 = Process(target=consumer,args=(q,'小金'))

pro1.start()
pro2.start()

c1.daemon = True # 主进程中的代码执行完毕后 该守护进程结束 进程结束了 q.get()也不会阻塞了。
c2.daemon = True
c1.start()
c2.start()

pro1.join() # 感知一个进程的结束
pro2.join() # 生产进程 q.join()结束,需要等待消费者都处理完才能结束

# JoinableQueue 比 Queue多了两个方法
# 1、获取数据要提交回执 q.task_done() q.join()

# c1.daemon = True 守护进程 主进程中的代码执行完毕后 该守护进程结束

# 在消费者这一端:
# 每次获取一个数据
# 处理一个数据
# 发送一个记号:标志一个数据被处理成功

# 在生产者这一端:
# 每一次生产一个数据
# 且每一次生产的数据都放在队列里
# 在队列中刻上一个记号
# 当生产者全部生产完毕之后
# join信号:已经停止生产数据,且要等待之前被刻上的记号都被消费完
# 当数据都被处理完事,join阻塞结束

# consumer 把所有的任务消耗完
# producer 端的join感知到,停止阻塞
# 所有的producer 进程结束
# 主进程中的p.join结束
# 主进程的代码结束
# 守护进程(c1,c2消费者进程)结束

管道 multiprocessing.Pipe

作用:在进程之间通信

1
2
3
4
5
6
7
8
9
from multiprocessing import Pipe,Process
def func(conn):
conn.send('吃了么')

if __name__ == '__main__':
conn1,conn2 = Pipe()
p = Process(target=func,args=(conn1,))
p.start()
print(conn2.recv())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 发送多条消息
from multiprocessing import Pipe,Process
def func(conn):
while True:
msg = conn.recv()
if msg is None: break
print(msg)

if __name__ == '__main__':
conn1,conn2 = Pipe()
p = Process(target=func,args=(conn1,))
p.start()
for i in range(20):
conn2.send('吃了么%s' %i)
conn2.send(None)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 作为两端通信
# conn2发送,conn1接收
def func(conn1,conn2):
conn2.close()
while True:
try:
msg = conn1.recv()
print(msg)
except EOFError: # 没有数据仍然recv的时候报错
conn1.close()
break

if __name__ == '__main__':
conn1,conn2 = Pipe()
p = Process(target=func,args=(conn1,conn2))
p.start()
conn1.close()
for i in range(20):
conn2.send('吃了么%s' %i)
conn2.close() # 主进程两边都关闭
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import time
import random
from multiprocessing import Pipe
from multiprocessing import Process
from multiprocessing import Lock

def producer(con,pro,name,food):
con.close() # 用不到消费者管道
for i in range(6):
time.sleep(random.randint(1,3))
f = '%s生产%s %s' %(name,food,i)
print(f)
pro.send(f) # 生产放入管道
pro.close() # 生产完成后关闭管道

def consumer(con,pro,name,lock):
pro.close()
while True:
try:
lock.acquire()
food = con.recv() # 从管道拿产品
lock.release()
print('%s 购买了 %s' %(name,food))
time.sleep(random.randint(1,3))
except EOFError:
con.close()
break

if __name__ == '__main__':
con,pro = Pipe() # 生产者和消费者的管道
lock = Lock() # 加锁

p = Process(target=producer,args=(con,pro,'rubin','汽水'))
p.start()

c1 = Process(target=consumer,args=(con,pro,'leo',lock))
c1.start()

c2 = Process(target=consumer,args=(con,pro,'lex',lock))
c2.start()

con.close()
pro.close()


# pipe 数据不安全性
# 生产者 消费者1 1个放1个取
# 生产者 消费者1、2 1个放2个取 其他消费也来抢占数据,数据在管道中是混乱的,没被拿走之前,消费者都可以来申请
# 多个消费者同时取一个数据,数据不安全
# 管道是进程数据不安全的,解决方式:加锁

# 队列是进程之间数据安全的,因为队里基于管道+锁 实现的,所以之后会更多使用队列

进程之间的数据共享 multiprocessing.Manager

  • 基于消息传递的并发编程是大势所趋
  • 即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合,通过消息队列交换数据。
  • 这样极大地减少了对使用锁定和其他同步手段的需求,还可以扩展到分布式系统中。
  • 但进程间应该尽量避免通信,即便需要通信,也应该选择进程安全的工具来避免加锁带来的问题。
  • 以后我们会尝试使用数据库来解决现在进程之间的数据共享问题。
  • 进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的
  • 虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 进程之间的数据共享
from multiprocessing import Manager,Process,Lock

def func(dic):
dic['count'] -= 1
print(dic)

if __name__ == '__main__':
m = Manager()
print(m) # <multiprocessing.managers.SyncManager object at 0x0000000001D7A3C8>
dic = m.dict({'count':100}) # dic会变成数据共享的字典
p_lst = []
p = Process(target=func, args=(dic,))
p.start()
p.join()
print('主进程:',dic)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def func(dic,lock):
lock.acquire()
dic['count'] -= 1
lock.release()
# print(dic)

if __name__ == '__main__':
m = Manager()
lock = Lock() # 不加锁而操作共享的数据,肯定会出现数据错乱
# print(m) # <multiprocessing.managers.SyncManager object at 0x0000000001D7A3C8>
dic = m.dict({'count':100}) # dic会变成数据共享的字典
p_lst = []
for i in range(50):
p = Process(target=func, args=(dic,lock))
p.start()
p_lst.append(p)
[ i.join() for i in p_lst ] # 等待所以子进程都结束
print('主进程:',dic)

# 总结
# 进程同步控制:锁、信号量、事件 -- 控制进程怎么执行,能不能一起执行,几个一起执行,什么时候一起执行 -- 控制
# 进程间通信:队列和管道 -- 通信
# 进程间数据共享: Manager -- 共享
# 以后真正会用到的只有,进程控制,通信方面只用队列,
# 未来使用 -- kafak,rabbitmq memcache (消息中间件) kafak(大数据消息中间件,会保留数据)
# 进程服务器(多台) --> 服务器(memcache)

进程池 Pool

multiprocessing.Pool

为什么要有进程池?进程池的概念。

  • 创建进程需要消耗时间,销毁进程也需要消耗时间。不能无限制的根据任务开启或者结束进程。
  • 进程池的概念:定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。

Pool([numprocess [,initializer [, initargs]]]):创建进程池

1
2
3
4
# 参数介绍:
# 1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
# 2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
# 3 initargs:是要传给initializer的参数组

map()方法 进程池和进程效率测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 为什么会有进程池的概念
# 提高效率
# 1、每次开启进程,都需要创建一个新的属于这个进程的内存空间,耗时
# 寄存器 堆栈 都是存储代码和变量的
# 2、进程过多,造成操作系统调度,切换过程较多
# 进程不能无限制的开放,而是需要进程池

# 进程池
# python:在还没有启动程序之前,先创建一个属于进程的池子
# 这个池子指定能存放多少个进程
# 先将这些进程创建好
# 有50个任务,池子里有5个进程,任务需要排队,按顺序先执行5个任务,结束后不消失回到进程池里接收新的任务,后面依次执行
# 现象:同一时间操作系统中,只执行了这5个进程,减少了进程开销,使5个进程的内存空间循环被利用

# 信号量,同一时间N个进程执行 ,有点像进程池,区别是信号量多个进程排队,在等着执行一段代码,实际上信号量有N个进程被创建了。
# 一个是进程排队,一个是任务排队。

# 进程池既减少了操作系统的调度,且减少了进程开销。

# 高级进程池(弹性伸缩)
# python中没有
# n,m 上限和下限
# 3 三个进程
# 用户量增多,+1进程,一直加到上线m 20个,最多到20个
# 当任务不断减少的时候,再减到3个进程
# 好处:有效的介绍操作系统负担,减少进程

# 开启进程的个数
# CPU核数 + 1 = 进程开启个数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 使用进程池
import time
from multiprocessing import Pool,Process
# Process 超过5个进程,需要使用进程池

def func(n):
for i in range(10):
print(n + 1)

def func2(n):
print(n)
# ('leo', 1)
# rubin

if __name__ == '__main__':
start = time.time()
pool = Pool(5) # 5个进程
# pool.map(func,range(100)) # 100个任务 (方法名,可迭代类型)map方法自带join()
pool.map(func2,[('leo',1),'rubin']) # 第二个任务
t1 = time.time() - start

# 启100个进程
start = time.time()
p_lst = []
for i in range(100):
p = Process(target=func,args=(i,))
p_lst.append(p)
p.start()

for i in p_lst:i.join()
t2 = time.time() - start

print(t1,t2) # 0.21701264381408691 3.6672096252441406 开启100个进程 并没有5个进程交替执行的快

# 进程池提高了执行效率

进程池中的同步和异步调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 进程池的同步调用
import time
import random
import os
from multiprocessing import Pool

def func(n):
print('start func %s' %n,os.getpid())
time.sleep(1)
print('end func %s' %n,os.getpid())

if __name__ == '__main__':
pool = Pool(5)
for i in range(10):
# pool.apply(func,args=(i,)) # apply同步提交任务 (方法,参数)
pool.apply_async(func,args=(i,)) # apply_async 异步提交任务 async在python就代表着异步
# 真异步,主进程执行完了,不等待子进程
pool.close() # 结束进程池接收任务
pool.join() # 感知进程池中的任务执行结束

# start func 0 8572
# start func 1 6000
# start func 2 1208
# start func 3 10492
# start func 4 9232

# end func 0 8572
# start func 5 8572
# ...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 使用进程池创建socket_server
# server
import socket
from multiprocessing import Pool

def func(conn):
conn.send(b'hello')
print(conn.recv(1024).decode('utf-8'))
conn.close()


if __name__ == '__main__':
pool = Pool(5)
sk = socket.socket()
sk.bind(('127.0.0.1',8080))
sk.listen()
while True:
conn,addr = sk.accept()
pool.apply_async(func,args=(conn,))
sk.close()

# client
import socket

sk = socket.socket()
sk.connect(('127.0.0.1',8080))

ret =sk.recv(1024).decode('utf-8')
print(ret)
msg = input('>>>').encode('utf-8')
sk.send(msg)

sk.close()

进程池的返回值

1
2
3
4
5
6
7
8
import requests
import time
from multiprocessing import Pool

# p = Pool()
# p.map(funcname,iterable) 默认异步的执行任务,且自带close和join
# p.apply 同步调用
# p.apply_async 异步调用 和 主进程 完全异步,主进程结束不会等待子进程,需要手动close和join
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 进程池的返回值
# 进程池特有的
# 使用队列实现
def func(i):
time.sleep(0.5)
return i*i

if __name__ == '__main__':
p = Pool(5)
res_l = []
for i in range(10):
# res = p.apply(func,args=(i,)) # apply的结果就是func的返回值
# print(res)

res = p.apply_async(func,args=(i,)) # 异步提交
# print(res.get()) # res进程的对象 get会阻塞等待结果,等着func的结算结果
res_l.append(res)

for res in res_l:print(res.get()) # 一次获取5个结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# map
def func(i):
time.sleep(0.5)
return i*i

if __name__ == '__main__':
p = Pool(5)
ret = p.map(func,range(10)) # map自带join和close
print(ret) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] 任务都运行完毕后 返回一个列表

# map 和 apply,apply_async的区别:
# apply_async : 5个5个打印
# apply : 一次返回
# map : 任务计算完,返回一个列表
# 任务很多 使用 apply_async更好 不用等都执行完,拿到结果更快

回调函数 callback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import os
from multiprocessing import Pool

def func1(n):
print('in func1',os.getpid())
return n*n

def func2(nn):
print('in func2',os.getpid())
print(nn)

if __name__ == '__main__':
p = Pool(5)
print('主进程pid: ',os.getpid())
for i in range(10):
p.apply_async(func1,args=(10,),callback=func2)
p.close()
p.join()

# in func1
# in func2
# 100

# 1、执行func1 他的返回值 作为回调函数的参数
# 2、执行回调函数 func2
# 3、回调函数不传参数,他的参数只能是func1的返回值
# 4、回调函数在主进程中执行

回调函数 – 爬虫

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import requests
from multiprocessing import Pool
# response = requests.get('https://maoyan.com/board/4')
# print(response) # 网页结果对象
# # print(response.__dict__)
# print(response.status_code) # 200
# print(response.text)

# 爬虫
# 耗时最长 网络延迟
# 1、访问网址
# 2、将数据从网址上下载下来 urllib * 耗时最长 发送请求,拿到代码,接收代码
# 3、数据就是bytes 转成 字符串
# 4、处理字符串

# 5个进程,任务是200个(访问200个)
# 同时跑5个进程,一起享受网络延迟,如果在这个时候处理字符串,那么195个进程都在排队
# 如果处理交给主进程来做,这5个进程的处理字符串,那么可以省出5个进程继续下载网页
# - 下载网页1
# - 下载网页2
# - 下载网页3
# - 下载网页4
# - 下载网页5
# ----- 处理字符串

# 一般情况下,爬虫的时候,容易用到回调函数
# 访问网页,爬取网页的过程用爬虫
# 处理数据,使用回调函数

# 流程:
# 多进程去访问页面,拿到结果,返回url和页面内容
# 回调函数打印url和网页内容长度
# 使用进程池下载页面

# 子进程处理下载页面
def get_page(url):
res = requests.get(url)
if res.status_code == 200 :
return url,res.text

# 回调函数,接收网页内容
# 打印页面内容长度
def call_back(args):
url,content = args
print(url,len(content))

if __name__ == '__main__':
url_lst = [
'https://www.baidu.com',
'https://www.sogou.com',
'http://www.sohu.com/',
'https://maoyan.com/board/4',
]

p = Pool(5)
for url in url_lst:
p.apply_async(get_page,args=(url,),callback=call_back)

p.close()
p.join()

# https://www.sogou.com 23447
# https://www.baidu.com 2443
# http://www.sohu.com/ 180835
# https://maoyan.com/board/4 20754