Commit ce7a7879 authored by Pablo Hoffman

Big downloader refactoring to support real concurrency limits per domain/IP,
instead of global limits per spider, which were a bit useless.

This removes the setting CONCURRENT_REQUESTS_PER_SPIDER and adds three new
settings:

* CONCURRENT_REQUESTS
* CONCURRENT_REQUESTS_PER_DOMAIN
* CONCURRENT_REQUESTS_PER_IP (overrides per domain)

The AutoThrottle extension had to be disabled, but will be ported and
re-enabled soon.
Parent: a45dca32
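
As a rough illustration of the new knobs (the values below are simply the new defaults shipped by this commit), a project's settings.py can now express the limits as:

    CONCURRENT_REQUESTS = 16            # global cap across the whole downloader
    CONCURRENT_REQUESTS_PER_DOMAIN = 8  # cap per target domain
    CONCURRENT_REQUESTS_PER_IP = 0      # 0 disables per-IP limiting; a non-zero
                                        # value overrides the per-domain cap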
......@@ -242,15 +242,39 @@ Default: ``100``
Maximum number of concurrent items (per response) to process in parallel in the
Item Processor (also known as the :ref:`Item Pipeline <topics-item-pipeline>`).
.. setting:: CONCURRENT_REQUESTS_PER_SPIDER
.. setting:: CONCURRENT_REQUESTS
CONCURRENT_REQUESTS_PER_SPIDER
CONCURRENT_REQUESTS
-------------------
Default: ``16``
The maximum number of concurrent (ie. simultaneous) requests that will be
performed by the Scrapy downloader.
.. setting:: CONCURRENT_REQUESTS_PER_DOMAIN
CONCURRENT_REQUESTS_PER_DOMAIN
------------------------------
Default: ``8``
Specifies how many concurrent (ie. simultaneous) requests will be performed per
open spider.
The maximum number of concurrent (ie. simultaneous) requests that will be
performed to any single domain.
.. setting:: CONCURRENT_REQUESTS_PER_IP
CONCURRENT_REQUESTS_PER_IP
--------------------------
Default: ``0``
The maximum number of concurrent (ie. simultaneous) requests that will be
performed to any single IP. If non-zero, the
:setting:`CONCURRENT_REQUESTS_PER_DOMAIN` setting is ignored, and this one is
used instead. In other words, concurrency limits will be applied per IP, not
per domain.
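For example, with a configuration such as the following (illustrative values
only), limits are applied per IP and :setting:`CONCURRENT_REQUESTS_PER_DOMAIN`
is never consulted::

    CONCURRENT_REQUESTS_PER_IP = 4
    CONCURRENT_REQUESTS_PER_DOMAIN = 8   # ignored while the per-IP limit is non-zero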
.. setting:: CONCURRENT_SPIDERS
......@@ -548,7 +572,6 @@ Default::
'scrapy.contrib.closespider.CloseSpider': 0,
'scrapy.contrib.feedexport.FeedExporter': 0,
'scrapy.contrib.spidercontext.SpiderContext': 0,
'scrapy.contrib.throttle.AutoThrottle': 0,
'scrapy.contrib.logstats.LogStats': 0,
}
......
# TODO: this extension is currently broken and needs to be ported after the
# downloader refactoring introduced in r2732
from scrapy.xlib.pydispatch import dispatcher
from scrapy.utils.python import setattr_default
from scrapy.conf import settings
......
import socket
import random
import warnings
from time import time
from collections import deque
from functools import partial
from twisted.internet import reactor, defer
from twisted.python.failure import Failure
from scrapy.conf import settings
from scrapy.utils.python import setattr_default
from scrapy.utils.defer import mustbe_deferred
from scrapy.utils.signal import send_catch_log
from scrapy.utils.reactor import CallLaterOnce
from scrapy.utils.httpobj import urlparse_cached
from scrapy.resolver import gethostbyname
from scrapy import signals
from scrapy import log
from .middleware import DownloaderMiddlewareManager
from .handlers import DownloadHandlers
class Slot(object):
"""Downloader spider slot"""
def __init__(self, spider):
setattr_default(spider, 'download_delay', spider.settings.getfloat('DOWNLOAD_DELAY'))
setattr_default(spider, 'randomize_download_delay', spider.settings.getbool('RANDOMIZE_DOWNLOAD_DELAY'))
setattr_default(spider, 'max_concurrent_requests', spider.settings.getint('CONCURRENT_REQUESTS_PER_SPIDER'))
if spider.download_delay > 0 and spider.max_concurrent_requests > 1:
spider.max_concurrent_requests = 1
msg = "Setting max_concurrent_requests=1 because of download_delay=%s" % spider.download_delay
log.msg(msg, spider=spider)
self.spider = spider
"""Downloader slot"""
def __init__(self, concurrency, settings):
self.concurrency = concurrency
self.delay = settings.getfloat('DOWNLOAD_DELAY')
self.randomize_delay = settings.getbool('RANDOMIZE_DOWNLOAD_DELAY')
self.active = set()
self.queue = deque()
self.transferring = set()
self.lastseen = 0
self.next_request_calls = set()
def free_transfer_slots(self):
return self.spider.max_concurrent_requests - len(self.transferring)
def needs_backout(self):
# use self.active to include requests in the downloader middleware
return len(self.active) > 2 * self.spider.max_concurrent_requests
return self.concurrency - len(self.transferring)
def download_delay(self):
delay = self.spider.download_delay
if self.spider.randomize_download_delay:
delay = random.uniform(0.5*delay, 1.5*delay)
return delay
def cancel_request_calls(self):
for call in self.next_request_calls:
call.cancel()
self.next_request_calls.clear()
if self.randomize_delay:
return random.uniform(0.5*self.delay, 1.5*self.delay)
return self.delay
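# Illustrative sketch, not part of this commit: Slot.download_delay() above
# jitters each slot's pacing uniformly between 50% and 150% of the configured
# DOWNLOAD_DELAY. A standalone equivalent, with a hypothetical helper name:
import random

def next_delay(base_delay, randomize=True):
    # Mirror the randomization rule used by Slot.download_delay()
    if randomize and base_delay:
        return random.uniform(0.5 * base_delay, 1.5 * base_delay)
    return base_delay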
class Downloader(object):
def __init__(self):
def __init__(self, settings):
self.settings = settings
self.slots = {}
self.active = set()
self.handlers = DownloadHandlers()
self.total_concurrency = settings.getint('CONCURRENT_REQUESTS')
self.domain_concurrency = settings.getint('CONCURRENT_REQUESTS_PER_DOMAIN')
self.ip_concurrency = settings.getint('CONCURRENT_REQUESTS_PER_IP')
self.middleware = DownloaderMiddlewareManager.from_settings(settings)
# TODO: remove for Scrapy 0.15
c = settings.getint('CONCURRENT_REQUESTS_PER_SPIDER')
if c:
warnings.warn("CONCURRENT_REQUESTS_PER_SPIDER setting is deprecated, use CONCURRENT_REQUESTS_PER_DOMAIN instead", DeprecationWarning)
self.domain_concurrency = c
def fetch(self, request, spider):
slot = self.slots[spider]
key, slot = self._get_slot(request)
self.active.add(request)
slot.active.add(request)
def _deactivate(response):
self.active.remove(request)
slot.active.remove(request)
if not slot.active: # remove empty slots
del self.slots[key]
return response
dfd = self.middleware.download(self._enqueue_request, request, spider)
dlfunc = partial(self._enqueue_request, slot=slot)
dfd = self.middleware.download(dlfunc, request, spider)
return dfd.addBoth(_deactivate)
def _enqueue_request(self, request, spider):
slot = self.slots[spider]
def needs_backout(self):
return len(self.active) >= self.total_concurrency
def _get_slot(self, request):
key = urlparse_cached(request).hostname
if self.ip_concurrency:
concurrency = self.ip_concurrency
try:
key = gethostbyname(key)
except socket.error: # resolution error
pass
else:
concurrency = self.domain_concurrency
if key not in self.slots:
self.slots[key] = Slot(concurrency, self.settings)
return key, self.slots[key]
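# Illustrative sketch, not part of this commit: _get_slot() above keys slots by
# hostname, or by resolved IP when CONCURRENT_REQUESTS_PER_IP is non-zero,
# falling back to the hostname if resolution fails. A simplified standalone
# version using the stdlib resolver (the real code uses scrapy.resolver):
import socket

def slot_key(hostname, per_ip_limit):
    if per_ip_limit:
        try:
            return socket.gethostbyname(hostname)
        except socket.error:
            pass  # DNS failure: fall back to grouping by hostname
    return hostname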
def _enqueue_request(self, request, spider, slot):
def _downloaded(response):
send_catch_log(signal=signals.response_downloaded, \
response=response, request=request, spider=spider)
......@@ -91,9 +111,7 @@ class Downloader(object):
if penalty > 0 and slot.free_transfer_slots():
d = defer.Deferred()
d.addCallback(self._process_queue)
call = reactor.callLater(penalty, d.callback, spider, slot)
slot.next_request_calls.add(call)
d.addBoth(lambda x: slot.next_request_calls.remove(call))
reactor.callLater(penalty, d.callback, spider, slot)
return
slot.lastseen = now
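# Illustrative sketch, not part of this commit: the pattern above simply
# re-schedules queue processing after `penalty` seconds instead of tracking
# pending calls per slot. A minimal standalone equivalent in Twisted, with a
# hypothetical function name:
from twisted.internet import defer, reactor

def schedule_retry(process_queue, penalty, spider, slot):
    d = defer.Deferred()
    # Fire the Deferred once the slot's delay has elapsed, then let
    # process_queue() pick the next request from the slot's queue.
    d.addCallback(lambda _: process_queue(spider, slot))
    reactor.callLater(penalty, d.callback, None)
    return d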
......@@ -120,15 +138,6 @@ class Downloader(object):
return _
return dfd.addBoth(finish_transferring)
def open_spider(self, spider):
assert spider not in self.slots, "Spider already opened: %s" % spider
self.slots[spider] = Slot(spider)
def close_spider(self, spider):
assert spider in self.slots, "Spider not opened: %s" % spider
slot = self.slots.pop(spider)
slot.cancel_request_calls()
def is_idle(self):
return not self.slots
......@@ -57,7 +57,7 @@ class ExecutionEngine(object):
self.running = False
self.paused = False
self.scheduler = load_object(settings['SCHEDULER'])()
self.downloader = Downloader()
self.downloader = Downloader(self.settings)
self.scraper = Scraper(self, self.settings)
self._concurrent_spiders = settings.getint('CONCURRENT_SPIDERS')
self._spider_closed_callback = spider_closed_callback
......@@ -117,8 +117,7 @@ class ExecutionEngine(object):
slot = self.slots[spider]
return not self.running \
or slot.closing \
or self.spider_is_closed(spider) \
or self.downloader.slots[spider].needs_backout() \
or self.downloader.needs_backout() \
or self.scraper.slots[spider].needs_backout()
def _next_request_from_scheduler(self, spider):
......@@ -150,14 +149,9 @@ class ExecutionEngine(object):
scraper_idle = spider in self.scraper.slots \
and self.scraper.slots[spider].is_idle()
pending = self.scheduler.spider_has_pending_requests(spider)
downloading = spider in self.downloader.slots \
and self.downloader.slots[spider].active
return scraper_idle and not (pending or downloading)
def spider_is_closed(self, spider):
"""Return True if the spider is fully closed (ie. not even in the
closing stage)"""
return spider not in self.downloader.slots
downloading = bool(self.downloader.slots)
idle = scraper_idle and not (pending or downloading)
return idle
@property
def open_spiders(self):
......@@ -205,7 +199,7 @@ class ExecutionEngine(object):
slot.nextcall.schedule()
return _
dwld = mustbe_deferred(self.downloader.fetch, request, spider)
dwld = self.downloader.fetch(request, spider)
dwld.addCallback(_on_success)
dwld.addBoth(_on_complete)
return dwld
......@@ -219,7 +213,6 @@ class ExecutionEngine(object):
slot = Slot(start_requests or (), close_if_idle, nextcall)
self.slots[spider] = slot
yield self.scheduler.open_spider(spider)
self.downloader.open_spider(spider)
yield self.scraper.open_spider(spider)
stats.open_spider(spider)
yield send_catch_log_deferred(signals.spider_opened, spider=spider)
......@@ -255,9 +248,6 @@ class ExecutionEngine(object):
dfd = slot.close()
dfd.addBoth(lambda _: self.downloader.close_spider(spider))
dfd.addErrback(log.err, spider=spider)
dfd.addBoth(lambda _: self.scraper.close_spider(spider))
dfd.addErrback(log.err, spider=spider)
......
......@@ -26,7 +26,11 @@ CLOSESPIDER_ITEMCOUNT = 0
COMMANDS_MODULE = ''
CONCURRENT_ITEMS = 100
CONCURRENT_REQUESTS_PER_SPIDER = 8
CONCURRENT_REQUESTS = 16
CONCURRENT_REQUESTS_PER_DOMAIN = 8
CONCURRENT_REQUESTS_PER_IP = 0
CONCURRENT_SPIDERS = 8
COOKIES_ENABLED = True
......@@ -136,7 +140,6 @@ EXTENSIONS_BASE = {
'scrapy.contrib.closespider.CloseSpider': 0,
'scrapy.contrib.feedexport.FeedExporter': 0,
'scrapy.contrib.spidercontext.SpiderContext': 0,
'scrapy.contrib.throttle.AutoThrottle': 0,
'scrapy.contrib.logstats.LogStats': 0,
}
......
......@@ -17,6 +17,7 @@ def get_engine_status(engine=None):
"len(engine.scheduler.pending_requests)",
"engine.downloader.is_idle()",
"len(engine.downloader.slots)",
"len(engine.downloader.active)",
"engine.scraper.is_idle()",
"len(engine.scraper.slots)",
]
......@@ -25,10 +26,6 @@ def get_engine_status(engine=None):
"engine.slots[spider].closing",
"len(engine.slots[spider].inprogress)",
"len(engine.scheduler.pending_requests[spider])",
"len(engine.downloader.slots[spider].queue)",
"len(engine.downloader.slots[spider].active)",
"len(engine.downloader.slots[spider].transferring)",
"engine.downloader.slots[spider].lastseen",
"len(engine.scraper.slots[spider].queue)",
"len(engine.scraper.slots[spider].active)",
"engine.scraper.slots[spider].active_size",
......