栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

Python:多线程

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Python:多线程

1,线程概述

几乎所有的操作系统都支持同时运行多个任务,一个任务通常就是一个程序,每一个运行中的程序就是一个进程。当一个程序运行时,内部可能包含多个顺序执行流,每一个顺序执行流就是一个线程。从执行方式上来看,每个任务都是交替执行的,但是,由于CPU的执行速度太快,给用户的感觉就像所有任务都在同时执行一样。真正的并行执行多任务只能在多核CPU上实现,但由于任务数量远远多于CPU的核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行。

1.1,线程和进程

几乎所有的操作系统都支持进程的概念,所有运行中的任务通常对应一个进程。当一个程序进入内存中运行时,即变成一个进程。进程是处于运行过程中的程序,并且具有一定的独立功能。进程是系统进行资源分配和调度的一个独立单位。

一般而言,进程包含如下三个特征:

  • 独立性:进程是系统中独立存在的实体,它可以拥有自己的独立资源,每一个进程都拥有自己的私有的地址空间。在没有经过进程本身允许的情况下,一个用户进程不可以直接访问其他进程的地址空间。
  • 动态性:进程与程序的区别在于,程序只是一个静态的指令集合,而进程是一个正在系统中活动的指令集和。在进程中加入了时间的概念。进程具有自己的生命周期和各种不同的状态,在程序中是没有这些概念的。
  • 并发性:多个进程可以在单个处理器上并发执行,多个进程之间不会互相影响。

多线程则扩展了多进程的概念,使同一个进程可以同时并发处理多个任务。线程也被称作为轻量级进程,线程是进程的执行单元。就像进程在操作系统中的地位一样,线程在程序中是独立的、并发的执行流。当进程被初始化后,主线程就被创建了。对于绝大多数的应用程序来说,通常仅需有一个主线程,但也可以在进程内创建多个顺序执行流,这些顺序执行流就是线程,每一个线程都是独立的。

线程是进程的组成部分,一个进程可以拥有多个线程,一个线程必须有一个父线程。线程可以拥有自己的堆栈、自己的程序计数器和自己的局部变量,但不拥有系统资源,它与父进程的其他线程共享该进程所拥有的全部资源。因为多个线程共享父进程里的全部资源,因此编程更加方便;但必须更加小心,因为需要确保线程不妨碍同一进程中的其他线程。

线程可以完成一定的任务,可以与其他线程共享父进程中的共享变量及部分环境,相互之间协同来完成进程所要完成的任务。

线程是独立运行的,它并不知道进程中是否还有其他线程存在。线程的运行是抢占式的,也就是说,当前运行的线程在任何时候都可能被挂起,以便另外一个线程可以运行。

一个线程可以创建和撤销另外一个线程,同一个进程中的多个线程之间可以并发运行。

从逻辑角度看,多线程存在于一个应用程序中,让一个应用程序可以有多个执行部分同时执行,但操作系统无须将多个线程看作多个独立的应用,对多线程实现调度和管理,以及资源分配。线程的调度和管理由进程本身负责完成。

1.2,多线程的优势

线程在程序中是独立的、并发的执行流。与分隔的进程相比,进程中线程之间的隔离程度要小,它们共享内存、文件句柄和其他进程应有的状态。

因为线程的划分尺度小于进程,使得多线程程序的并发性高。进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大提高了程序的运行效率。

线程比进程具有更高的性能,这是由于同一个进程中都有共性——多个线程共享同一个进程的虚拟空间。线程共享的环境包括进程代码段、进程的公有数据等,利用这些共享的数据,线程之间很容易实现通信。

操作系统在创建进程时,必须为该进程分配独立的内存空间,并分配大量的相关资源,但创建线程则简单得多。因此,使用多线程来实现并发比使用多进程的性能要高得多。

  • 进程之间不能共享内存,但线程之间共享内存非常容易。
  • 操作系统在创建进程时,需要为该进程重新分配系统资源,但创建线程的代价则小得多。因此,使用多线程来实现多任务并发执行比使用多进程的效率高。
  • Python语言内置了多线程功能支持,而不是单纯地作为底层操作系统的调度方式,从而简化了Python的多线程编程。
2,线程的创建和启动

Python提供了_thread和threading两个模块来支持多线程,其中_thread提供低级别的、原始的线程支持,以及一个简单的锁,因此一般不建议使用_thread模块。而threading模块则提供了功能丰富的多线程支持。

  • 使用threading模块的Thread类的构造器创建线程。
  • 继承threading模块的Thread类创建线程类。
2.1,调用Thread类的构造器创建线程

调用Thread类的构造器创建线程很简单,直接调用threading.Thread类的如下构造器创建线程:

