Merge "Ensure that completed handlers are removed frequently"
This commit is contained in:
commit
9758a4be44
|
@ -143,21 +143,26 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
|||
# Private methods
|
||||
# ---------------------------------------------------------------
|
||||
|
||||
def _assignHandlers(self):
|
||||
def _assignHandlers(self, timeout=15):
|
||||
'''
|
||||
For each request we can grab, create a NodeRequestHandler for it.
|
||||
|
||||
The NodeRequestHandler object will kick off any threads needed to
|
||||
satisfy the request, then return. We will need to periodically poll
|
||||
the handler for completion.
|
||||
|
||||
If exceeds the timeout it stops further iteration and returns False
|
||||
in order to give us time to call _removeCompletedHandlers. Otherwise
|
||||
it returns True to signal that it is finished for now.
|
||||
'''
|
||||
start = time.monotonic()
|
||||
provider = self.getProviderConfig()
|
||||
if not provider:
|
||||
self.log.info("Missing config. Deleted provider?")
|
||||
return
|
||||
return True
|
||||
|
||||
if provider.max_concurrency == 0:
|
||||
return
|
||||
return True
|
||||
|
||||
# Sort requests by queue priority, then, for all requests at
|
||||
# the same priority, use the relative_priority field to
|
||||
|
@ -168,8 +173,11 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
|||
r.id.split('-')[1]))
|
||||
|
||||
for req in requests:
|
||||
if not self.running:
|
||||
return True
|
||||
|
||||
if self.paused_handler:
|
||||
return
|
||||
return True
|
||||
|
||||
# Get active threads for all pools for this provider
|
||||
active_threads = sum([
|
||||
|
@ -183,7 +191,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
|||
self.log.debug("Request handling limited: %s active threads ",
|
||||
"with max concurrency of %s",
|
||||
active_threads, provider.max_concurrency)
|
||||
return
|
||||
return True
|
||||
|
||||
req = self.zk.getNodeRequest(req.id)
|
||||
if not req:
|
||||
|
@ -217,6 +225,11 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
|||
self.paused_handler = rh
|
||||
self.request_handlers.append(rh)
|
||||
|
||||
# if we exceeded the timeout stop iterating here
|
||||
if time.monotonic() - start > timeout:
|
||||
return False
|
||||
return True
|
||||
|
||||
def _removeCompletedHandlers(self):
|
||||
'''
|
||||
Poll handlers to see which have completed.
|
||||
|
@ -305,7 +318,12 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
|||
|
||||
try:
|
||||
if not self.paused_handler:
|
||||
self._assignHandlers()
|
||||
while not self._assignHandlers():
|
||||
# _assignHandlers can take quite some time on a busy
|
||||
# system so sprinkle _removeCompletedHandlers in
|
||||
# between such that we have a chance to fulfill
|
||||
# requests that already have all nodes.
|
||||
self._removeCompletedHandlers()
|
||||
else:
|
||||
# If we are paused, one request handler could not
|
||||
# satisfy its assigned request, so give it
|
||||
|
|
Loading…
Reference in New Issue