线程


线程

进程内容回顾:

  • 程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。
  • 程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。
  • 在多道编程中,我们允许多个程序同时加载到内存中,在操作系统的调度下,可以实现并发地执行。这是这样的设计,大大提高了CPU的利用率。
  • 进程的出现让每个用户感觉到自己独享CPU,因此,进程就是为了在CPU上实现多道编程而提出的。
  • 进程的出现,可以让一台服务器同时处理多个任务,在多个任务之间来回切换并记录任务执行状态。

进程的缺陷:

  • 进程只能在一个时间干一件事,如果想同时干两件事或多件事,进程就无能为力了。
  • 进程在执行的过程中如果阻塞,例如等待输入,整个进程就会挂起,即使进程中有些工作不依赖于输入的数据,也将无法执行。

线程的出现:
随着计算机技术的发展,进程出现了很多弊端,一是由于进程是资源拥有者,创建、撤消与切换存在较大的时空开销,因此需要引入轻型进程;二是由于对称多处理机(SMP)出现,可以满足多个运行单位,而多个进程并行开销过大。因此在80年代,出现了能独立运行的基本单位——线程(Threads)。

注意:

  • 进程是资源分配的最小单位,线程是CPU调度的最小单位.
  • 每一个进程中至少有一个线程。 

进程和线程的关系

  1. 地址空间和其它资源(如打开文件):进程间相互独立,同一进程的各线程间共享。某进程内的线程在其它进程不可见。
  2. 通信:进程间通信IPC,线程间可以直接读写进程数据段(如全局变量)来进行通信——需要进程同步和互斥手段的辅助,以保证数据的一致性。
  3. 调度和切换:线程上下文切换比进程上下文切换要快得多。
  4. 在多线程操作系统中,进程不是一个可执行的实体。

线程的特点

在多线程的操作系统中,通常是在一个进程中包括多个线程,每个线程都是作为利用CPU的基本单位,是花费最小开销的实体。线程具有以下属性。

  1. 轻型实体。存储的较少
    线程中的实体基本上不拥有系统资源,只是有一点必不可少的、能保证独立运行的资源。
    线程的实体包括程序、数据和TCB。线程是动态概念,它的动态特性由线程控制块TCB(Thread Control Block)描述。
  2. 独立调度和分派的基本单位。 真正被操作系统调度的是线程
    在多线程OS中,线程是能独立运行的基本单位,因而也是独立调度和分派的基本单位。由于线程很“轻”,故线程的切换非常迅速且开销小(在同一进程中的)。
  3. 共享进程资源。 进程的数据在多线程中使用不用ipc,而是直接使用
    线程在同一进程中的各个线程,都可以共享该进程所拥有的资源,这首先表现在:所有线程都具有相同的进程id,这意味着,线程可以访问该进程的每一个内存资源;此外,还可以访问进程所拥有的已打开文件、定时器、信号量机构等。由于同一个进程内的线程共享内存和文件,所以线程之间互相通信不必调用内核。
  4. 可并发执行。 比如6个线程可以运行不同的代码
    在一个进程中的多个线程之间,可以并发执行,甚至允许在一个进程中所有线程都能并发执行;同样,不同进程中的线程也能并发执行,充分利用和发挥了处理机与外围设备并行工作的能力。

内存中的线程

  • 多个线程共享同一个进程的地址空间中的资源,是对一台计算机上多个进程的模拟,有时也称线程为轻量级的进程。
    而对一台计算机上多个进程,则共享物理内存、磁盘、打印机等其他物理资源。多线程的运行也多进程的运行类似,是cpu在多个线程之间的快速切换。

  • 不同的进程之间是充满敌意的,彼此是抢占、竞争cpu的关系,如果迅雷会和QQ抢资源。而同一个进程是由一个程序员的程序创建,所以同一进程内的线程是合作关系,一个线程可以访问另外一个线程的内存地址,大家都是共享的,一个线程干死了另外一个线程的内存,那纯属程序员脑子有问题。

  • 类似于进程,每个线程也有自己的堆栈,不同于进程,线程库无法利用时钟中断强制线程让出CPU,可以调用thread_yield运行线程自动放弃cpu,让另外一个线程运行。

  • 线程通常是有益的,但是带来了不小程序设计难度,线程的问题是:

  1. 父进程有多个线程,那么开启的子线程是否需要同样多的线程
  2. 在同一个进程中,如果一个线程关闭了文件,而另外一个线程正准备往该文件内写内容呢?
    因此,在多线程的代码中,需要更多的心思来设计程序的逻辑、保护程序的数据。