__init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
  • group:执行该线程所属的线程组。目前该参数还未实现。
  • target:指定该线程要调度的目标方法。
  • args:指定一个元组,以位置参数的形式为target指定的函数传入参数。元组的第一个元素传给target函数的第一个参数,元组的第二个元素传给target函数的第三个参数……依次类推。
  • kwargs:指定一个字典,以关键字参数的形式为target指定的函数传入参数。
  • daemon:指定所构建的线程是否为后台线程。

通过Thread类的构造器创建并启动多线程的步骤如下:

  • 调用Thread类的构造器才创建线程对象。在创建线程对象时,target参数指定的函数将作为线程执行体。
  • 调用线程对象的start()方法启动该线程。
import threading
def action(max):
    for i in range(max):
        print(threading.current_thread().getName() + " " + str(i))
for i in range(100):
    print(threading.current_thread().getName() + " " + str(i))
    if i == 20:
        t1 = threading.Thread(target=action, args=(100,))
        t1.start()
        t2 = threading.Thread(target=action, args=(100,))
        t2.start()
print("主线程执行完毕")

在多线程编程时,不要忘记Python程序运行时默认的主线程,主程序部分就是主线程的线程执行体。

上面还用到了两个方法:

  • threading.current_thread():它是threading模块的函数,该函数总是返回当前正在执行的线程对象。
  • getName():它是Thread类的实例方法,该方法返回调用它的线程名字。
2.2,继承Thread类创建线程类

通过继承Thread类来创建并启动线程的步骤:

  • 定义Thread类的子类,并重写该类的run()方法。run()方法的方法体就代表了线程需要完成的任务,因此把run()方法称为线程执行体。
  • 创建Thread子类的实例,即创建线程对象。
  • 调用线程对象的start()方法来启动线程。
import threading
class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.i = 0
    def run(self):
        while self.i < 100:
            print(threading.current_thread().getName() + " " + str(self.i))
            self.i += 1
for i in range(100):
    print(threading.current_thread().getName() + " " + str(i))
    if i == 20:
        f = MyThread()
        f.start()
        f2 = MyThread()
        f2.start()
print("主线程执行完毕")
3,线程的生命周期

当线程被创建并启动以后,它既不是一启动就进入执行状态的,也不是一直处于执行状态的,在线程的生命周期中,它要经过新建(New)、就绪(Ready)、运行(Running)、阻塞(Blocked)和死亡(Dead)5种状态。尤其是当线程启动以后,它不能一直霸占CPU独自运行,所以CPU需要在多个线程之间切换,于是线程状态也会多次在运行、就绪之间转换。

3.1,新建和就绪状态

当程序创建了一个Thread对象或Thread子类的对象之后,该线程就处于新建状态,和其他的Python对象一样,此时的线程对象并没有表现出任何线程的动态特征,程序也不会执行线程执行体。

当线程对象调用start()方法之后,该线程处于就绪状态,Python解释器会为其创建方法调用栈和程序计数器,处于这种状态中的线程并没有开始运行,只是表示该线程可以运行了。至于该线程何时开始运行,取决于Python解释器中线程调度器的调度。

start()和run()的区别:启动线程使用start()方法,而不是run()方法!永远不要调用线程对象的run()方法!调用start()方法来启动线程,系统会把该run()方法当成线程执行体来处理;但如果直接调用线程对象的run()方法,则run()方法立即就会被执行,而且在该方法返回之前其他线程无法并发执行。也就是说,如果直接调用线程对象的run()方法,则系统把线程对象当成一个普通对象,而run()方法也是一个普通方法,而不是线程执行体。

3.2,运行和阻塞状态

如果CPU处于就绪状态和线程获得了CPU,开始执行run()方法的线程执行体,则该线程处于运行状态。如果计算机只有一个CPU,那么在任何时刻只有一个线程处于运行状态。当然,在一个具有多处理器的机器上,将会有多个线程并行执行;当线程数大于处理器数时,依然会存在多个线程在同一个CPU上轮换的情况。

当一个线程开始运行后,它不可能一直处于运行状态(除非线程执行体足够短,在时间片内能执行完毕),线程在运行过程中需要被中断,目的是使其他线程获得执行的机会,线程调节的细节取决于底层平台所采用的策略。对于采用抢占式调度策略的系统而言,系统会给每一个可执行的线程一个小时间短来执行处理任务;当该时间段用完后,系统就会剥夺该线程所占用的资源,让其他线程获得执行机会。在选择下一个线程时,系统会考虑线程的优先级。

所有现代的桌面和服务器操作系统都采用抢占式调度策略,但一些小型设备如手机等则可能采用协作式调度策略,在这样的系统中,只有当一个线程调用了它的sleep()或yield()方法后才会选择放弃其所占用的资源——也就是必须由该线程主动放弃其所占用的资源。

针对上面的几种情况,当发生如下特定的情况时可以接触阻塞,让该线程重新进入就绪状态:

  • 调用sleep()方法的线程经过了指定的时间。
  • 线程调用的阻塞式I/O方法已经返回。
  • 线程成功地获得了视图获取的锁对象。
  • 线程正在等待某个通知时,其线程发出了一个通知。
