2021SC@SDUSC
1 Twisted
Scrapy 基于 Twisted,所以除了要有一定的 Python 基础外,还要对 Twisted 有一些了解。
我们可以将 Twisted 和 Asyncio 类比,它们都是为了支持协程而诞生的,只是前者比后者出现的更早。这 2 个技术实现的核心都是事件循环,当程序执行到某个耗时的 IO 操作时,程序的执行权限会被退回给事件循环,事件循环会检测其它准备就绪的协程,然后将执行权限交给它,当之前的协程 IO 操作完毕后,事件循环会将执行权限转给它,继续后面的操作。这样,就能在单线程内实现并发,作用和多线程类似,只是比多线程更轻量。事件循环在 Asyncio 中被叫做 event_loop,在 Twisted 中叫做 reactor。我们看一些简单的例子
1.1 Schedule
from twisted.internet import reactor
def f(s):
print('this will run 3.5 seconds after it was scheduled: %s' % s)
reactor.callLater(3.5, f, 'hello, world')
# f() will only be called if the event loop is started.
reactor.run()
reactor.callLater 会将某个任务加入到事件循环,并设置好多少秒后开始执行,当然要将事件循环启动后才会有作用。
from twisted.internet import reactor, task
def f(s):
print(s)
loop = task.LoopingCall(f, 'hello, world')
# Start looping every 1 second.
loop.start(1)
reactor.run()
每隔 1s 执行一次 f 方法。
1.2 Deferred
Deferred 表示某个任务未来会产生结果,当任务执行完毕后,会执行注册在 Deferred 的回调函数,并将结果传递给它。
1.2.1 成功回调
from twisted.internet import reactor, defer
def get_dummpy_data(input_data):
print('get_dummpy_data called')
deferred = defer.Deferred()
reactor.callLater(2, deferred.callback, input_data * 3)
return deferred
def cb_print_data(result):
print('Result received: {}'.format(result))
deferred = get_dummpy_data(3)
deferred.addCallback(cb_print_data)
# manually set up the end of the process by asking the reactor to
# stop itself in 4 seconds time
reactor.callLater(4, reactor.stop)
reactor.run()
get_dummpy_data,返回了 1 个 defer. Deferred 对象,表示该函数是 1 个异步任务,会在未来某个时间点产生结果,这里使用
reactor.callLater(2, deferred.callback, input_data * 3)
模仿异步结果,2s 后执行 deferred 的回调函数,并将计算后的结果传递给它。
cb_print_data,简单的打印结果。
看下执行流程,首先调用 get_dummpy_data 得到 deferred,然后注册回调函数 cb_print_data,最后运行事件循环。为了让程序正常关闭,还设置了在 4s 后自动关闭事件循环。执行结果
get_dummpy_data called
Result received: 9
1.2.2 异常回调
上面我们使用 addCallback 注册成功回调,除开这个,我们还能注册异常回调,实现异常处理。
from twisted.internet import reactor, defer
def get_dummpy_data(input_data):
print('get_dummpy_data called')
deferred = defer.Deferred()
if input_data % 2 == 0:
reactor.callLater(2, deferred.callback, input_data * 3)
else:
reactor.callLater(2, deferred.errback, ValueError('You used an odd number!'))
return deferred
def cb_print_data(result):
print('Result received: {}'.format(result))
def eb_print_error(failure):
print(failure)
deferred = get_dummpy_data(3)
deferred.addCallback(cb_print_data)
deferred.addErrback(eb_print_error)
reactor.callLater(4, reactor.stop)
reactor.run()
get_dummpy_data,当 input_data 为奇数时,2s 后会触发 deferred 的异常回调。
执行时,使用 addErrback 注册异常回调。
结果:
get_dummpy_data called
[Failure instance: Traceback (failure with no frames):
]
1.2.3 回调链
看到这里,有的读者可能会想如果我注册多个成功回调以及多个异常回调,成功或者异常时,具体的执行流程又是怎么样的呢?这就要引入回调链的概念了。
有 2 种类型的主链:成功以及异常回调链
- 对于前者来说,每个回调的结果都是下个回调的输入,比如 cb1 的结果会是 cb2 的输入
- 对于后者来说,某个回调返回异常或者抛出异常,就会将异常作为下一个回调的输入,比如 eb1 如果返回或者抛出异常,那么 eb2 会被调用,并将 eb1 返回或者抛出的异常作为参数
当然,还有其它情况
- 如果 cb1 异常,那么 eb2 会被调用,并将 cb1 中的异常作为参数
- 如果 eb1 处理了异常并返回值,那么 cb2 会被调用,并将 eb1 的返回值作为参数
也就是说,当前级别的回调只根据上一级别的回调的执行情况进行触发。
添加回调大概有下面几种方式
- addCallback、addErrback
- addCallbacks
- addBoth
这些方式有些细节上的不同,举例说明下
第 1 个例子
d = getDeferredFromSomewhere()
d.addCallback(cb1) # A
d.addErrback(eb1) # B
d.addCallback(cb2)
d.addErrback(eb2)
也即,对于每 1 次 addCallback(addErrback),都会在同一级上绑定 1 个成功回调(异常回调)和 1 个什么都不做的异常回调(成功回调)。
第 2 个例子
d = getDeferredFromSomewhere()
d.addCallbacks(cb1, eb1) # C
d.addCallbacks(cb2, eb2)
也即,addCallbacks 会在同一级别上绑定 1 个成功回调和 1 个异常回调。
为了加强我们的理解,可以想下这个问题,如果 cb1 出现异常,对于这 2 个例子的执行情况有什么差别?结合图其实很好理解,对于第 1 个例子,eb1 会被调用;对于第 2 个例子,eb2 会被调用。
至于 addBoth(cb),可以理解为
d = getDeferredFromSomewhere()
d.addCallback(cb)
d.addErrback(cb)



