Ensure that completed handlers are removed frequently
On a busy system it can happen that assignHandlers takes quite some time (we saw occurrences of more than 10 minutes). Within this time no node request is marked as fulfilled even if the nodes are there. A possible solution is to return from assignHandlers frequently during the iteration so we can remove completed handlers and then proceed with assigning handlers. Change-Id: I10f40504c81d532e6953d7af63c5c58fd5283573
This commit is contained in:
parent
7111fcb407
commit
9296de9bf5
|
@ -143,21 +143,26 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
|||
# Private methods
|
||||
# ---------------------------------------------------------------
|
||||
|
||||
def _assignHandlers(self):
|
||||
def _assignHandlers(self, timeout=15):
|
||||
'''
|
||||
For each request we can grab, create a NodeRequestHandler for it.
|
||||
|
||||
The NodeRequestHandler object will kick off any threads needed to
|
||||
satisfy the request, then return. We will need to periodically poll
|
||||
the handler for completion.
|
||||
|
||||
If exceeds the timeout it stops further iteration and returns False
|
||||
in order to give us time to call _removeCompletedHandlers. Otherwise
|
||||
it returns True to signal that it is finished for now.
|
||||
'''
|
||||
start = time.monotonic()
|
||||
provider = self.getProviderConfig()
|
||||
if not provider:
|
||||
self.log.info("Missing config. Deleted provider?")
|
||||
return
|
||||
return True
|
||||
|
||||
if provider.max_concurrency == 0:
|
||||
return
|
||||
return True
|
||||
|
||||
# Sort requests by queue priority, then, for all requests at
|
||||
# the same priority, use the relative_priority field to
|
||||
|
@ -168,8 +173,11 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
|||
r.id.split('-')[1]))
|
||||
|
||||
for req in requests:
|
||||
if not self.running:
|
||||
return True
|
||||
|
||||
if self.paused_handler:
|
||||
return
|
||||
return True
|
||||
|
||||
# Get active threads for all pools for this provider
|
||||
active_threads = sum([
|
||||
|
@ -183,7 +191,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
|||
self.log.debug("Request handling limited: %s active threads ",
|
||||
"with max concurrency of %s",
|
||||
active_threads, provider.max_concurrency)
|
||||
return
|
||||
return True
|
||||
|
||||
req = self.zk.getNodeRequest(req.id)
|
||||
if not req:
|
||||
|
@ -217,6 +225,11 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
|||
self.paused_handler = rh
|
||||
self.request_handlers.append(rh)
|
||||
|
||||
# if we exceeded the timeout stop iterating here
|
||||
if time.monotonic() - start > timeout:
|
||||
return False
|
||||
return True
|
||||
|
||||
def _removeCompletedHandlers(self):
|
||||
'''
|
||||
Poll handlers to see which have completed.
|
||||
|
@ -305,7 +318,12 @@ class PoolWorker(threading.Thread, stats.StatsReporter):
|
|||
|
||||
try:
|
||||
if not self.paused_handler:
|
||||
self._assignHandlers()
|
||||
while not self._assignHandlers():
|
||||
# _assignHandlers can take quite some time on a busy
|
||||
# system so sprinkle _removeCompletedHandlers in
|
||||
# between such that we have a chance to fulfill
|
||||
# requests that already have all nodes.
|
||||
self._removeCompletedHandlers()
|
||||
else:
|
||||
# If we are paused, one request handler could not
|
||||
# satisfy its assigned request, so give it
|
||||
|
|
Loading…
Reference in New Issue