您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

扭曲:等待子任务完成

扭曲:等待子任务完成

就目前而言,在我看来,这段代码的并行下载数量有限,但是并行解析作业的数量却不受限制。那是故意的吗?我将假设为“ no”,因为如果URL的数量接近无穷大,而您的网络恰好快而解析器却很慢,那么您的内存使用量也会:)。

因此,这将具有有限的并行性,但将通过下载顺序执行解析,而不是:

from twisted.internet import defer, task
from twisted.web.client import getPage

BATCH_SIZE = 5

def main_task(reactor):
    def fetch_urls():
        for url in get_urls():
            yield getPage(url).addCallback(parse)

    coop = task.Cooperator()
    urls = fetch_urls()

    return (defer.DeferredList([coop.coiterate(urls)
                               for _ in xrange(BATCH_SIZE)])
            .addCallback(task_finished))

task.react(main_task)

之所以可以这样做是因为parse(显然)返回了Deferred,将其作为回调添加到所返回的getPage结果中,导致直到完成交易才Deferred调用由所添加的回调。coiterate``parse

自从您问惯用的Twisted代码以来,我还自由地进行了一些现代化(使用task.react而不是手动运行Reactor,内联表达式使内容更简短等)。

如果您确实确实想拥有比并行获取更多的并行解析,那么类似的方法可能会更好:

from twisted.internet import defer, task
from twisted.web.client import getPage

PARALLEL_FETCHES = 5
PARALLEL_PARSES = 10

def main_task(reactor):
    parseSemaphore = defer.DeferredSemaphore(PARALLEL_PARSES)

    def parseWhenReady(r):
        def parallelParse(_):
            parse(r).addBoth(
                lambda result: parseSemaphore.release().addCallback(
                    lambda _: result
                )
            )
        return parseSemaphore.acquire().addCallback(parallelParse)

    def fetch_urls():
        for url in get_urls():
            yield getPage(url).addCallback(parseWhenReady)

    coop = task.Cooperator()
    urls = fetch_urls()

    return (defer.DeferredList([coop.coiterate(urls)
                               for _ in xrange(PARALLEL_FETCHES)])
            .addCallback(lambda done:
                         defer.DeferredList(
                            [parseSemaphore.acquire()
                             for _ in xrange(PARALLEL_PARSES)]
                         ))
            .addCallback(task_finished))

task.react(main_task)

您可以看到parseWhenReadyreturn的Deferred返回值acquire,因此只要并行解析 开始 就可以继续进行并行提取,因此即使解析器过载,您也不会继续进行任意提取。但是,请parallelParse谨慎地避免Deferred返回由parse或返回的值release,因为随着这些操作的进行,提取应该能够继续进行。

(请注意,由于您最初的示例无法运行,因此我根本没有测试过任何一个。希望即使有错误,意图也很清楚。)

其他 2022/1/1 18:39:35 有484人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