3.3,线程死亡

线程会以如下三种方式结束,结束后就处于死亡状态:

  • run()方法或代表线程执行体的target函数执行完成,线程正常结束。
  • 线程抛出一个未捕获的Exception或Error。

当主线程结束时,其他线程不受到影响,并不会随之结束。一旦子线程启动起来后,它就拥有和主线程相同的地位,它不会受主线程的影响。

为了测试某个线程是否已经死亡,可以调用线程对象的is_alive()方法,当线程处于就绪、运行、阻塞三种状态时,该方法将返回True;当线程处于新建、死亡两种状态时,该方法返回False。

不要对处于死亡状态的线程调用start()方法,程序只能对处于新建状态的线程调用start()方法,对处于新建状态的线程两次调用start()方法也是错的。它们都会引发RuntimeError异常。

4,控制线程 4.1,join线程

Thread提供了让一个线程等待另一个线程完成的方法——join()方法。当在某个程序执行流中调用其他线程的join()方法时,调用线程将被阻塞,直到被join()方法加入的join线程执行完成。

join()方法通常由使用线程的程序调用,以将大问题划分成许多小问题,并为每个小问题分配一个线程。当所有的小问题都得到处理后,再调用主线程来进一步操作。

import threading
def action(max):
    for i in range(max):
        print(threading.current_thread().name + " " + str(i))
threading.Thread(target=action, args=(100,), name="新线程").start()
for i in range(100):
    if i == 20:
        jt = threading.Thread(target=action, args=(100,), name="被Join的线程")
        jt.start()
        jt.join()
    print(threading.current_thread().name + " " + str(i))

4.2,线程睡眠

如果需要让当前正在执行的线程暂停一段时间,并进入阻塞状态,则可以通过调用time模块的sleep(secs)函数来实现。该函数可指定一个secs函数,用于指定线程阻塞多少秒。

当当前线程调用sleep()函数进入阻塞状态后,在其睡眠时间段内,该线程不会获得执行的机会,即使系统中没有其他可执行的线程,处于sleep()中的线程也不会执行,因此sleep()函数常用来暂停程序的运行。

import time
for i in range(10):
    print("当前时间:",time.ctime())
    time.sleep(1)
4.3,后台线程

有一种线程,它是在后台运行的,它的任务是为其他线程提供服务,这种线程被称为“后台线程”,又称为“守护线程”或“精灵线程”。Python解释器的垃圾回收就是典型的守护线程。

后台线程有一个特征:如果所有的前台线程都死亡了,那么后台线程会自动死亡。

调用Thread对象的daemon属性可以将指定线程设置成后台线程。

import threading
def action(max):
    for i in range(max):
        print(threading.current_thread().name + " " + str(i))
t = threading.Thread(target=action, args=(100,), name="后台线程")
t.daemon = True
t.start()
for i in range(10):
    print(threading.current_thread().name + " " + str(i))
5,线程同步 5.1,线程安全问题

关于线程安全,有一个经典的问题——银行取钱问题,用代码模拟过程:

class Account:
    def __init__(self, account_no, balance):
        self.account_no = account_no
        self.balance = balance
import threading
import time
def draw(account, draw_amount):
    if account.balance >= draw_amount:
        print(threading.current_thread().name + " " + str(draw_amount))
        time.sleep(0.01)
        account.balance -= draw_amount
        print("余额为:", str(account.balance))
    else:
        print("余额不足")
acct = Account("1234567",1000)
threading.Thread(name="甲", target=draw, args=(acct, 800)).start()
threading.Thread(name="乙", target=draw, args=(acct, 800)).start()
==================================================================
甲 800
乙 800
余额为: 200
余额为: -600
5.2,同步锁(Lock)

为了解决线程安全问题,Python的threading模块引入锁(Lock)。threading模块提供了Lock和RLock两个类,它们都提供了如下两个方法来加锁和释放锁:

  • acquire(blocking=True, timeout=-1):请求对Lock或RLock加锁,其中timeout参数指定加锁多少秒。
  • release():释放锁。

Lock和RLock的区别如下:

  • treading.Lock:它是一个基本的锁对象,每次只能锁定一次,其余的锁请求,需等待锁释放后才能获取。
  • treading.RLock:它代表可重入锁(Reentrant Lock)。对于可重入锁,在同一个线程中可以对它进行多次锁定,也可以多次释放。如果使用RLock,那么acquire()和release()方法必须成对出现。如果调用了n次acquire()加锁,则必须调用n次release()才能释放锁。

由此可见,RLock锁具有可重入性。也就是说,同一个线程可以对已被加锁的RLock锁再次加锁,RLock对象会维持一个计数器来追踪acquire()方法的嵌套调用,线程在每次调用另一个被相同锁保护的方法。