开启多线程

通过threading模块开启多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Python多线程模块
# thread、threading和Queue 推荐使用threading
# 线程的创建Threading.Thread类

import time
from threading import Thread

# 多线程并发
def func(n):
time.sleep(1) # 10次子线程一起执行
print(n) # 并发

for i in range(10):
t = Thread(target=func,args=(i,))
t.start()

通过继承threading类 开启多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import time
from threading import Thread

class MyTread(Thread):
def __init__(self,arg):
super().__init__()
self.arg = arg
def run(self):
time.sleep(1)
print(self.arg)

for i in range(10):
t = MyTread(10)
t.start()

多线程与多进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import os
import time
from threading import Thread
def func(a,b):
n = a+b
print(n,os.getpid())

print('主线程',os.getpid())
for i in range(10):
t = Thread(target=func,args=(i,5))
t.start()

# 主线程和子线程的pid是一致的,说明子线程都是在一个进程里执行
# 主进程中 存储 导入的模块,文件所在的位置,内置的函数,代码
# 主线程存储循环的i 和 t
# 子线程拥有栈,里面存储着 add a 和 b n = 0+5

内存数据共享

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 内存数据共享
import os
import time
from threading import Thread
def func(a,b):
global g
g = 0
# g = 0 + a
print(g,os.getpid())
g = 100
t_lst = []
print('主线程',os.getpid())
for i in range(10):
t = Thread(target=func,args=(i,5))
t.start()
t_lst.append(t)

for t in t_lst:t.join()
print(g)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 总结:
# 进程 是 最小的内存分配单位
# 线程 是 操作系统调度的最小单位
# 真正被执行的是线程,线程被CPU执行
# 进程内至少含有一个线程
# 进程中可以开启多个线程
# 开启一个线程 所需要的时间 要远远小于开启一个进程
# 线程占用的内存空间 也更小
# 多个线程内部有自己的数据栈,这个数据是不共享的
# 全局变量在多个线程之间是共享的

# 在CPython解释器下的Python程序
# 在同一时刻,多个线程中只能有一个线程 被 CPU执行
# 高CPU:计算类 --- 高CPU利用率 执行效率不高
# 高IO:
# 爬取网页
# 读写文件 处理日志文件
# 网络请求 处理web请求
# 读数据库 写数据库
# 非要处理计算类,就使用多进程 进程与进程没有锁

# 4个CPU
# 不同的CPU可以执行多条线程,可能会造成数据的混乱
# 对同一个数据的加减操作
# CPython解释器 为了避免 存在 一种锁 GLP 全局解释器锁
# 任何一个线程 想要给CPU 必须拿到钥匙,只有拿到钥匙的线程才能交给CPU执行
# 重点:
# 1 同一时刻,只能有一个线程,访问CPU,
# 2 锁的是什么?锁的是线程
# 3 锁的缺点: 多线程不能充分的利用CPU,同一时间只能用到一个C
# 4 GLP是Python语言的问题么? 不是,是CPython解释器的特性,JPython没有这个锁

多线程与多进程执行效率对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import time
from threading import Thread
from multiprocessing import Process


def func(n):
n + 1 # 计算操作 占用cpu

if __name__ == '__main__':
start = time.time()
t_lst = []
for i in range(100):
t = Thread(target=func,args=(i,))
t.start()
t_lst.append(t)
for t in t_lst:t.join()
t1 = time.time() - start

start = time.time()
p_lst = []
for i in range(100):
p = Process(target=func,args=(i,))
p.start()
p_lst.append(t)
for p in p_lst:p.join()
t2 = time.time() - start

print('线程执行',t1) # 0.009000539779663086
print('进程执行',t2) # 1.8071033954620361
# 对于高IO 和 简单的高CPU计算任务时 , 多线程的开销更少,执行效率更高

线程模块中的其他方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import threading
import time

def wahaha(n):
time.sleep(0.5)
print(n,threading.current_thread(),threading.get_ident()) # 子线程 名字 PID


