官网文档:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple
native threads from executing Python bytecodes at once. This lock is necessary mainly
because CPython’s memory management is not thread-safe. (However, since the GIL
exists, other features have grown to depend on the guarantees that it enforces.)文档剖析:
在Cpython中GIL全局解释器锁其实也是一把互斥锁,主要用于阻止同一个进程下的多个线程同时被运行(通俗理解:python的多线程无法使用多核优势);
GIL肯定存在于CPython解释器中 主要原因就在于Cpython解释器的内存管理不是线程安全的;
内存管理:垃圾回收机制
- 引用计数
- 标记清除
- 分代回收
Cpython解释器自带的GIL解释器锁,线程要想执行代码去抢锁,抢python解释器,之后才回收,那么这样就能保证了阻止同一个进程下的多个线程同时被运行,不容易造成数据错乱;比如,抢票,如果你提交了订单,那么别人还能操作到你这张票的订单吗?不会了吧;这样就进而使数据不容易错乱;
name='hz'
,还没有来得急绑定关系,垃圾回收机制就可能给你回收了,因为垃圾回收也是线程,想要执行也得拿解释器来执行,但是不是和你的代码串行;验证之前需要明白什么是
多道技术
(切换+保存状态)多道技术什么时候切换:程序执行时间长、程序有IO操作(√,示例利用IO)
from threading import Threadimport timem = 100def test(): global m tmp = m # time.sleep(1) # IO操作,造成数据错乱,sleep(1)运行释放了GIL锁,100个线程同样的操作反复执行,导致结果为99,如果没有IO操作结果为0 tmp -= 1 m = tmpfor i in range(100): t = Thread(target=test) t.start()time.sleep(3)print(m) # 0'''同一个进程下的多个线程虽然有GIL的存在不会出现并行的效果,但是如果线程内有IO操作还是会造成数据的错乱,这个时候需要我们额外的添加互斥锁(就不止GIL一把锁了)'''
补:抢锁释放锁简写方式
a = Lock()# 方式一:a.acquire()'''代码体'''a.release()# 方式二:with a: '''代码体'''# 适用with上下文管理器,会自动抢锁释放锁
from threading import Thread,Lockimport timemutex = Lock()m = 100def test(): global m with mutex: tmp = m time.sleep(0.1) # IO操作,造成数据错乱,sleep(1)运行释放了GIL锁,100个线程同样的操作反复执行,导致结果为99,如果没有IO操作结果为0 tmp -= 1 m = tmpif __name__ == '__main__': t_list = [] for i in range(100): t = Thread(target=test) t.start() t_list.append(t) for t in t_list: t.join()print(m) # 0
存在多把锁的情况,会出现死锁现象;
from threading import Thread, Lockimport timeA = Lock()B = Lock()class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): A.acquire() # 抢锁 print('%s 抢到了A锁' % self.name) # 相当于获取线程名称 # current_thread().name 获取线程名称 B.acquire() print('%s 抢到了B锁' % self.name) time.sleep(1) B.release() # 释放锁 print('%s 释放了B锁' % self.name) A.release() print('%s 释放了A锁' % self.name) def func2(self): B.acquire() print('%s 抢到了B锁' % self.name) A.acquire() print('%s 抢到了A锁' % self.name) A.release() print('%s 释放了A锁' % self.name) B.release() print('%s 释放了B锁' % self.name)for i in range(10): obj = MyThread() obj.start()
1、线程1抢A锁,B锁,其他线程等待;
2、线程1释放了B锁,其他线程等待,因为A锁没有释放;线程1释放了A锁,其他线程才能去func1中抢锁;
3、线程1去func2中抢B锁,A锁,其他线程抢func1中的A锁和B锁,现在锁还在线程1手中,那么就可能导致卡死现象;
4、通俗理解,这样就导致了,你要的在我手上,我要的在你手上,比如A和B,A在B家被锁了,B在A家被锁了,A和B拿的自己家的钥匙,但是他们在对方的家中,那么就死锁了;
递归锁特点:可以被连续的acquire和release,但是只能被第一个抢到这把锁执行上述操作,它的内部有一个计数器,每acquire一次计数加一,每release一次计数减一,只要计数不为0,其他人都无法抢锁;
模块:RLock
Factory function that returns a new reentrant lock. A reentrant lock must be released by the thread that acquired it. Once a thread has acquired a reentrant lock, the same thread may acquire it again without blocking; the thread must release it once for each time it has acquired it.
from threading import Thread,Lock,RLockimport time'''mutexA = Lock()mutexB = Lock()相当于开启了两把锁;'''# 两把锁指向同一个内存空间地址,开启一把锁A = B = RLock()class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): A.acquire() # 抢锁 print('%s 抢到了A锁' % self.name) # 相当于获取线程名称 # current_thread().name 获取线程名称 B.acquire() print('%s 抢到了B锁' % self.name) time.sleep(1) B.release() # 释放锁 print('%s 释放了B锁' % self.name) A.release() print('%s 释放了A锁' % self.name) def func2(self): B.acquire() print('%s 抢到了B锁' % self.name) A.acquire() print('%s 抢到了A锁' % self.name) A.release() print('%s 释放了A锁' % self.name) B.release() print('%s 释放了B锁' % self.name)if __name__ == '__main__': for i in range(10): obj = MyThread() obj.start()# 递归锁的适用不会死锁,抢一次锁计数加1,释放计数减1,其实就是一把锁没有导致混乱
信号量在不同的阶段可能对应不同的技术点;
在并发编程中信号量指的也是锁;
通俗理解:
互斥锁是一个厕所,那么信号量就是多个厕所
from threading import Thread,Semaphoreimport timeimport randomsm = Semaphore(5) # 括号内写几就代表几个坑位def task(name): sm.acquire() # 抢锁 print(f'{name} is running!') # time.sleep(3) time.sleep(random.randint(1,5)) sm.release() # 释放锁if __name__ == '__main__': for i in range(20): t = Thread(target=task,args=(f'拉屎{i}号',)) t.start()# 信号量可以理解为拉屎的坑位,三个人抢锁(进入厕所),拉完了就是释放锁了;
如果面试官问你python多线程是不是没用啊?你是不是得分情况回答他,不能直接说有用啊,存在即合理···??
视情况而定来判断是否需要多线程,看程序的类型;
比如有四个任务,每个任务耗时10s;
from multiprocessing import Processfrom threading import Threadimport threadingimport os,timedef work(): time.sleep(2)if __name__ == '__main__': l=[] print(os.cpu_count()) #本机为8核 start=time.time() for i in range(400): # 开进程 # p=Process(target=work) # run time is 10.905012845993042大部分时间耗费在创建进程上 # 开线程 p=Thread(target=work) # run time is 2.061677932739258 l.append(p) p.start() for p in l: p.join() stop=time.time() print('run time is %s' %(stop-start)) # IO密集型的时候,同样的任务多线程只需要2秒
计算密集型(占着cpu不放),比如有四个任务,每个任务耗时10s;
from multiprocessing import Processfrom threading import Threadimport os,timedef work(): res=0 for i in range(100000000): res*=iif __name__ == '__main__': l=[] print(os.cpu_count()) # 本机为8核 start=time.time() for i in range(6): p=Process(target=work) # 多进程耗时:run time is 6.857659339904785 # p=Thread(target=work) # 多线程耗时:run time is 24.021770000457764 l.append(p) p.start() for p in l: p.join() stop=time.time() print('run time is %s' %(stop-start)) # 计算密集型,多进程只需6s,这样只需看一个cpu计算任务所需的时间,那么多个cpu同时结束;# 多线程就得排队来了,执行完一个继续下一个····
一些进程/线程需要等待另外一些进程/线程运行完毕之后才能运行;
通俗理解:比如汽车生成车间,肯定得先冲压零部件,然后焊接这些部件(不能扯一体式冲压昂??),然后涂装工艺,最后总装···
那么把这些一个个流程比作线程或进程,是不是得前面的进行完才能往下走?
案例:
from threading import Thread,Eventimport timeevent = Event() # 造红绿灯def light(): print('红灯停') time.sleep(3) print('绿灯行') # 行人走 event.set()def people(name): print(f'{name} 等红绿灯') event.wait() print(f'{name} 可以走了')if __name__ == '__main__': t = Thread(target=light) t.start() for i in range(20): t = Thread(target=people,args=(f'{i}',)) t.start()
同一个进程下的多个线程数据是共享的,为什么同一个进程下还会去使用队列呢?因为队列是管道和锁构成的,使用队列也是为了保证数据得到安全(适用场景自定义)
import queue# 先进先出的队列# q = queue.Queue(3)# q.put(1)# q.get()# q.get_nowait()# q.full()# q.empty()# 后进先出队列,其实和堆栈大差不差# q = queue.LifoQueue(3)# q.put(1)# q.put(2)# q.put(3)# print(q.get()) # 3# 优先级队列:可以给放入队列中的数据设置进出的优先级q = queue.PriorityQueue(4)q.put((10,'111'))q.put((100,'222'))q.put((0,'333'))q.put((-5,'444'))print(q.get()) # (-5, '444')# 数字越小,优先级越高,put((优先级,数据))
TCP服务端并发
# 面条版主体import socketserver = socket.socket()server.bind(('127.0.0.1',8080))server.listen(5)while True: sock,addr = server.accept() # 通信循环 while True: try: data = sock.recv(1024) if len(data) == 0:break sock.send(data.upper()) # 发送大写 except ConnectionResetError as e: print(e) break sock.close()
封装版
from threading import Threadimport socket# 通信函数def communication(sock): # 通信循环 while True: try: data = sock.recv(1024) if len(data) == 0: break sock.send(data.upper()) # 发送大写 except ConnectionResetError as e: print(e) break sock.close()# 连接客户端函数def server(ip,port): server = socket.socket() server.bind((ip,port)) server.listen(5) while True: sock,addr = server.accept() # 开设多进程/多线程 t = Thread(target=communication,args=(sock,)) t.start()if __name__ == '__main__': s = Thread(target=server,args=('127.0.0.1',8080)) s.start()
思考:能否无限制的开设进程或者线程???
肯定是不能无限制开设的,如果单从技术层面上来说无限开设肯定是可以的并且是最高效,但是从硬件层面上来说是无法实现的(硬件的发展永远赶不上软件的发展速度)
这时候就出现了池,我们在合理适用计算机的时候,保证硬件正常工作的前提,去开设多进程和多线程,是最合理的,如果硬件崩溃了,软件也没用了;
池:池是用来保证计算机安全情况下,最大限度的利用计算机,它的出现降低了程序的运行效率但是保证了计算机硬件的安全,从而相对高效运行;(对比半连接池只限制了等待的数量;)
进程池:提前开设了固定个数的进程 之后反复调用这些进程完成工作(后续不再开设新的)
线程池:提前开设了固定个数的线程 之后反复调用这些线程完成工作(后续不再开设新的)
模块:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutorimport time# 线程池:固定开设5个线程,5个线程不会重复出现重复创建和销毁(节省资源);pool = ThreadPoolExecutor(5) # 括号内可以传数字,不传默认开设当前计算机cpu个数五倍的线程"""Initializes a new ThreadPoolExecutor instance. Args: max_workers: The maximum number of threads that can be used to execute the given calls. thread_name_prefix: An optional name prefix to give our threads. """def task(n): print(n) time.sleep(2) return f'返回结果:{n*2}'# pool.submit(task,1) # 朝池子中提交任务(异步)'''提交方式:同步、异步'''# print('main')# 朝池子提交20个任务,每次只能接5个t_list = []for i in range(20): res = pool.submit(task,i) # print(res.result()) # 返回结果,异步变串行,同步提交 t_list.append(res)pool.shutdown() # 关闭线程池,等待线程池中所有任务运行完毕# 解决了等待卡顿for t in t_list: # 异步提交结果,先起任务再返回结果 print('>>>>',t.result())
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutorimport timeimport os# 进程池:固定开设几个进程,进程不会重复出现重复创建和销毁(节省资源);pool = ProcessPoolExecutor(5) # 括号内可以传数字,不传默认开设当前计算机cpu个数def task(n): print(n,os.getpid()) time.sleep(2) return f'进程号:{os.getpid()}'# 异步提交任务的返回结果,应该通过回调机制来获取,而不是下面的for循环最后获取def call_back(n): # n 返回的对象 print(n.result()) # n.result 相当于res.resultif __name__ == '__main__': # 朝池子提交20个任务,每次只能接5个 # t_list = [] for i in range(20): res = pool.submit(task,i).add_done_callback(call_back) # 回调机制 # print(res.result()) # 返回结果,异步变串行,同步提交 # t_list.append(res) # pool.shutdown() # # 解决了等待卡顿 # for t in t_list: # 异步提交结果,先起任务再返回结果 # print('>>>>',t.result())
主要操作:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutorpool = ProcessPoolExecutor(5)res = pool.submit(task,i).add_done_callback(call_back) # 回调机制
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutorimport timeimport os# 创建进程池与线程池# pool = ThreadPoolExecutor(5) # 可以自定义线程数 也可以采用默认策略pool = ProcessPoolExecutor(5) # 可以自定义线程数 也可以采用默认策略# 定义一个任务def task(n): print(n, os.getpid()) time.sleep(2) return '>>>:%s' % n ** 2# 定义一个回调函数:异步提交完之后有结果自动调用该函数def call_back(a): print('异步回调函数:%s' % a.result())# 朝线程池中提交任务# obj_list = []for i in range(20): res = pool.submit(task, i).add_done_callback(call_back) # 异步提交 # obj_list.append(res)"""同步:提交完任务之后原地等待任务的返回结果 期间不做任何事异步:提交完任务之后不愿地等待任务的返回结果 结果由异步回调机制自动反馈"""# 等待线程池中所有的任务执行完毕之后 再获取各自任务的结果# pool.shutdown()# for i in obj_list:# print(i.result()) # 获取任务的执行结果 同步在windows电脑中如果是进程池的使用也需要在__main__下面
进程:资源单位
线程:工作单位
协程:程序员自定义的名词,意思是单线程下实现并发(程序员自己在代码层面上监测我们所有的IO操作,一但遇到IO,我们在代码级别完成切换,这样给cpu的感觉是程序一直在运行,没有IO操作从而提升效率)
多道技术:切换+保存技术
CPU被剥夺的条件:
并发去实现切换+保存状态
欺骗CPU的行为:
单线程下我们如果能够自己检测IO操作并且自己实现代码层面的切换
那么对于CPU而言我们这个程序就没有IO操作,CPU会尽可能的被占用注意切换不一定能提升效率,如果是IO密集型就会提升效率,计算密集型切换就会降低效率;
能够自主监测IO行为并切换
from gevent import monkey;monkey.patch_all()# 固定代码格式加上之后才能检测所有的IO行为from gevent import spawnimport timedef play(name): print('%s play 1' % name) time.sleep(5) print('%s play 2' % name)'''两个方法有IO操作,代码一直在反复“跳”'''def eat(name): print('%s eat 1' % name) time.sleep(3) print('%s eat 2' % name)start = time.time()# play('Hammer') # 正常的同步调用# eat('Hammer') # 正常的同步调用 8s+g1 = spawn(play, 'Hammer') # 异步提交g2 = spawn(eat, 'Hammer') # 异步提交g1.join()g2.join() # 等待被监测的任务运行完毕print('主', time.time() - start) # 单线程下实现并发,提升效率5s+
# 并发效果:一个服务端可以同时服务多个客户端import socketfrom gevent import monkey;monkey.patch_all()from gevent import spawndef talk(sock): while True: try: data = sock.recv(1024) if len(data) == 0:break print(data) sock.send(data+b'hi') except ConnectionResetError as e: print(e) sock.close() breakdef servers(): server = socket.socket() server.bind(('127.0.0.1',8080)) server.listen() while True: sock, addr = server.accept() spawn(talk,sock)g1 = spawn(servers)g1.join()# 客户端开设几百个线程发消息即可最牛的情况:多进程下开设多线程,多线程下开设协程我们以后可能自己动手写的不多,一般都是使用别人封装好的模块或框架
IO模型研究的主要是网络IO(linux系统),理论为主,代码实现大部分为伪代码;
Stevens在文章中一共比较了五种IO Model
blocking IO 阻塞IO
nonblocking IO 非阻塞IO
IO multiplexing IO多路复用
signal driven IO 信号驱动IO
asynchronous IO 异步IO
由signal driven IO(信号驱动IO)在实际中并不常用,所以主要介绍其余四种IO Model
最为常见的一种IO模型,有两个等待的阶段(wait for data、copy data)
计算机1和计算机2数据传输,需要经过拷贝到内存到OSI,然后才到计算机2的OSI七层到内存;
系统调用阶段变为了非阻塞(轮询) 有一个等待的阶段(copy data),轮询的阶段是比较消耗资源的;
通俗的理解:会一直询问kernel有没有数据~,并不会阻塞操作,直到copy才会阻塞;
利用select或者epoll来监管多个程序 一旦某个程序需要的数据存在于内存中了 那么立刻通知该程序去取即可;
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程;通俗理解为:多个人排队取餐,监控select,如果参号了,kernel说好了,然后用户去取餐return;
这个图和blocking IO的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用(select和recvfrom),而blocking IO只调用了一个系统调用(recvfrom)。但是,用select的优势在于它可以同时处理多个connection;
只需要发起一次系统调用 之后无需频繁发送 有结果并准备好之后会通过异步回调机制反馈给调用者;