Lock是控制多个线程对共享资源进行访问的工具。通常,锁提供了对共享资源的独占访问,每次只能有一个线程对Lock对象加锁,线程在开始访问共享资源之前应先请求获得Lock对象。当对共享资源访问完成后,程序释放对Lock对象的锁定。

在实现线程安全的控制中,比较常用的是RLock。通常使用RLock的代码格式如下:

class X:
    def m(self):
        self.lock.acquire()
        try:
            #方法体
        finally:
            self.lock.release()

使用RLock对象来控制线程安全,当加锁和释放锁出现在不同的作用范围内时,通常建议使用finally块来确保在必要时释放锁。

使用Lock对象可以非常方便地实现线程安全的类,线程安全的类具有如下特征:

  • 该类的对象可以被多个线程安全访问。
  • 每个线程在调用该对象的任意方法之后,都将得到正确的结果。
  • 每个线程在调用该对象的任意方法之后,该对象都依然保持合理的状态。

总的来说,不可变类总是线程安全的,因为它的对象状态是不可改变;但可变对象需要额外的方法来保证其线程安全。将Account类改成如下形式,它就是线程安全的。

import threading
import time
class Account:
    def __init__(self, account_no, balance):
        self.account_no = account_no
        self.balance = balance
        self.lock = threading.RLock()
        #self.lock = threading.Lock()
    def draw(self, draw_amount):
        self.lock.acquire()
        if self.balance >= draw_amount:
            print(threading.current_thread().name + " " + str(draw_amount))
            time.sleep(0.01)
            self.balance -= draw_amount
            print("余额为:", str(self.balance))
        else:
            print("余额不足")
        self.lock.release()
def draw(account, draw_amount):
    account.draw(draw_amount)

acct = Account("1234567", 1000)
threading.Thread(name="甲", target=draw, args=(acct, 800)).start()
threading.Thread(name="乙", target=draw, args=(acct, 800)).start()
====================================
甲 800
余额为: 200
余额不足

可变类的线程安全是以降低程序的运行效率作为代价,为了减少线程安全所带来的负面影响,程序可以采用如下策略:

  • 不要对线程安全类的所有方法都进行同步,只对那些会改变竞争资源的方法进行同步。
  • 如果可变类有两种运行环境:单线程环境和多线程环境,则应该为该可变类提供两个版本,即线程不安全版本和线程安全版本。在单线程环境中使用线程不安全版本以保证性能,在多线环境中使用线程安全版本。
5.3,死锁

当两个线程相互等待对方释放同步监听器时会发生死锁。Python解释器没有检测,也没有采取措施来处理死锁情况,所有在进行多线程编程时应采取措施避免出现死锁。一旦出现死锁,整个程序既不会发生任何异常,也不会给出任何提示,只是所有线程都处于阻塞状态,无法继续。

死锁很容易发生的,尤其是在系统中出现多个同步监视器的情况下。

import threading
import time
class A:
    def __init__(self):
        self.lock = threading.RLock()
    def foo(self, b):
        try:
            self.lock.acquire()
            print(threading.current_thread().name+":进入A实例的foo()方法")
            time.sleep(0.2)
            print(threading.current_thread().name+":企图调用B实例的last()方法")
            b.last()
        finally:
            self.lock.release()
    def last(self):
        try:
            self.lock.acquire()
            print("进入了A类的last()方法内部")
        finally:
            self.lock.release()
class B:
    def __init__(self):
        self.lock = threading.RLock()
    def bar(self,a):
        try:
            self.lock.acquire()
            print(threading.current_thread().name+":进入了B实例的bar()方法")
            time.sleep(0.2)
            print(threading.current_thread().name+":进入了A实例的last()方法")
            a.last()
        finally:
            self.lock.release()
    def last(self):
        try:
            self.lock.acquire()
            print("进入了B类的last()方法内部")
        finally:
            self.lock.release()
a = A()
b = B()
def init():
    threading.current_thread().name = "主线程"
    a.foo(b)
    print("进入主线程后")
def action():
    threading.current_thread().name = "副线程"
    b.bar(a)
    print("进入副线程后")
threading.Thread(target=action).start()
init()

死锁是不应该在程序中出现的,在编写程序时应该尽量避免出现死锁。常见的解决死锁的办法:

  • 避免多次锁定:尽量避免同一个线程对多个Lock进锁定。
  • 具有相同的加锁顺序:如果多个线程需要对多个Lock进行锁定,则应该保证它们的顺行请求加锁。
  • 使用定时锁:程序在调用acquire()方法加锁时可指定timeout参数,该参数指定超过timeout秒后会自动释放对Lock的锁定,这样就就可以解开死锁。
  • 死锁检测:死锁检测是一种依靠算法机制来实现的死锁语法机制,它主要针对那些不可能实现按序加锁,也不能使用定时锁的场景的。
6,线程通信

