栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

python多进程与多线程

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

python多进程与多线程

1. 创建多进程
import os

ret = os.fork()
print("haha"

会打印两遍。

import os
from time import sleep
ret = os.fork()

if ret == 0:
	while True:
		print("----1------")
		sleep(1)
else:
	while True:
		print("=====2======")
		sleep(1)

此时1和2都会被打印。

fork函数会返回一个值。从fork开始产生多个进程,不同的进程都从fork函数开始往下执行。

进程分为父进程(主进程)和子进程。父进程的id大于0,子进程的id等于0。

父子进程执行顺序不定。fork之前的代码只有父进程会执行。

使用os.getpid()获取当前进程的pid(进程ID号)。fork返回子进程的id。getppid获取父进程ID。

import os

ret = os.fork()

# ret = os.getpid()

getppid用来获取父进程的pid。

父进程中fork的返回值就是创建的子进程的ID。

多个进程之间的全局变量不共享,每个进程都有自己的一份拷贝,数据之间互不影响:

import os
import time

g_num = 100

ret = os.fork()
if ret == 0:
	g_num += 1
	print("process1, %d" % g_num)
else:
	time.sleep(3)
	print("process2, %d" % g_num)

# 将分别打印
# process 1 101
# proess 2 100

多次fork,会产生 2 n 2^n 2n个进程。

但是windows平台没有fork函数,因此python使用multiprocessing.Process来创建子进程。

2. Process创建多进程
from multiprocessing import Process
import time

def test():
	while Ture:
		print("")
		time.sleep(1)
		
p = Process(target=test)  # 创建一个进程

p.start()  # 让该进程开始执行test中的代码

while True:
	print("---main ----")
	time.sleep(1)

使用Process类创建一个进程,返回该进程的实例对象。并且使用target参数来指定要执行的代码。当target里面的代码执行完成,则该进程执行结束。

使用fork创建的进程,主进程可以自己先结束。但是使用Process创建的子进程,主进程会等待子进程结束后再结束。

注意:主进程会继续往下执行,但是在结束之前会等待子进程结束再结束。如想让主进程等待,需要调用join方法。

3. 使用join等待子进程结束后继续向下执行
p = Process(target="")
p.start()

p.join() 
print("main---------------")

父进程会在join函数这个地方等待,等子进程结束后再往下。子进程运行结束就结束等待。即堵塞式运行。

使用join函数还可以指定等待时间:

p.join(5)  # 等待5s,5s过后不再等待直接往下执行
4. 继承Process类来创建进程
from multiprocess import Process

class MyProcess(Process):
	def __init__(self):
		pass

	def run(self):
		pass

p = MyProcess()
p.start()
p.join()

这种情况下也可以创建经常,但是却不用传target参数。只需要重写run方法来指定子进程将来要执行的代码。

p.start()调用后,会先去执行start方法,然后start方法回去调用run方法。

5.使用进程池创建多进程

先创建一堆的进程放着,然后等着去调用。想要用的时候分配一个进程,用完之后还给进程池。增加重复使用率。

from multiprocessing import Pool

pool = Pool(3)  # 创建3个进程

使用apply_async往进程池添加任务

def worker(msg):
	...

po.apply_async(worker, (0))  # 使用元组传递参数

Pool(3)指定最多可以同时执行的进程数量。apply_async相当于start方法,添加后进程池里面的经常就开始执行。

但是进程池Pool与Process不同之处在于,不会等待子进程结束再结束。主进程结束后,进程池直接消失,子任务也不会被执行。因此需要使用join方法来让主进程等待进程池里面的所有任务结束。

可以使用pool.close()来禁止往进程池里面再添加任务。

pool = Pool(3)

pool.apply_async(worker)
pool.close()
pool.join() 

往进程池中添加进程时不会产生堵塞现象。当任务数太多时,进程时会自动分配任务。且任务完成后,进程仍然存在,下一个任务直接分配给该进程,可以节省进程创建开销,因为进程不会被销毁。创建和销毁的时候都被节省出来了。

任务数过多不会导致添加不进去。添加进去的任务,如果还没有被执行,则会等待进程池中的进程完成一个任务之后,会自动去用刚刚的那个进程去完成当前的新任务。

pool.close()  # 关闭进程池,相当于不能够再次添加新任务了

pool.join()  # 主进程 创建/添加 任务后,主进程默认不会等待进程池中的任务执行完后结束,而是当主进程的任务做完之后,立刻结束。如果没有join回导致进程池中的任务不会执行。join保证主进程不结束,也就保证进程池中的进程会被执行。

使用Pool创建时,主进程一般用来等待,真正的任务都在子进程中执行。进程池并非越大越好。太大时不同进程间调度的开销越大。

6 使用apply来堵塞式添加任务

可以在满足某些添加时添加。但使用很少。

7 进程间通信

使用队列完成进程间的通信。

from multiprocessing import Queue

q = Queue(3)  # 创建1个Queue对象,最多可接收3条put消息

使用put堵塞式往队列里面添加消息。

q.put(1)

使用get来堵塞式取出队列中的消息

q.get()  # 取消息,并pop该消息

使用full和empty判断队列状态。

if not q.empty():
	q.get()

if not q.full():
	q.put()

使用put_nowait()和get_nowait()无等待添加和获取。如果full和empty则会报异常,因此这连个方法应该放在try中使用。

使用Queue通信,将queue作为参数传递给进程

def write(q):
	q.put(1)
	q.put(2)
def read(q):
	q.get()
	q.get()
q = Queue()
pw = Process(target="", args=(q,))
pr = Process(target="", args=(q,))

pw.start()
pw.join()

pr.start()
pr.join()

还可以使用多个Queue来进行通信。

8. 进程池中的进程通信

只能使用Manager完成进程池任务的通信。

from multiprocessing import Manager, Pool

q = Manager().Queue()

pool = Pool()
po.apply(writer, (q))
pool.apply(reader, (q))

pool.close()
pool.join()

注意:进程间相互之间互不影响。

9. 多线程

导入threading模块。

from threading import Thread

t = Thread(target="")
t.start()

与进程唯一的区别就是将Process类改为Thread类,其余的target参数和start方法都相同。

for i in range(10):
	t = threading.Thread(target="")
	t.start()  # 可以不等上个线程结束就执行下一个线程

主线程会等待所有子线程结束后再结束。

10 使用Thread子类创建多线程

继承Thread类,并重写run方法。与multiprocess一模一样。

from threading import Thread

class MyThread(Thread):
	def run(self):
		...

t = MyThread()
t.start()

首先调用父类的start方法,然后跳转执行run方法。主线程用来回收资源。各个子线程和主线程的执行顺序不确定。

使用self.name属性获取当前线程的名称。

进程不管全局变量还是局部变量都有自己的一份,互相不能修改。但是线程是共享全局变量的。线程间需要使用同步保证正确。

可变类型的局部参数在多个线程之间也是共享的。

11. 线程互斥锁

保证多个线程在医院竞争的时候

from threading import Lock

mutex = Lock()  # 创建锁,默认未上锁

mutex.acquire(blocking)  # 锁定

mutex.release()

如果一个线程上锁,导致另一个线程堵塞,直到锁被释放为止。一般全局变量修改时需要加锁。

12 解决死锁
metex = Lock()  
mutex.acquire(blocking=False, timeout=10)  # 等待10秒
13 消费者生产者模式

使用队列生产缓冲区域。

导包

from queue import Queue

q = Queue()
q.put()
q.get()
14. 多进程异步

使用apply_async的callback参数实现

pool = Pool(3)

pool.apply_async(func=test, callback=test2)

先去执行test函数,执行完成之后通知父进程执行 test2函数,test的返回值被当做参数传递给test2函数。

15. 其他函数
  • startmap: 支持将多个参数放入到队列中,不同参数按照顺序以元组形式存放
from multiprocessing import Pool
def func(a, b):
    print(a + b)

if __name__=="__main__":
    args = [(1,2),(3,4),(5,6)]
    pool = Pool(3)
    pool.starmap(func, args)  

# 输出: 3, 7, 11

同步执行: 一个任务执行完再进行下一个任务
异步执行: 启动后不等这个进程结束又开始执行新任务

同步和异步的区别主要在于主进程是否会等待子进程结束再继续运行。因此一般异步使用join就变成了同步。

startmap和startmat_async都可以传入多个参数, 但是startmap是同步的,startmap_async是异步的。

在获取进程池中的结果时map_async也会阻塞,需要等待所有Task执行结束后返回list。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/286372.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号