for i in range(10):
threading.Thread(target=wahaha,args=(1,)).start()

print(threading.active_count()) # 查看当前所有的活跃的线程数 # 11 (10个子线程+1个主线程)
print(threading.current_thread()) # 主线程 名字 PID
print(threading.enumerate()) # 所有的进程名字 pid 存在一个列表返回
print(len(threading.enumerate())) # 11

多线程实现简单的socket服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# server
import socket
from threading import Thread

def chat(conn):
conn.send(b'hello')
msg = conn.recv(1024).decode('utf-8')
print(msg)
conn.close()


s = socket.socket()
s.bind(('127.0.0.1',8090))
s.listen()

while True:
conn,addr = s.accept() # 获取连接
t = Thread(target=chat,args=(conn,))
t.start()

s.close()
1
2
3
4
5
6
7
8
9
10
11
# client
import socket

s = socket.socket()
s.connect(('127.0.0.1',8090))

msg = s.recv(1024).decode('utf-8')
print(msg)
inp = input('>>>').encode('utf-8')
s.send(inp)
s.close()

守护线程 daemon=True

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import time
from threading import Thread

def func1():
while True:
print(10 * '*')
time.sleep(1)

def func2():
print('in func2')
time.sleep(5)

t = Thread(target=func1,)
t.daemon = True
t.start()

t2 = Thread(target=func2,)
t2.start()
t2.join() # 所有的t2结束后打印在执行主进程代码

# 守护进程 随着主进程代码的执行结束 而结束
# 守护线程 会在主线程结束之后,等待其他子线程的结束而结束

# 主进程 在执行完自己的代码之后 不会立即结束,而是等待子进程结束之后 回收子进程的资源

import time
from multiprocessing import Process

def func1():
while True:
print(10 * '*')
time.sleep(1)

def func2():
print('in func2')
time.sleep(5)

if __name__ == '__main__':
p = Process(target=func1, )
p.daemon = True # 守护线程 主线程结束,守护线程随之结束
p.start()

p2 = Process(target=func2, )
p2.start()
print('主进程')

线程锁 Lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import time
from threading import Lock,Thread

def func(lock):
global n
lock.acquire()
temp = n
time.sleep(0.2)
n = temp - 1
lock.release()

n = 10
t_lst = []
lock = Lock()
for i in range(10):
t = Thread(target=func,args=(lock,))
t.start()
t_lst.append(t)

for i in t_lst:t.join() # 执行完所有子线程
print(n)

# 10个线程分别 n - 1
# 1、10个线程同时执行 都去拿 n = 10
# 2、都sleep 0.2秒,同时减1
# 3、同时赋值回去 大家赋值的都是9
# 4、为什么有了GIL锁 还是会出现混乱 : 拿到数据后 刚好时间片到了会释放锁 ,就会拿到同一数据
# 5、GIL锁加给线程,为了避免多个线程同一时间对一个数据操作,但是无法避免时间片的轮转对数据的不安全性
# 6、多线程里还需要用到锁
# 7、加锁,牺牲了效率,保证了数据安全

递归锁解决死锁问题 RLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import time
from threading import Lock,Thread
noodle_lock = Lock()
fork_lock = Lock()

def eat1(name):

noodle_lock.acquire()
print('%s 拿到面条' %name)
fork_lock.acquire()
print('%s 拿到叉子' %name)
print('%s 吃面' %name)

fork_lock.release()
noodle_lock.release()

def eat2(name):

fork_lock.acquire()
print('%s 拿到叉子' %name)
time.sleep(1)
noodle_lock.acquire()
print('%s 拿到面条' %name)
print('%s 吃面' %name)

noodle_lock.release()
fork_lock.release()


Thread(target=eat1,args=('rubin',)).start()
Thread(target=eat2,args=('leo',)).start()
Thread(target=eat1,args=('lex',)).start()
Thread(target=eat2,args=('fily',)).start()


# rubin 拿到面条
# rubin 拿到叉子
# rubin 吃面
# leo 拿到叉子
# lex 拿到面条
# ... 卡住了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 递归锁 解决死锁问题

import time
from threading import Lock,Thread
from threading import RLock
r_fork_lock = r_noodle_lock = RLock() # 一个钥匙串上的两把钥匙