当线程在系统中运行时,线程的调度具有一定透明性,通常程序无法准确控制线程的轮换执行,如果有需要,Python可通过线程通信来保证线程协调运行。

6.1,使用Condition实现通信

假设系统中有两个线程,这两个线程分别代表存款者和取钱者,现在假设系统有一种特殊的要求,即要求存款者和取钱者不断地重复存款、取钱的动作,而且要求每当存款者将钱存入指定账户后,取钱者就立即取出该笔钱。不允许存款者连续两次存钱,也不允许取钱者连续两次取钱。

为了实现这种功能,可以借助于Condition对象来保持协调。使用Condition可以让那些己经得到Lock对象却无法继续执行的线程释放Lock对象,Condition对象也可以唤醒其他处于等待状态的线程。

将Condition对象与Lock对象组合使用,可以为每个对象提供多个等待集(wait-set)。因此,Condition 对象总是需要有对应的Lock对象。从Condition的构造器 __init__(self, lock=None) 可以看出,程序在创建Condition时可通过 lock 参数传入要绑定的Lock对象;如果不指定lock参数,在创建Condition时它会自动创建一个与之绑定的Lock对象。

Condition 类提供了如下几个方法:

  • acquire([timeout])/release():调用 Condition 关联的 Lock 的 acquire() 或 release() 方法。
  • wait([timeout]):导致当前线程进入 Condition 的等待池等待通知并释放锁,直到其他线程调用该 Condition 的 notify() 或 notify_all() 方法来唤醒该线程。在调用该 wait() 方法时可传入一个 timeout 参数,指定该线程最多等待多少秒。
  • notify():唤醒在该 Condition 等待池中的单个线程并通知它,收到通知的线程将自动调用 acquire() 方法尝试加锁。如果所有线程都在该 Condition 等待池中等待,则会选择唤醒其中一个线程,选择是任意性的。
  • notify_all():唤醒在该 Condition 等待池中等待的所有线程并通知它们。

本例程序中,可以通过一个旗标来标识账户中是否已有存款,当旗标为 False 时,表明账户中没有存款,存款者线程可以向下执行,当存款者把钱存入账户中后,将旗标设为 True,并调用 Condition 的 notify() 或 notify_all() 方法来唤醒其他线程。

当存款者线程进入线程体后,如果旗标为 True,就调用 Condition 的 wait() 方法让该线程等待。当旗标为 True 时,表明账户中已经存入了钱,取钱者线程可以向下执行,当取钱者把钱从账户中取出后,将旗标设为 False,并调用 Condition 的 notify() 或 notify_all() 方法来唤醒其他线程;当取钱者线程进入线程体后,如果旗标为 False,就调用 wait() 方法让该线程等待。

本程序为 Account 类提供了 draw() 和 deposit() 两个方法,分别对应于该账户的取钱和存款操作。因为这两个方法可能需要并发修改 Account 类的 self.balance 成员变量的值,所以它们都使用 Lock 来控制线程安全。除此之外,这两个方法还使用了 Condition 的 wait() 和 notify_all() 来控制线程通信。

import threading
class Account:
    # 定义构造器
    def __init__(self, account_no, balance):
        # 封装账户编号、账户余额的两个成员变量
        self.account_no = account_no
        self._balance = balance
        self.cond = threading.Condition()
        # 定义代表是否已经存钱的旗标
        self._flag = False
    # 因为账户余额不允许随便修改,所以只为self._balance提供getter方法
    def getBalance(self):
        return self._balance
    # 提供一个线程安全的draw()方法来完成取钱操作
    def draw(self, draw_amount):
        # 加锁,相当于调用Condition绑定的Lock的acquire()
        self.cond.acquire()
        try:
            # 如果self._flag为假,表明账户中还没有人存钱进去,取钱方法阻塞
            if not self._flag:
                self.cond.wait()
            else:
                # 执行取钱操作
                print(threading.current_thread().name
                    + " 取钱:" +  str(draw_amount))
                self._balance -= draw_amount
                print("账户余额为:" + str(self._balance))
                # 将标识账户是否已有存款的旗标设为False
                self._flag = False
                # 唤醒其他线程
                self.cond.notify_all()
        # 使用finally块来释放锁
        finally:
            self.cond.release()
    def deposit(self, deposit_amount):
        # 加锁,相当于调用Condition绑定的Lock的acquire()
        self.cond.acquire()
        try:
            # 如果self._flag为真,表明账户中已有人存钱进去,存钱方法阻塞
            if self._flag:            # ①
                self.cond.wait()
            else:
                # 执行存款操作
                print(threading.current_thread().name
                    + " 存款:" +  str(deposit_amount))
                self._balance += deposit_amount
                print("账户余额为:" + str(self._balance))
                # 将表示账户是否已有存款的旗标设为True
                self._flag = True
                # 唤醒其他线程
                self.cond.notify_all()
        # 使用finally块来释放锁
        finally:
            self.cond.release()

