Ensure that completed handlers are removed frequently

On a busy system it can happen that assignHandlers takes quite some
time (we saw occurrences of more than 10 minutes). Within this time no
node request is marked as fulfilled even if the nodes are there. A
possible solution is to return from assignHandlers frequently during
the iteration so we can remove completed handlers and then proceed
with assigning handlers.

Change-Id: I10f40504c81d532e6953d7af63c5c58fd5283573
This commit is contained in:
Tobias Henkel 2018-10-12 15:04:19 +02:00
parent 7111fcb407
commit 9296de9bf5
1 changed files with 24 additions and 6 deletions

View File

@ -143,21 +143,26 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
# Private methods
# ---------------------------------------------------------------
def _assignHandlers(self):
def _assignHandlers(self, timeout=15):
'''
For each request we can grab, create a NodeRequestHandler for it.
The NodeRequestHandler object will kick off any threads needed to
satisfy the request, then return. We will need to periodically poll
the handler for completion.
If exceeds the timeout it stops further iteration and returns False
in order to give us time to call _removeCompletedHandlers. Otherwise
it returns True to signal that it is finished for now.
'''
start = time.monotonic()
provider = self.getProviderConfig()
if not provider:
self.log.info("Missing config. Deleted provider?")
return
return True
if provider.max_concurrency == 0:
return
return True
# Sort requests by queue priority, then, for all requests at
# the same priority, use the relative_priority field to
@ -168,8 +173,11 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
r.id.split('-')[1]))
for req in requests:
if not self.running:
return True
if self.paused_handler:
return
return True
# Get active threads for all pools for this provider
active_threads = sum([
@ -183,7 +191,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
self.log.debug("Request handling limited: %s active threads ",
"with max concurrency of %s",
active_threads, provider.max_concurrency)
return
return True
req = self.zk.getNodeRequest(req.id)
if not req:
@ -217,6 +225,11 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
self.paused_handler = rh
self.request_handlers.append(rh)
# if we exceeded the timeout stop iterating here
if time.monotonic() - start > timeout:
return False
return True
def _removeCompletedHandlers(self):
'''
Poll handlers to see which have completed.
@ -305,7 +318,12 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
try:
if not self.paused_handler:
self._assignHandlers()
while not self._assignHandlers():
# _assignHandlers can take quite some time on a busy
# system so sprinkle _removeCompletedHandlers in
# between such that we have a chance to fulfill
# requests that already have all nodes.
self._removeCompletedHandlers()
else:
# If we are paused, one request handler could not
# satisfy its assigned request, so give it