def eat1(name):

r_noodle_lock.acquire() # 一旦拿到其中一把钥匙,说明要是都在手里 房子1
print('%s 拿到面条' %name)
r_fork_lock.acquire() # 房子2 2层钥匙
print('%s 拿到叉子' %name)
print('%s 吃面' %name)

r_fork_lock.release() # 释放1层
r_noodle_lock.release() # 释放2层 还完所有钥匙

def eat2(name):

r_fork_lock.acquire()
print('%s 拿到叉子' %name)
time.sleep(1)
r_noodle_lock.acquire()
print('%s 拿到面条' %name)
print('%s 吃面' %name)

r_noodle_lock.release()
r_fork_lock.release()


Thread(target=eat1,args=('rubin',)).start()
Thread(target=eat2,args=('leo',)).start()
Thread(target=eat1,args=('lex',)).start()
Thread(target=eat2,args=('fily',)).start()

# Lock 互斥锁 1把钥匙
# RLock 递归锁 同一个线程拿多少次钥匙都可以 , 不同的线程里 一旦有一个拿到了,别的线程都无法拿到
# 递归解决死锁问题,同一个线程可以被 acquire 多次。

# 当同一个线程 或者 同一个进程 遇到两把以上锁的时候,就容易出现死锁,需要使用递归锁。
# 线程进场遇到全局数据,全局数据又是共享,所以经常要加锁。

信号量 Semaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import time
from threading import Semaphore,Thread
# 信号量 就是控制n个线程 访问同一段代码

def func(sem,a,b):

sem.acquire()
time.sleep(1)
print(a + b)
sem.release()

# KTV例子 同一时间几个人能够进入
sem = Semaphore(4) # 同一时间 只能有4个线程进入
for i in range(10):
t = Thread(target=func,args=(sem,i,i+5))
t.start()

事件 Event

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 事件被创建的时候
# 当False状态,wait() 阻塞
# 当True状态的 wait() 非阻塞
# clear 设置状态为False
# set 设置状态为True
# 之前有红绿灯例子

# 链接数据库和检测数据库的可链接情况
# 起两个线程
# 第一个线程 : 链接数据库
# 等待一个信号,告诉我们之间的网络是通的
# 链接数据库
# 第二个线程 : 检测与数据库之间的网络是否连通
# 模拟检测 time.sleep(0,2) 如果2秒钟没通 就是没通
# 将事件的状态,设置为 True

import time
import random
from threading import Event,Thread

def connect_db(e):
count = 0 # 计数器,尝试三次链接
while count < 3:
e.wait(0.5) # 状态为False的时候 ,只等待n秒钟就结束
if e.is_set() == True:
print('链接数据库')
break
else:
count += 1
print('第%s次链接失败' %count)
else: # while 循环 + else 三次都没有被break 也就是都没连接上
raise TimeoutError('数据库连接超时') # 超时错误

def check_web(e):
time.sleep(random.randint(0,3))
e.set() # wait = True

e = Event()
t1 = Thread(target=connect_db,args=(e,))
t2 = Thread(target=check_web,args=(e,))
t1.start()
t2.start()

定时器 Timer

1
2
3
4
5
6
7
8
9
import time
from threading import Timer

def func():
print('时间同步')

while True:
t = Timer(2,func).start() # Timer(秒数,方法) 2秒钟之后开启线程
time.sleep(5) # 和上面的Timer是同步 Timer5秒后会执行func

队列 queue

1
2
3
4
# Queue             先进先出 队列
# LifoQueue 先进后出 栈
# PriorityQueue 优先级队列
# 他们都是数据安全
1
2
3
4
5
6
7
# 队列
import queue
q = queue.Queue() # 队列的特点 先进先出
q.put() # 满了会阻塞
q.get() # 空了会阻塞
q.put_nowait() # + 异常处理
q.get_nowait() #
1
2
3
4
5
6
7
# 栈
import queue
q = queue.LifoQueue() # 栈 先进后出 (先进来的在最底下,后进来的先出去)
q.put(1)
q.put(2)
q.put(3)
print(q.get()) # 3
1
2
3
4
5
6
7
8
9
10
# 优先级队列
import queue
q = queue.PriorityQueue() # 优先级队列 放数据的时候,给优先级
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))
q.put((1,'z'))
q.put((1,'d'))