#  定义一个函数,模拟重复max次执行取钱操作
def draw_many(account, draw_amount, max):
    for i in range(max):
        account.draw(draw_amount)
#  定义一个函数,模拟重复max次执行存款操作
def deposit_many(account, deposit_amount, max):
    for i in range(max):
        account.deposit(deposit_amount)
# 创建一个账户
acct = Account("1234567" , 0)
# 创建、并启动一个“取钱”线程
threading.Thread(name="取钱者11", target=draw_many,
    args=(acct, 800, 50)).start()
threading.Thread(name="取钱者12", target=draw_many,
    args=(acct, 800, 50)).start()
# 创建、并启动一个“存款”线程
threading.Thread(name="存款者1", target=deposit_many,
    args=(acct , 800, 50)).start();
threading.Thread(name="存款者2", target=deposit_many,
    args=(acct , 800, 50)).start()
threading.Thread(name="存款者3", target=deposit_many,
    args=(acct , 800, 50)).start()

上面程序使用 Condition 的 wait() 和 notify_all() 方法进行控制,对存款者线程而言,当程序进入 deposit() 方法后,如果 self._flag 为 True,则表明账户中已有存款,程序调用 Condition 的 wait() 方法被阻塞;否则,程序向下执行存款操作,当存款操作执行完成后,系统将 self._flag 设为 True,然后调用 notify_all() 来唤醒其他被阻塞的线程。如果系统中有存款者线程,存款者线程也会被唤醒,但该存款者线程执行到 ① 号代码处时再次进入阻塞状态,只有执行 draw() 方法的取钱者线程才可以向下执行。同理,取钱者线程的运行流程也是如此。

程序中的存款者线程循环 100 次重复存款,而取钱者线程则循环 100 次重复取钱,存款者线程和取钱者线程分别调用 Account 对象的 deposit()、draw() 方法来实现。主程序可以启动任意多个“存款”线程和“取钱”线程,可以看到所有的“取钱”线程必须等“存款”线程存钱后才可以向下执行,而“存款”线程也必须等“取钱”线程取钱后才可以向下执行。

6.2,多线程通信Queue

queue 模块下提供了几个阻塞队列,这些队列主要用于实现线程通信。在 queue 模块下主要提供了三个类,分别代表三种队列,它们的主要区别就在于进队列、出队列的不同。

  • queue.Queue(maxsize=0):代表 FIFO(先进先出)的常规队列,maxsize 可以限制队列的大小。如果队列的大小达到队列的上限,就会加锁,再次加入元素时就会被阻塞,直到队列中的元素被消费。如果将 maxsize 设置为 0 或负数,则该队列的大小就是无限制的。
  • queue.LifoQueue(maxsize=0):代表 LIFO(后进先出)的队列,与 Queue 的区别就是出队列的顺序不同。
  • PriorityQueue(maxsize=0):代表优先级队列,优先级最小的元素先出队列。

这三个队列类的属性和方法基本相同, 它们都提供了如下属性和方法:

  • Queue.qsize():返回队列的实际大小,也就是该队列中包含几个元素。
  • Queue.empty():判断队列是否为空。
  • Queue.full():判断队列是否已满。
  • Queue.put(item, block=True, timeout=None):向队列中放入元素。如果队列己满,且 block 参数为 True(阻塞),当前线程被阻塞,timeout 指定阻塞时间,如果将 timeout 设置为 None,则代表一直阻塞,直到该队列的元素被消费;如果队列己满,且 block 参数为 False(不阻塞),则直接引发 queue.FULL 异常。
  • Queue.put_nowait(item):向队列中放入元素,不阻塞。相当于在上一个方法中将 block 参数设置为 False。
  • Queue.get(item, block=True, timeout=None):从队列中取出元素(消费元素)。如果队列已满,且 block 参数为 True(阻塞),当前线程被阻塞,timeout 指定阻塞时间,如果将 timeout 设置为 None,则代表一直阻塞,直到有元素被放入队列中; 如果队列己空,且 block 参数为 False(不阻塞),则直接引发 queue.EMPTY 异常。
  • Queue.get_nowait(item):从队列中取出元素,不阻塞。相当于在上一个方法中将 block 参数设置为 False。
import queue
# 定义一个长度为2的阻塞队列
bq = queue.Queue(2)
bq.put("Python")
bq.put("Python")
print("1111111111")
bq.put("Python")  # ① 阻塞线程
print("2222222222")

上面程序先定义了一个大小为 2 的 Queue,程序先向该队列中放入两个元素,此时队列还没有满,两个元素都可以被放入。当程序试图放入第三个元素时,如果使用 put() 方法尝试放入元素将会阻塞线程,如上面程序中 ① 号代码所示。与此类似的是,在 Queue 已空的情况下,程序使用 get() 方法尝试取出元素将会阻塞线程。

