就目前而言,在我看来,这段代码的并行下载数量有限,但是并行解析作业的数量却不受限制。那是故意的吗?我将假设为“ no”,因为如果URL的数量接近无穷大,而您的网络恰好快而解析器却很慢,那么您的内存使用量也会:)。
因此,这将具有有限的并行性,但将通过下载顺序执行解析,而不是:
from twisted.internet import defer, task
from twisted.web.client import getPage
BATCH_SIZE = 5
def main_task(reactor):
def fetch_urls():
for url in get_urls():
yield getPage(url).addCallback(parse)
coop = task.Cooperator()
urls = fetch_urls()
return (defer.DeferredList([coop.coiterate(urls)
for _ in xrange(BATCH_SIZE)])
.addCallback(task_finished))
task.react(main_task)
之所以可以这样做是因为parse
(显然)返回了Deferred
,将其作为回调添加到所返回的getPage
结果中,导致直到完成交易才Deferred
调用由所添加的回调。coiterate``parse
自从您问惯用的Twisted代码以来,我还自由地进行了一些现代化(使用task.react
而不是手动运行Reactor,内联表达式使内容更简短等)。
如果您确实确实想拥有比并行获取更多的并行解析,那么类似的方法可能会更好:
from twisted.internet import defer, task
from twisted.web.client import getPage
PARALLEL_FETCHES = 5
PARALLEL_PARSES = 10
def main_task(reactor):
parseSemaphore = defer.DeferredSemaphore(PARALLEL_PARSES)
def parseWhenReady(r):
def parallelParse(_):
parse(r).addBoth(
lambda result: parseSemaphore.release().addCallback(
lambda _: result
)
)
return parseSemaphore.acquire().addCallback(parallelParse)
def fetch_urls():
for url in get_urls():
yield getPage(url).addCallback(parseWhenReady)
coop = task.Cooperator()
urls = fetch_urls()
return (defer.DeferredList([coop.coiterate(urls)
for _ in xrange(PARALLEL_FETCHES)])
.addCallback(lambda done:
defer.DeferredList(
[parseSemaphore.acquire()
for _ in xrange(PARALLEL_PARSES)]
))
.addCallback(task_finished))
task.react(main_task)
您可以看到parseWhenReady
return的Deferred
返回值acquire
,因此只要并行解析 开始 就可以继续进行并行提取,因此即使解析器过载,您也不会继续进行任意提取。但是,请parallelParse
谨慎地避免Deferred
返回由parse
或返回的值release
,因为随着这些操作的进行,提取应该能够继续进行。