-
并发
只有一个CPU,多个程序在一个CPU上轮流执行,宏观上多个进程并发执行,但微观上依旧是串行 -
并行
有多个CPU,多个程序在多个CPU上同时执行。
-
进程
计算机中最小的资源分配单位,占用资源,数据隔离,需要操作系统进行调度,每一个程序就是一个进程,其中使用pid作为进程间的唯一标识。 -
线程
线程必须存在于进程中,不能独立存在,不占用资源,数据共享,是能够被计算机独立运行和独立调度的最小单位 ,一个进程中可以有多个线程,但至少有一个 -
同步
在事件A执行过程中发起事件B,只有B结束,A才能继续执行,需要等结果 -
异步
在事件A执行过程中发起事件B,不需要B结束,A可以继续执行,不需要等结果
进程并发:
#全局执行
from multiprocessing import Process #多进程模块
import os
def fun(name,*args):
print(name,os.getpid(),os.getppid())
print('全局执行')
if __name__ == '__main__':#因此此处也终止了死循环
#只在主进程下执行的代码
print('main:',os.getpid(), os.getppid()) #整个程序的主进程,和父进程
print('只在主进程执行')
p = list()
proc1 = Process(target=fun,args=("lisi",)) #传参
print('开启子进程lisi')
proc1.start() #在当前的主进程又创建了一个子进程,当前进程既为子进程又为父进程
p.append(proc1)
proc2 = Process(target=fun, args=('wangwu',)) # 传参
print('开启子进程wangwu')
proc2.start() #异步非阻塞状态
# p.terminate()#强制结束一个子进程
# p.daemon =True #变为守护进程,直到主进程的代码结束,才结束
p.append(proc2)
for i in p: i.join() #同步阻塞;直到proc1,proc2进程都执行完,才继续执行
print('全部子进程执行完毕')
执行结果:
全局执行 main: 19028 10944 只在主进程执行 开启子进程lisi 开启子进程wangwu 全局执行 全局执行 lisi 7820 19028 wangwu 20068 19028 全部子进程执行完毕
上面使用函数的方法创建进程,还可以用类的方法创建进程
from multiprocessing import Process #多进程模块
class P(Process):#继承自Process
def __init__(self,a):#传参
super().__init__()
self.a = a
def run(self):#必须重写run方法,类似于socketserver中必须重写handel方法
print('way 2 %s'%self.a)
if __name__ == '__main__':
p = P('zhangda')
p.start()
print(p.name,p.is_alive())#进程的名字,是否存活
执行结果:
P-1 True way 2 zhangda
- 进程间的通信(ICP inter process communication)
– 基于文件通信:同一台机器上的多个进程间通信
如:socket基于文件级别的进程间数据通信(Queue队列,pipe管道)
– 基于网络通信:一台或多台机器上的多个进程间的通信
借助一些第三方工具,消息中间件,如:redis,kafka等
from multiprocessing import Process,Queue
def func(q):
q.put('hello')
def producter(q):
for i in range(10):
q.put(i)#放一个
def consumer(q):
for i in range(10):
print(q.get(),end='-')#取一个
if __name__ == '__main__':
q = Queue() #共享数据的队列
proc1 = Process(target=func,args=(q,))
proc1.daemon = True
proc1.start()#开启1
print(q.get())
proc2 = Process(target=producter, args=(q,))
proc2.start()#开启2
proc3 = Process(target=consumer, args=(q,))
proc3.start()#开启3
执行结果:
hello 0-1-2-3-4-5-6-7-8-9-
- 多线程
主线程结束,进程就会结束,守护线程随着主线程结束而结束,但如果主线程代码结束后还有子线程,会继续守护其他子线程。
守护进程和守护线程的区别:守护进程需要主进程来回收资源,其他子线程–》主线程结束–》守护进程结束–》主进程结束–》父进程回收主进程所有资源–》守护线程被回收。
线程锁-单例模式:防止不必要的数据共享,带来数据不安全
其余操作线程与进程相似
from threading import Thread,current_thread,enumerate,active_count
from multiprocessing import Process,current_process
import time
# enumerate所有活着线程的对象
# active_count所有活着线程的数量
class A:#线程的单例模式,保护数据
__instance = None
from threading import Lock
lock = Lock()
def __new__(cls,*args, **kwargs):
with cls.lock:#不加锁会导致线程并发,不同步,重复创建空间,三个不同的cls.__instance
if not cls.__instance:
time.sleep(0.01)
cls.__instance=super().__new__(cls)
print(cls.__instance,'new')
return cls.__instance
def func(a):
print('当前线程id:%dn'%current_thread().ident)
a = A()
print(a)
if __name__ == '__main__':
for i in range(3):
t= Thread(target=func,args=(10,))
t.start()
print(enumerate(),active_count())
执行结果
当前线程id:4560 当前线程id:16864 当前线程id:14628 [<_MainThread(MainThread, started 20764)>,, , ] 4 <__main__.A object at 0x000001E6605D8130> new <__main__.A object at 0x000001E6605D8130> <__main__.A object at 0x000001E6605D8130> <__main__.A object at 0x000001E6605D8130>
- 一定会发生数据不安全的情况
- 数据共享,且异步
- +=,-=,*=,/=,赋值,=计算后,数据不安全
- if,while不在同一个线程中完成
线程间的数据安全共享的容器
from queue import Empty,Queue,LifoQueue,PriorityQueue
#LifoQueue 先进后出,栈类型
# Queue 先进先出,队列类型
# PriorityQueue,优先级队列
#Empty queue的错误类型
q = Queue()
q.put(1)
q.put_nowait(1)#易丢失数据
print('queue:%d'%q.get())
try:
print(q.get_nowait())
except Empty:
print('no data')
l = LifoQueue()
l.put(10)
l.put(20)
print('lifoqueue:%d'%l.get_nowait())
pro = PriorityQueue()#进行了排序
pro.put(2222222)
pro.put(1111111)
print('prio:%d'%pro.get()
执行结果
queue:1 1 lifoqueue:20 prio:1111111
-
递归锁与互斥锁(Rlock,Lock)
-
池
要在程序开始,还没有提交任务的时候就预先创建几个进程或者进程放在池子里。提前开好,直接使用,限制额度,整体关闭。
进程池:用于高计算场景(无io(文件操作,数据库操作,网络操作,input等)) cpu_count
进程池与线程池相似
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from threading import current_thread
import time
def func(a):
print(current_thread().ident)
time.sleep(1)#阻塞
return a
def print_ret(ret):
print(ret.result())
if __name__ == '__main__':
tp = ThreadPoolExecutor(4)#四个一组完成并发
for i in range(5):
ret =tp.submit(func,i)#任务提交到池,让池中的线程替你完成
# print(ret.result) #future未来对象,造成同步阻塞,顺序轮流执行
ret.add_done_callback(print_ret)#回调函数,造成异步阻塞,先返回先执行
# 或者写成下面
# ret =tp.map(func,range(10))#可迭代参数
# for i in ret:
# print(i)
执行结果
12472 3208 9412 7588 1 3208 0 2 3 4
- 协程
操作系统不可见,本质还是一条线程,只是多个任务在一条线程上来回切换,以规避io操作带来的时间损失,达到一条线程上io操作降到最低,充分利用CPU的目的,但线程的所有切换都基于用户,用户可以感知到的io才会进行切换,所以io操作的时间间隔比线程长。
切换并规避io的模块:
- gevent = 利用greenlet 底层模块实现切换+自动规避io
- asyncio=利用yield 底层语法实现切换+i自动规避io
from gevent import monkey
monkey.patch_all()
import time
import gevent
def func():
print('start')
time.sleep(1)
print('end')
g1 = gevent.spawn(func)
g2 = gevent.spawn(func)
g3 = gevent.spawn(func)
gevent.joinall([g1,g2,g2])
g1.join()
print('*'*10)
import asyncio
async def begin():
print('start')
await asyncio.sleep(1) #可能会报错的地方,从这里切走
print('end')
loop = asyncio.get_event_loop() #事件循环
loop.run_until_complete(begin())
执行结果
start start start end end end ********** start end