print(q.get()) # (1, 'd') 顺序按照 数字从小到大,如果数字相同,按照数据的ASCII码排

线程池 concurrent.futures 模块

1
2
3
4
5
# 1 介绍
# concurrent.futures模块提供了高度封装的异步调用接口
# ThreadPoolExecutor:线程池,提供异步调用
# ProcessPoolExecutor: 进程池,提供异步调用
# Both implement the same interface, which is defined by the abstract Executor class.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 2 基本方法
# submit(fn, *args, **kwargs)
# 异步提交任务

# map(func, *iterables, timeout=None, chunksize=1)
# 取代for循环submit的操作

# shutdown(wait=True)
# 相当于进程池的pool.close() + pool.join() 操作
# wait=True,等待池内所有任务执行完毕回收完资源后才继续
# wait=False,立即返回,并不会等待池内的任务执行完毕
# 但不管wait参数为何值,整个程序都会等到所有任务执行完毕
# submit和map必须在shutdown之前

# result(timeout=None)
# 取得结果

# add_done_callback(fn)
# 回调函数

# done()
# 判断某一个线程是否完成

# cancle()
# 取消某个任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 线程池
import time
from concurrent.futures import ThreadPoolExecutor # 线程池,提供异步调用
from concurrent.futures import ProcessPoolExecutor # 进程池,提供异步调用 apply_async()

def func(n):
time.sleep(2)
print(n)
return n*n

tpool = ThreadPoolExecutor(max_workers=5) # 1一般情况下 线程卡其 max_workers = 默认不要超过 cpu个数 * 5
t_lst = [] # 任务列表
for i in range(10):
t = tpool.submit(func,i) # 线程池提交任务
t_lst.append(t) # 存储结果对象,就像领了个号,等任务结束后去领取结果
tpool.shutdown() # pool.close() 关闭池子不让人物再提交进来 + pool.join() 阻塞直到池子中的子线程任务执行完
print('主线程')

for t in t_lst:print('***',t.result()) # 为什么这个地方按顺序打印,因为 t_lst 是按照顺序产生的
1
2
3
4
5
6
7
8
9
10
11
12
# 流程总结:
# 1、创建线程池
# 2、线程池异步提交任务
# 3、阻塞到线程池任务都结束
# 4、获取结果

# 如果没有shutdown:
# 1、会先打印 print('主线程')
# 2、谁先执行好 就先接收到结果输出 (消耗好一些)
# 5线程 * 每个任务2秒 = 5任务2秒 10任务4秒 如果join 需要4秒后统一拿结果
# 不加join 2秒后执行完前5个任务拿到结果
# 如果有返回值的话 ,超过池子的数量 用 shutdown 效率会慢
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
### 进程池
import time
from concurrent.futures import ThreadPoolExecutor # 线程池,提供异步调用
from concurrent.futures import ProcessPoolExecutor # 进程池,提供异步调用 apply_async()

def func(n):
time.sleep(2)
print(n)
return n*n

if __name__ == '__main__':

p_pool = ProcessPoolExecutor(max_workers=5)
p_lst = [] # 任务列表
for i in range(10):
t = p_pool.submit(func, i)
p_lst.append(t)
p_pool.shutdown()
print('主进程')

for p in p_lst: print('进程池 ***', p.result())
1
2
3
4
5
6
7
8
9
10
11
### map
import time
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def func(n):
time.sleep(2)
print(n)
return n*n

t_pool = ThreadPoolExecutor(max_workers=5)
t_pool.map(func,range(20)) # 拿不到返回值啦
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 回调函数 call_back
import time
from concurrent.futures import ThreadPoolExecutor # 线程池,提供异步调用
from concurrent.futures import ProcessPoolExecutor # 进程池,提供异步调用 apply_async()

def func(n):
time.sleep(2)
print(n)
return n*n

def call_back(m): # 接收到了个对象
print('结果是%s' %m.result())

tpool = ThreadPoolExecutor(max_workers=5)
t_lst = [] # 任务列表
for i in range(10):
t = tpool.submit(func,i).add_done_callback(call_back) # 线程池提交任务 带回调函数
t_lst.append(t)