Python 不乏并发选项 标准库包括对线程、进程和异步 I/O 的支持。在许多情况下 Python 通过创建异步、线程和子进程等高级模块 消除了使用这些各种并发方法的困难。在标准库之外 还有第三种解决方案 例如twisted、stackless 和处理模块 仅举几例。本文使用实践示例专门关注 Python 中的线程处理。网上有很多很好的资源来记录线程 API 但本文试图提供常见线程使用模式的实践示例。
首先定义进程和线程之间的区别很重要。线程与进程的不同之处在于它们共享状态、内存和资源。这个简单的区别对于线程来说既是优点也是缺点。一方面 线程是轻量级的并且易于通信 但另一方面 它们带来了一系列问题 包括死锁、竞争条件和纯粹的复杂性。幸运的是 由于 GIL 和排队模块 Python 中的线程实现起来比其他语言要简单得多。
你好 Python 线程接下来 我假设你已经安装了 Python 2.5 或更高版本 因为许多示例将使用 Python 语言的更新功能 这些功能至少出现在 Python2.5 中。要开始使用 Python 中的线程 我们将从一个简单的“Hello World”示例开始
清单 1. hello_threads_example
import threading import datetime class ThreadClass(threading.Thread): def run(self): now datetime.datetime.now() print %s says Hello World at time: %s % (self.getName(), now) for i in range(2): t ThreadClass() t.start()
如果你运行这个例子 你会得到以下输出
#python hello_threads.py Thread‑1 says Hello World at time: 2008‑05‑13 13:22:50.252069 Thread‑2 says Hello World at time: 2008‑05‑13 13:22:50.252576
查看此输出 你可以看到你收到了来自两个带有日期戳的线程的 Hello World 语句。如果你查看实际代码 会发现有两个 import 语句 一个导入 datetime 模块 另一个导入 threading 模块。该类ThreadClass继承自threading.Thread 因此 您需要定义一个 run 方法来执行您在线程内运行的代码。在 run 方法中唯一需要注意的重要事项self.getName()是该方法将标识线程的名称。
最后三行代码实际上调用了类并启动了线程。如果您注意到 t.start()实际上是启动线程的。线程模块在设计时就考虑到了继承性 实际上是建立在较低级别的线程模块之上的。在大多数情况下 继承自 被认为是最佳实践threading.Thread 因为它为线程编程创建了一个非常自然的 API。
使用带线程的队列正如我之前提到的 当线程需要共享数据或资源时 线程处理可能会很复杂。线程模块确实提供了许多同步原语 包括信号量、条件变量、事件和锁。虽然存在这些选项 但最好的做法是专注于使用队列。队列更容易处理 并使线程编程更加安全 因为它们有效地将所有对资源的访问集中到单个线程 并允许更清晰、更易读的设计模式。
在下一个示例中 你将首先创建一个程序 该程序将依次或一个接一个地获取网站的 URL 并打印出页面的前 1024 个字节。这是使用线程可以更快地完成某些事情的经典示例。首先 让我们使用urllib2模块一次抓取这些页面 并对代码进行计时
清单 2. URL 获取序列
import urllib2 import time hosts http://yahoo.com , http://google.com , http://amazon.com , http://ibm.com , http://apple.com start time.time() #grabs urls of hosts and prints first 1024 bytes of page for host in hosts: url urllib2.urlopen(host) print url.read(1024) print Elapsed Time: %s % (time.time() ‑ start)
当你运行它时 你会得到大量输出到标准输出 因为页面被部分打印。但你会在最后得到这个
Elapsed Time: 2.40353488922
让我们稍微看一下这段代码。你只导入两个模块。首先 urllib2模块是承担重任并抓取网页的东西。其次 你通过调用time.time()创建一个开始时间值 然后再次调用它并减去初始值以确定程序执行所需的时间。最后 从程序的速度来看 “两秒半”的结果并不可怕 但如果你有数百个网页要检索 考虑到当前的平均值 大约需要 50 秒。看看创建线程版本如何加快速度
清单 3. URL 获取线程
#!/usr/bin/env python import Queue import threading import urllib2 import time hosts http://yahoo.com , http://google.com , http://amazon.com , http://ibm.com , http://apple.com queue Queue.Queue() class ThreadUrl(threading.Thread): Threaded Url Grab def init(self, queue): threading.Thread.init(self) self.queue queue def run(self): while True: #grabs host from queue host self.queue.get() #grabs urls of hosts and prints first 1024 bytes of page url urllib2.urlopen(host) print url.read(1024) #signals to queue job is done self.queue.task_done() start time.time() def main(): #spawn a pool of threads, and pass them queue instance for i in range(5): t ThreadUrl(queue) t.setDaemon(True) t.start() #populate queue with data for host in hosts: queue.put(host) #wait on the queue until everything has been processed queue.join() main() print Elapsed Time: %s % (time.time() ‑ start)
这个例子有更多的代码需要解释 但由于使用了排队模块 它并没有比第一个线程示例复杂多少。这种模式是在 Python 中使用线程的一种非常常见且推荐的方式。步骤描述如下
创建一个Queue.Queue()实例 然后用数据填充它。将填充数据的实例传递到从threading.Thread继承而创建的Thread类中。产生一个守护线程池。一次从队列中拉出一项 并在线程内部使用该数据 即 run 方法 来完成这项工作。工作完成后 向queue.task_done()队列发送任务已完成的信号。加入队列 这实际上意味着等到队列为空 然后退出主程序。关于此模式的注意事项 通过将守护线程设置为 true 它允许主线程或程序在只有守护线程处于活动状态时退出。这创建了一种控制程序流程的简单方法 因为你可以在退出之前加入队列 或等到队列为空。确切的过程在队列模块的文档中得到了最好的描述 如右侧的资源部分所示
join()阻塞 直到队列中的所有项目都被获取和处理。每当将项目添加到队列时 未完成任务的计数就会增加。每当使用者线程调用 task_done() 以指示该项目已被检索并且其上的所有工作已完成时 未完成任务的计数就会下降。当未完成任务的数量降至零时 join()解锁。 使用多个队列
因为上面演示的模式非常有效 所以通过将额外的线程池与队列链接来扩展它是相对简单的。在上面的示例中 你只是打印出网页的第一部分。下一个示例返回每个线程抓取的整个网页 然后将其放入另一个队列。然后设置另一个加入第二个队列的线程池 然后在网页上工作。本示例中执行的工作涉及使用名为 Beautiful Soup 的第三方 Python 模块解析网页。仅使用几行代码 使用此模块 你将提取标题标签并为你访问的每个页面打印出来。
清单 4. 多队列数据挖掘网站
import Queue import threading import urllib2 import time from BeautifulSoup import BeautifulSoup hosts http://yahoo.com , http://google.com , http://amazon.com , http://ibm.com , http://apple.com queue Queue.Queue() outqueue Queue.Queue() class ThreadUrl(threading.Thread): Threaded Url Grab def init(self, queue, outqueue): threading.Thread.init(self) self.queue queue self.outqueue outqueue def run(self): while True: #grabs host from queue host self.queue.get() #grabs urls of hosts and then grabs chunk of webpage url urllib2.urlopen(host) chunk url.read() #place chunk into out queue self.out_queue.put(chunk) #signals to queue job is done self.queue.task_done() class DatamineThread(threading.Thread): Threaded Url Grab def __init(self, out_queue): threading.Thread.__init(self) self.out_queue out_queue def run(self): while True: #grabs host from queue chunk self.out_queue.get() #parse the chunk soup BeautifulSoup(chunk) print soup.findAll([ title ]) #signals to queue job is done self.out_queue.task_done() start time.time() def main(): #spawn a pool of threads, and pass them queue instance for i in range(5): t ThreadUrl(queue, out_queue) t.setDaemon(True) t.start() #populate queue with data for host in hosts: queue.put(host) for i in range(5): dt DatamineThread(out_queue) dt.setDaemon(True) dt.start()



