From ce7a78797026b8c9ae406542e642a21ec558df6b Mon Sep 17 00:00:00 2001
From: Pablo Hoffman
Date: Wed, 27 Jul 2011 13:38:09 -0300
Subject: [PATCH] Big downloader refactoring to support real concurrency
 limits per domain/ip, instead of global limits per spider which were a bit
 useless.

This removes the setting CONCURRENT_REQUESTS_PER_SPIDER and adds three new
settings:

* CONCURRENT_REQUESTS
* CONCURRENT_REQUESTS_PER_DOMAIN
* CONCURRENT_REQUESTS_PER_IP (overrides per domain)

The AutoThrottle extension had to be disabled, but will be ported and
re-enabled soon.
---
 docs/topics/settings.rst            | 33 ++++++++--
 scrapy/contrib/throttle.py          |  3 +
 scrapy/core/downloader/__init__.py  | 99 ++++++++++++++++-------------
 scrapy/core/engine.py               | 22 ++-----
 scrapy/settings/default_settings.py |  7 +-
 scrapy/utils/engine.py              |  5 +-
 6 files changed, 97 insertions(+), 72 deletions(-)

diff --git a/docs/topics/settings.rst b/docs/topics/settings.rst
index ae6beab03..51bf9286a 100644
--- a/docs/topics/settings.rst
+++ b/docs/topics/settings.rst
@@ -242,15 +242,39 @@ Default: ``100``
 
 Maximum number of concurrent items (per response) to process in parallel in
 the Item Processor (also known as the :ref:`Item Pipeline <topics-item-pipeline>`).
 
-.. setting:: CONCURRENT_REQUESTS_PER_SPIDER
+.. setting:: CONCURRENT_REQUESTS
 
-CONCURRENT_REQUESTS_PER_SPIDER
+CONCURRENT_REQUESTS
+-------------------
+
+Default: ``16``
+
+The maximum number of concurrent (ie. simultaneous) requests that will be
+performed by the Scrapy downloader.
+
+
+.. setting:: CONCURRENT_REQUESTS_PER_DOMAIN
+
+CONCURRENT_REQUESTS_PER_DOMAIN
 ------------------------------
 
 Default: ``8``
 
-Specifies how many concurrent (ie. simultaneous) requests will be performed per
-open spider.
+The maximum number of concurrent (ie. simultaneous) requests that will be
+performed to any single domain.
+
+.. setting:: CONCURRENT_REQUESTS_PER_IP
+
+CONCURRENT_REQUESTS_PER_IP
+--------------------------
+
+Default: ``0``
+
+The maximum number of concurrent (ie. simultaneous) requests that will be
+performed to any single IP. If non-zero, the
+:setting:`CONCURRENT_REQUESTS_PER_DOMAIN` setting is ignored, and this one is
+used instead. In other words, concurrency limits will be applied per IP, not
+per domain.
 
 .. setting:: CONCURRENT_SPIDERS
@@ -548,7 +572,6 @@ Default::
     'scrapy.contrib.closespider.CloseSpider': 0,
     'scrapy.contrib.feedexport.FeedExporter': 0,
     'scrapy.contrib.spidercontext.SpiderContext': 0,
-    'scrapy.contrib.throttle.AutoThrottle': 0,
     'scrapy.contrib.logstats.LogStats': 0,
 }
 
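
For illustration only (not part of the patch): a project that wants to tune
the new limits would set them in its ``settings.py``. The values below are
arbitrary examples, not defaults introduced by this change::

    # settings.py (illustrative values)
    CONCURRENT_REQUESTS = 32            # global cap enforced by the downloader
    CONCURRENT_REQUESTS_PER_DOMAIN = 4  # cap applied to each individual domain
    CONCURRENT_REQUESTS_PER_IP = 0      # non-zero would override the per-domain cap
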
diff --git a/scrapy/contrib/throttle.py b/scrapy/contrib/throttle.py
index df1d6d7db..228af2021 100644
--- a/scrapy/contrib/throttle.py
+++ b/scrapy/contrib/throttle.py
@@ -1,3 +1,6 @@
+# TODO: this extension is currently broken and needs to be ported after the
+# downloader refactoring introduced in r2732
+
 from scrapy.xlib.pydispatch import dispatcher
 from scrapy.utils.python import setattr_default
 from scrapy.conf import settings
diff --git a/scrapy/core/downloader/__init__.py b/scrapy/core/downloader/__init__.py
index 503a8496a..100471e3b 100644
--- a/scrapy/core/downloader/__init__.py
+++ b/scrapy/core/downloader/__init__.py
@@ -1,77 +1,97 @@
+import socket
 import random
+import warnings
 from time import time
 from collections import deque
+from functools import partial
 
 from twisted.internet import reactor, defer
 from twisted.python.failure import Failure
 
-from scrapy.conf import settings
 from scrapy.utils.python import setattr_default
 from scrapy.utils.defer import mustbe_deferred
 from scrapy.utils.signal import send_catch_log
+from scrapy.utils.reactor import CallLaterOnce
+from scrapy.utils.httpobj import urlparse_cached
+from scrapy.resolver import gethostbyname
 from scrapy import signals
 from scrapy import log
 from .middleware import DownloaderMiddlewareManager
 from .handlers import DownloadHandlers
 
 
 class Slot(object):
-    """Downloader spider slot"""
-
-    def __init__(self, spider):
-        setattr_default(spider, 'download_delay', spider.settings.getfloat('DOWNLOAD_DELAY'))
-        setattr_default(spider, 'randomize_download_delay', spider.settings.getbool('RANDOMIZE_DOWNLOAD_DELAY'))
-        setattr_default(spider, 'max_concurrent_requests', spider.settings.getint('CONCURRENT_REQUESTS_PER_SPIDER'))
-        if spider.download_delay > 0 and spider.max_concurrent_requests > 1:
-            spider.max_concurrent_requests = 1
-            msg = "Setting max_concurrent_requests=1 because of download_delay=%s" % spider.download_delay
-            log.msg(msg, spider=spider)
-        self.spider = spider
+    """Downloader slot"""
+
+    def __init__(self, concurrency, settings):
+        self.concurrency = concurrency
+        self.delay = settings.getfloat('DOWNLOAD_DELAY')
+        self.randomize_delay = settings.getbool('RANDOMIZE_DOWNLOAD_DELAY')
         self.active = set()
         self.queue = deque()
         self.transferring = set()
         self.lastseen = 0
-        self.next_request_calls = set()
 
     def free_transfer_slots(self):
-        return self.spider.max_concurrent_requests - len(self.transferring)
-
-    def needs_backout(self):
-        # use self.active to include requests in the downloader middleware
-        return len(self.active) > 2 * self.spider.max_concurrent_requests
+        return self.concurrency - len(self.transferring)
 
     def download_delay(self):
-        delay = self.spider.download_delay
-        if self.spider.randomize_download_delay:
-            delay = random.uniform(0.5*delay, 1.5*delay)
-        return delay
-
-    def cancel_request_calls(self):
-        for call in self.next_request_calls:
-            call.cancel()
-        self.next_request_calls.clear()
+        if self.randomize_delay:
+            return random.uniform(0.5*self.delay, 1.5*self.delay)
+        return self.delay
 
 
 class Downloader(object):
 
-    def __init__(self):
+    def __init__(self, settings):
+        self.settings = settings
         self.slots = {}
+        self.active = set()
         self.handlers = DownloadHandlers()
+        self.total_concurrency = settings.getint('CONCURRENT_REQUESTS')
+        self.domain_concurrency = settings.getint('CONCURRENT_REQUESTS_PER_DOMAIN')
+        self.ip_concurrency = settings.getint('CONCURRENT_REQUESTS_PER_IP')
         self.middleware = DownloaderMiddlewareManager.from_settings(settings)
+        # TODO: remove for Scrapy 0.15
+        c = settings.getint('CONCURRENT_REQUESTS_PER_SPIDER')
+        if c:
+            warnings.warn("CONCURRENT_REQUESTS_PER_SPIDER setting is deprecated, use CONCURRENT_REQUESTS_PER_DOMAIN instead", DeprecationWarning)
+            self.domain_concurrency = c
 
     def fetch(self, request, spider):
-        slot = self.slots[spider]
+        key, slot = self._get_slot(request)
+
+        self.active.add(request)
         slot.active.add(request)
         def _deactivate(response):
+            self.active.remove(request)
             slot.active.remove(request)
+            if not slot.active: # remove empty slots
+                del self.slots[key]
             return response
 
-        dfd = self.middleware.download(self._enqueue_request, request, spider)
+        dlfunc = partial(self._enqueue_request, slot=slot)
+        dfd = self.middleware.download(dlfunc, request, spider)
         return dfd.addBoth(_deactivate)
 
-    def _enqueue_request(self, request, spider):
-        slot = self.slots[spider]
-
+    def needs_backout(self):
+        return len(self.active) >= self.total_concurrency
+
+    def _get_slot(self, request):
+        key = urlparse_cached(request).hostname
+        if self.ip_concurrency:
+            concurrency = self.ip_concurrency
+            try:
+                key = gethostbyname(key)
+            except socket.error: # resolution error
+                pass
+        else:
+            concurrency = self.domain_concurrency
+        if key not in self.slots:
+            self.slots[key] = Slot(concurrency, self.settings)
+        return key, self.slots[key]
+
+    def _enqueue_request(self, request, spider, slot):
        def _downloaded(response):
             send_catch_log(signal=signals.response_downloaded, \
                     response=response, request=request, spider=spider)
@@ -91,9 +111,7 @@ class Downloader(object):
             if penalty > 0 and slot.free_transfer_slots():
                 d = defer.Deferred()
                 d.addCallback(self._process_queue)
-                call = reactor.callLater(penalty, d.callback, spider, slot)
-                slot.next_request_calls.add(call)
-                d.addBoth(lambda x: slot.next_request_calls.remove(call))
+                reactor.callLater(penalty, d.callback, spider, slot)
                 return
 
         slot.lastseen = now
@@ -120,15 +138,6 @@ class Downloader(object):
                 return _
 
         return dfd.addBoth(finish_transferring)
 
-    def open_spider(self, spider):
-        assert spider not in self.slots, "Spider already opened: %s" % spider
-        self.slots[spider] = Slot(spider)
-
-    def close_spider(self, spider):
-        assert spider in self.slots, "Spider not opened: %s" % spider
-        slot = self.slots.pop(spider)
-        slot.cancel_request_calls()
-
     def is_idle(self):
         return not self.slots
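
To make the new per-slot keying concrete, here is a simplified, self-contained
sketch of what ``_get_slot()`` above does (the real code uses Scrapy's
``urlparse_cached`` and cached ``gethostbyname``; the standard-library calls
below are stand-ins, and ``choose_slot`` is a hypothetical helper)::

    # Illustrative sketch only -- not the patched code itself.
    import socket
    from urlparse import urlparse

    def choose_slot(url, domain_concurrency=8, ip_concurrency=0):
        """Return (slot key, concurrency limit) for a request URL."""
        key = urlparse(url).hostname
        if ip_concurrency:
            concurrency = ip_concurrency
            try:
                # keyed by IP: domains hosted on the same address share a slot
                key = socket.gethostbyname(key)
            except socket.error:
                pass  # resolution failed: fall back to the hostname key
        else:
            concurrency = domain_concurrency
        return key, concurrency

With ``CONCURRENT_REQUESTS_PER_IP`` left at ``0``, every request to the same
hostname shares one slot limited by ``CONCURRENT_REQUESTS_PER_DOMAIN``; a
non-zero value switches the key to the resolved address, so the per-IP limit
also covers virtual hosts served from a single IP.
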
diff --git a/scrapy/core/engine.py b/scrapy/core/engine.py
index 7759ccd89..5d674700b 100644
--- a/scrapy/core/engine.py
+++ b/scrapy/core/engine.py
@@ -57,7 +57,7 @@ class ExecutionEngine(object):
         self.running = False
         self.paused = False
         self.scheduler = load_object(settings['SCHEDULER'])()
-        self.downloader = Downloader()
+        self.downloader = Downloader(self.settings)
         self.scraper = Scraper(self, self.settings)
         self._concurrent_spiders = settings.getint('CONCURRENT_SPIDERS')
         self._spider_closed_callback = spider_closed_callback
@@ -117,8 +117,7 @@ class ExecutionEngine(object):
         slot = self.slots[spider]
         return not self.running \
             or slot.closing \
-            or self.spider_is_closed(spider) \
-            or self.downloader.slots[spider].needs_backout() \
+            or self.downloader.needs_backout() \
             or self.scraper.slots[spider].needs_backout()
 
     def _next_request_from_scheduler(self, spider):
@@ -150,14 +149,9 @@ class ExecutionEngine(object):
         scraper_idle = spider in self.scraper.slots \
             and self.scraper.slots[spider].is_idle()
         pending = self.scheduler.spider_has_pending_requests(spider)
-        downloading = spider in self.downloader.slots \
-            and self.downloader.slots[spider].active
-        return scraper_idle and not (pending or downloading)
-
-    def spider_is_closed(self, spider):
-        """Return True if the spider is fully closed (ie. not even in the
-        closing stage)"""
-        return spider not in self.downloader.slots
+        downloading = bool(self.downloader.slots)
+        idle = scraper_idle and not (pending or downloading)
+        return idle
 
     @property
     def open_spiders(self):
@@ -205,7 +199,7 @@ class ExecutionEngine(object):
             slot.nextcall.schedule()
             return _
 
-        dwld = mustbe_deferred(self.downloader.fetch, request, spider)
+        dwld = self.downloader.fetch(request, spider)
         dwld.addCallback(_on_success)
         dwld.addBoth(_on_complete)
         return dwld
@@ -219,7 +213,6 @@ class ExecutionEngine(object):
         slot = Slot(start_requests or (), close_if_idle, nextcall)
         self.slots[spider] = slot
         yield self.scheduler.open_spider(spider)
-        self.downloader.open_spider(spider)
         yield self.scraper.open_spider(spider)
         stats.open_spider(spider)
         yield send_catch_log_deferred(signals.spider_opened, spider=spider)
@@ -255,9 +248,6 @@ class ExecutionEngine(object):
 
         dfd = slot.close()
 
-        dfd.addBoth(lambda _: self.downloader.close_spider(spider))
-        dfd.addErrback(log.err, spider=spider)
-
         dfd.addBoth(lambda _: self.scraper.close_spider(spider))
         dfd.addErrback(log.err, spider=spider)
 
diff --git a/scrapy/settings/default_settings.py b/scrapy/settings/default_settings.py
index 802a6dce8..0862e5989 100644
--- a/scrapy/settings/default_settings.py
+++ b/scrapy/settings/default_settings.py
@@ -26,7 +26,11 @@ CLOSESPIDER_ITEMCOUNT = 0
 
 COMMANDS_MODULE = ''
 CONCURRENT_ITEMS = 100
-CONCURRENT_REQUESTS_PER_SPIDER = 8
+
+CONCURRENT_REQUESTS = 16
+CONCURRENT_REQUESTS_PER_DOMAIN = 8
+CONCURRENT_REQUESTS_PER_IP = 0
+
 CONCURRENT_SPIDERS = 8
 
 COOKIES_ENABLED = True
@@ -136,7 +140,6 @@ EXTENSIONS_BASE = {
     'scrapy.contrib.closespider.CloseSpider': 0,
     'scrapy.contrib.feedexport.FeedExporter': 0,
     'scrapy.contrib.spidercontext.SpiderContext': 0,
-    'scrapy.contrib.throttle.AutoThrottle': 0,
     'scrapy.contrib.logstats.LogStats': 0,
 }
 
diff --git a/scrapy/utils/engine.py b/scrapy/utils/engine.py
index 43c98cc09..9e1638d90 100644
--- a/scrapy/utils/engine.py
+++ b/scrapy/utils/engine.py
@@ -17,6 +17,7 @@ def get_engine_status(engine=None):
         "len(engine.scheduler.pending_requests)",
         "engine.downloader.is_idle()",
         "len(engine.downloader.slots)",
+        "len(engine.downloader.active)",
         "engine.scraper.is_idle()",
         "len(engine.scraper.slots)",
     ]
@@ -25,10 +26,6 @@ def get_engine_status(engine=None):
         "engine.slots[spider].closing",
         "len(engine.slots[spider].inprogress)",
         "len(engine.scheduler.pending_requests[spider])",
-        "len(engine.downloader.slots[spider].queue)",
-        "len(engine.downloader.slots[spider].active)",
-        "len(engine.downloader.slots[spider].transferring)",
-        "engine.downloader.slots[spider].lastseen",
         "len(engine.scraper.slots[spider].queue)",
         "len(engine.scraper.slots[spider].active)",
         "engine.scraper.slots[spider].active_size",
-- 
GitLab
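
Migration sketch (assumed usage, not part of the patch): a project that relied
on the removed CONCURRENT_REQUESTS_PER_SPIDER setting should switch to the
per-domain limit. The deprecation shim added to Downloader.__init__ above keeps
the old name working (mapped onto the per-domain limit) until it is removed,
per the TODO, in Scrapy 0.15::

    # settings.py -- before this patch
    CONCURRENT_REQUESTS_PER_SPIDER = 8

    # settings.py -- after this patch
    CONCURRENT_REQUESTS_PER_DOMAIN = 8  # equivalent per-domain limit
    CONCURRENT_REQUESTS = 16            # new global cap (default value)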