import threading
import time
import queue
def product(bq):
    str_tuple = ("Python", "Kotlin", "Swift")
    for i in range(100):
        print(threading.current_thread().name + "生产者准备生产元组元素!")
        time.sleep(0.2);
        # 尝试放入元素,如果队列已满,则线程被阻塞
        bq.put(str_tuple[i % 3])
        print(threading.current_thread().name 
            + "生产者生产元组元素完成!")
def consume(bq):
    while True:
        print(threading.current_thread().name + "消费者准备消费元组元素!")
        time.sleep(0.2)
        # 尝试取出元素,如果队列已空,则线程被阻塞
        t = bq.get()
        print(threading.current_thread().name 
            + "消费者消费[ %s ]元素完成!" % t)
# 创建一个容量为1的Queue
bq = queue.Queue(maxsize=1)
# 启动3个生产者线程
threading.Thread(target=product, args=(bq, ),name="甲").start()
threading.Thread(target=product, args=(bq, ),name="乙").start()
threading.Thread(target=product, args=(bq, ),name="丙").start()
# 启动一个消费者线程
threading.Thread(target=consume, args=(bq, ),name="111").start()
threading.Thread(target=consume, args=(bq, ),name="112").start()

上面程序启动了三个生产者线程向 Queue 队列中放入元素,启动了两个消费者线程从 Queue 队列中取出元素。本程序中 Queue 队列的大小为 1,因此三个生产者线程无法连续放入元素,必须等待消费者线程取出一个元素后,其中的一个生产者线程才能放入一个元素。所以在生产者生产一个元素之后,消费者必须取出一个元素,生产者生产的元素才能进入队列,否则就会发生阻塞。

6.3,使用Event控制线程通信

Event是一种非常简单的线程通信机制:一个线程发出一个Event,另外一个线程可通过该Event被触发。

Event本身管理一个内部旗标,程序可以通过Event的set()方法将该旗标设置为True,也可以调用clear()方法将该旗标设置为False。程序可以调用wait()方法来阻塞当前线程,直到Event的内部旗标被设置为True。

Event提供了如下方法:

  • is_set():该方法返回Event的内部旗标是否为True。
  • set():该方法将会把Event的内部旗标设置为True,并唤醒所有处于等待状态的线程。
  • clear():该方法将Event的内部旗标设置为False,通常接下来会调用wait()方法来阻塞当前线程。
  • wait(timeout=None):该方法会阻塞当前线程。
import threading
import time

even = threading.Event()

def cal(name):
    print("%s 启动" % threading.current_thread().getName())
    print("%s 准备开始计算" % name)
    even.wait()
    print("%s 收到通知了" % threading.current_thread().getName())
    print("%s 正式开始计算" % name)

threading.Thread(target=cal, args=("甲",)).start()
threading.Thread(target=cal, args=("乙",)).start()
time.sleep(2)
print("主线程出发")
even.set()
===============================================
Thread-1 启动
甲 准备开始计算
Thread-2 启动
乙 准备开始计算
主线程出发
Thread-1 收到通知了
甲 正式开始计算
Thread-2 收到通知了
乙 正式开始计算
7,线程池

系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互。在这种情形下,使用线程池可以很好地提升性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池。

线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。

此外,使用线程池可以有效地控制系统中并发线程的数量。当系统中包含有大量的并发线程时,会导致系统性能急剧下降,甚至导致Python解释器崩溃,而线程池的最大线程数参数可以控制系统中并发线程的数量不超过此数。

7.1,使用线程池

线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。

如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。

