- 消息队列
- IPC机制(进程间通信)
- 生产者消费者模型
- 线程理论
- 开设线程的两种方式
- 线程实现TCP服务端的并发
- 线程join方法
- 同一个进程内的多个线程数据共享
- 线程对象属性和方法
- 守护线程
- GIL全局解释器锁
- 创建共享的进程队列,要用到Queue(),括号内是自定义队列的长度,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递
- 队列:先进先出(使用频率很高)
堆栈:先进后出(特定常见下用) - 以后我们会直接使用别人封装好的消息队列 实现各种数据传输
关于Queue方法有以下:
Queue([maxsize]) 创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。 Queue的实例q具有以下方法: q.get( [ block [ ,timeout ] ] ) 返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。 q.get_nowait( ) 同q.get(False)方法。 q.put(item [, block [,timeout ] ] ) 将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。 q.qsize() 返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。 q.empty() 如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。 q.full() 如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。 q.close() 关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。 q.cancel_join_thread() 不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。 q.join_thread() 连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。
之所以介绍队列是因为它可以支持进程间数据通信
代码实例如下:
'''
multiprocessing模块支持进程间通信的两种主要形式:管道和队列
都是基于消息传递实现的,但是队列接口
'''
from multiprocessing import Queue
q=Queue(3)
#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3) # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。
# 如果队列中的数据一直不被取走,程序就会永远停在这里。
try:
q.put_nowait(3) # 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。
except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。
print('队列已经满了')
# 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。
print(q.full()) #满了
print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。
try:
q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。
except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。
print('队列已经空了')
print(q.empty()) #空了
二、 IPC机制(进程间通信)
- 主进程与子进程数据交互
- 两个子进程数据交互
- 本质:不同内存空间中的进程数据交互
实例代码如下:
from multiprocessing import Process, Queue
def producer(q):
# print('子进程producer从队列中取值>>>:', q.get())
q.put('子进程producer往队列中添加值')
def consumer(q):
print('子进程consumer从队列中取值>>>:', q.get())
if __name__ == '__main__':
q = Queue()
p = Process(target=producer, args=(q, ))
p1 = Process(target=consumer, args=(q,))
p.start()
p1.start()
# q.put(123) # 主进程往队列中存放数据123
print('主进程')
三、 生产者消费者模型
- 生产者:负责生产/制作数据的线程
- 消费者:负责消费/处理数据的线程
- 为什么要使用生产者和消费者模式?
在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据,同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式
- 什么是生产者消费者模式?
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力
老师举例:
"""
比如在爬虫领域中
会先通过代码爬取网页数据(爬取网页的代码就可以称之为是生产者)
之后针对网页数据做筛选处理(处理网页的代码就可以称之为消费者)
如果使用进程来演示
除了有至少两个进程之外 还需要一个媒介(消息队列)
以后遇到该模型需要考虑的问题其实就是供需平衡的问题
生产力与消费力要均衡
"""
from multiprocessing import Process, Queue, JoinableQueue
import time
import random
def producer(name, food, q):
for i in range(5):
data = f'{name}生产了{food}{i}'
print(data)
time.sleep(random.randint(1, 3)) # 模拟产生过程
q.put(data)
def consumer(name, q):
while True:
food = q.get()
# if food == None:
# print('完蛋了 没得吃了 要饿死人了')
# break
time.sleep(random.random())
print(f'{name}吃了{food}')
q.task_done() # 每次去完数据必须给队列一个反馈
if __name__ == '__main__':
# q = Queue()
q = JoinableQueue()
p1 = Process(target=producer, args=('大厨jason', '韭菜炒蛋', q))
p2 = Process(target=producer, args=('老板kevin', '秘制小汉堡', q))
c1 = Process(target=consumer, args=('涛涛', q))
c2 = Process(target=consumer, args=('龙龙', q))
c1.daemon = True
c2.daemon = True
p1.start()
p2.start()
c1.start()
c2.start()
# 生产者生产完所有数据之后 往队列中添加结束的信号
p1.join()
p2.join()
# q.put(None) # 结束信号的个数要跟消费者个数一致才可以
# q.put(None)
"""队列中其实已经自己加了锁 所以多进程取值也不会冲突 并且取走了就没了"""
q.join() # 等待队列中数据全部被取出(一定要让生产者全部结束才能判断正确)
"""执行完上述的join方法表示消费者也已经消费完数据了"""
四、线程理论
- 什么是线程?
进程:相当于一个资源单位
线程:相当于一个执行单位
也就是进程相当于车间(一个个空间),线程相当于车间里面的流水线(真正干活的)
一个进程中至少有一个线程
进程仅仅是在内存中开辟一块空间(提供线程工作所需的资源)
线程真正被CPU执行,线程需要的资源跟所在进程的要
- 为什么要有线程?
因为开设线程的消耗远远小于进程
开进程
1.申请内存空间
2.拷贝代码
开线程
一个进程内可以开设多个线程 无需申请内存空间、拷贝代码
一个进程内的多个线程数据是共享的
""" 开发一个文本编辑器 获取用户输入并实时展示到屏幕上 并实时保存到硬盘中 多种功能应该开设多线程而不是多进程 """五、 开设线程的两种方式
"""进程与线程的代码实操几乎是一样的"""
from threading import Thread
import time
def task(name):
print(f'{name} is running')
time.sleep(3)
print(f'{name} is over')
# 创建线程无需在__main__下面编写 但是为了统一 还是习惯在子代码中写
t = Thread(target=task, args=('jason', ))
t.start() # 创建线程的开销极小 几乎是一瞬间就可以创建
print('主线程')
class MyThread(Thread):
def __init__(self, username):
super().__init__()
self.username = username
def run(self):
print(f'{self.username} jason is running')
time.sleep(3)
print(f'{self.username} is over')
t = MyThread('jasonNB')
t.start()
print('主线程')
六、 线程实现TCP服务端的并发
仔细体会开设进程和线程的本质区别
import socket
from threading import Thread
server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen()
def talk(sock):
while True:
data = sock.recv(1024)
print(data.decode('utf8'))
sock.send(data.upper())
while True:
sock, addr = server.accept()
# 每类一个客户端就创建一个线程做数据交互
t = Thread(target=talk, args=(sock,))
t.start()
七、 线程join方法
from threading import Thread
import time
def task(name):
print(f'{name} is running')
time.sleep(3)
print(f'{name} is over')
t = Thread(target=task, args=('jason', ))
t.start()
t.join() # 主线程代码等待子线程代码运行完毕之后再往下执行
print('主线程')
"""
主线程为什么要等着子线程结束才会结束整个进程
因为主线程结束也就标志着整个进程的结束 要确保子线程运行过程中所需的各项资源
"""
八、 同一个进程内的多个线程数据共享
from threading import Thread
money = 10000000000
def task():
global money
money = 1
t = Thread(target=task)
t.start()
t.join()
print(money)
九、 线程对象属性和方法
1.验证一个进程下的多个线程是否真的处于一个进程
验证确实如此
2.统计进程下活跃的线程数
active_count() # 注意主线程也算!!!
3.获取线程的名字
1.current_thread().name
MainThread 主线程
Thread-1、Thread-2 子线程
2.self.name
十、守护线程
- 无论是进程还是线程,都遵循:守护线程会等待主线程运行完毕后被销毁。需要强调的是:运行完毕并非终止运行
-
对主进程来说,运行完毕指的是主进程代码运行完毕
-
对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程 统统运行完毕,主线程才算运行完毕
-
主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
-
主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
from threading import Thread
import time
def task(name):
print(f'{name} is running')
time.sleep(3)
print(f'{name} is over')
t1 = Thread(target=task, args=('jason',))
t2 = Thread(target=task, args=('kevin',))
t1.daemon = True
t1.start()
t2.start()
print('主线程')
十一、 GIL全局解释器锁
"""纯理论 不影响编程 只不过面试的时候可能会被问到"""
# 官方文档
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly
because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
"""
1.回顾
python解释器的类别有很多
Cpython Jpython Ppython
垃圾回收机制
应用计数、标记清除、分代回收
GIL只存在于CPython解释器中,不是python的特征
GIL是一把互斥锁用于阻止同一个进程下的多个线程同时执行
原因是因为CPython解释器中的垃圾回收机制不是线程安全的
反向验证GIL的存在 如果不存在会产生垃圾回收机制与正常线程之间数据错乱
GIL是加在CPython解释器上面的互斥锁
同一个进程下的多个线程要想执行必须先抢GIL锁 所以同一个进程下多个线程肯定不能同时运行 即无法利用多核优势
强调:同一个进程下的多个线程不能同时执行即不能利用多核优势
很多不懂python的程序员会喷python是垃圾 速度太慢 有多核都不能用
反怼:虽然用一个进程下的多个线程不能利用多核优势 但是还可以开设多进程!!!
再次强调:python的多线程就是垃圾!!!
反怼:要结合实际情况
如果多个任务都是IO密集型的 那么多线程更有优势(消耗的资源更少)
多道技术:切换+保存状态
如果多个任务都是计算密集型 那么多线程确实没有优势 但是可以用多进程
CPU越多越好
以后用python就可以多进程下面开设多线程从而达到效率最大化
"""
1.所有的解释型语言都无法做到同一个进程下多个线程利用多核优势
2.GIL在实际编程中其实不用考虑