Exectuor 提供了如下常用方法:

  • submit(fn, *args, **kwargs):将 fn 函数提交给线程池。*args 代表传给 fn 函数的参数,*kwargs 代表以关键字参数的形式为 fn 函数传入参数。
  • map(func, *iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
  • shutdown(wait=True):关闭线程池。

程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。

Future 提供了如下方法:

  • cancel():取消该 Future 代表的线程任务。如果该任务正在执行,不可取消,则该方法返回 False;否则,程序会取消该任务,并返回 True。
  • cancelled():返回 Future 代表的线程任务是否被成功取消。
  • running():如果该 Future 代表的线程任务正在执行、不可被取消,该方法返回 True。
  • done():如果该 Funture 代表的线程任务被成功取消或执行完成,则该方法返回 True。
  • result(timeout=None):获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。
  • exception(timeout=None):获取该 Future 代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回 None。
  • add_done_callback(fn):为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数。

在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。

使用线程池来执行线程任务的步骤如下:

  • 调用 ThreadPoolExecutor 类的构造器创建一个线程池。
  • 定义一个普通函数作为线程任务。
  • 调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务。
  • 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。
from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定义一个准备作为线程任务的函数
def action(max):
    my_sum = 0
    for i in range(max):
        print(threading.current_thread().name + '  ' + str(i))
        my_sum += i
    return my_sum
# 创建一个包含2条线程的线程池
pool = ThreadPoolExecutor(max_workers=2)
# 向线程池提交一个task, 50会作为action()函数的参数
future1 = pool.submit(action, 50)
# 向线程池再提交一个task, 100会作为action()函数的参数
future2 = pool.submit(action, 100)
# 判断future1代表的任务是否结束
print(future1.done())
time.sleep(3)
# 判断future2代表的任务是否结束
print(future2.done())
# 查看future1代表的任务返回的结果
print(future1.result())
# 查看future2代表的任务返回的结果
print(future2.result())
# 关闭线程池
pool.shutdown()

当程序把 action() 函数提交给线程池时,submit() 方法会返回该任务所对应的 Future 对象,程序立即判断 futurel 的 done() 方法,该方法将会返回 False(表明此时该任务还未完成)。接下来主程序暂停 3 秒,然后判断 future2 的 done() 方法,如果此时该任务已经完成,那么该方法将会返回 True。

程序最后通过 Future 的 result() 方法来获取两个异步任务返回的结果。当程序使用 Future 的 result() 方法来获取结果时,该方法会阻塞当前线程,如果没有指定 timeout 参数,当前线程将一直处于阻塞状态,直到 Future 代表的任务返回。

7.2,获取执行结果

前面程序调用了 Future 的 result() 方法来获取线程任务的运回值,但该方法会阻塞当前主线程,只有等到钱程任务完成后,result() 方法的阻塞才会被解除。

如果程序不希望直接调用 result() 方法阻塞线程,则可通过Future的add_done_callback() 方法来添加回调函数,该回调函数形如 fn(future)。当线程任务完成后,程序会自动触发该回调函数,并将对应的 Future 对象作为参数传给该回调函数。

from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定义一个准备作为线程任务的函数
def action(max):
    my_sum = 0
    for i in range(max):
        print(threading.current_thread().name + '  ' + str(i))
        my_sum += i
    return my_sum
# 创建一个包含2条线程的线程池
with ThreadPoolExecutor(max_workers=2) as pool:
    # 向线程池提交一个task, 50会作为action()函数的参数
    future1 = pool.submit(action, 50)
    # 向线程池再提交一个task, 100会作为action()函数的参数
    future2 = pool.submit(action, 100)
    def get_result(future):
        print(future.result())
    # 为future1添加线程完成的回调函数
    future1.add_done_callback(get_result)
    # 为future2添加线程完成的回调函数
    future2.add_done_callback(get_result)
    print('--------------')

上面主程序分别为 future1、future2 添加了同一个回调函数,该回调函数会在线程任务结束时获取其返回值。

主程序的最后一行代码打印了一条横线。由于程序并未直接调用 future1、future2 的 result() 方法,因此主线程不会被阻塞,可以立即看到输出主线程打印出的横线。接下来将会看到两个新线程并发执行,当线程任务执行完成后,get_result() 函数被触发,输出线程任务的返回值。

另外,由于线程池实现了上下文管理协议(Context Manage Protocol),因此,程序可以使用 with 语句来管理线程池,这样即可避免手动关闭线程池,如上面的程序所示。

此外,Exectuor 还提供了一个 map(func, *iterables, timeout=None, chunksize=1) 方法,该方法的功能类似于全局函数 map(),区别在于线程池的 map() 方法会为 iterables 的每个元素启动一个线程,以并发方式来执行 func 函数。这种方式相当于启动 len(iterables) 个线程,井收集每个线程的执行结果。

from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定义一个准备作为线程任务的函数
def action(max):
    my_sum = 0
    for i in range(max):
        print(threading.current_thread().name + '  ' + str(i))
        my_sum += i
    return my_sum
# 创建一个包含4条线程的线程池
with ThreadPoolExecutor(max_workers=4) as pool:
    # 使用线程执行map计算
    # 后面元组有3个元素,因此程序启动3条线程来执行action函数
    results = pool.map(action, (50, 100, 150))
    print('--------------')
    for r in results:
        print(r)

上面程序使用 map() 方法来启动 3 个线程(该程序的线程池包含 4 个线程,如果继续使用只包含两个线程的线程池,此时将有一个任务处于等待状态,必须等其中一个任务完成,线程空闲出来才会获得执行的机会),map() 方法的返回值将会收集每个线程任务的返回结果。

运行上面程序,同样可以看到 3 个线程并发执行的结果,最后通过 results 可以看到 3 个线程任务的返回结果。

通过上面程序可以看出,使用 map() 方法来启动线程,并收集线程的执行结果,不仅具有代码简单的优点,而且虽然程序会以并发方式来执行 action() 函数,但最后收集的 action() 函数的执行结果,依然与传入参数的结果保持一致。也就是说,上面 results 的第一个元素是 action(50) 的结果,第二个元素是 action(100) 的结果,第三个元素是 action(150) 的结果。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/290692.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号