Merge "Be more aggressive in canceling node requests"
This commit is contained in:
commit
e15926faca
|
@ -1861,9 +1861,8 @@ class BuildSet(object):
|
|||
return self.node_requests.get(job_name)
|
||||
|
||||
def removeJobNodeRequest(self, job_name):
|
||||
if job_name not in self.node_requests:
|
||||
raise Exception("No node request for %s" % (job_name))
|
||||
del self.node_requests[job_name]
|
||||
if job_name in self.node_requests:
|
||||
del self.node_requests[job_name]
|
||||
|
||||
def jobNodeRequestComplete(self, job_name, req, nodeset):
|
||||
if job_name in self.nodesets:
|
||||
|
|
|
@ -770,6 +770,7 @@ class Scheduler(threading.Thread):
|
|||
new_pipeline.window_floor)
|
||||
items_to_remove = []
|
||||
builds_to_cancel = []
|
||||
requests_to_cancel = []
|
||||
last_head = None
|
||||
for shared_queue in old_pipeline.queues:
|
||||
# Attempt to keep window sizes from shrinking where possible
|
||||
|
@ -812,15 +813,25 @@ class Scheduler(threading.Thread):
|
|||
else:
|
||||
item.removeBuild(build)
|
||||
builds_to_cancel.append(build)
|
||||
for request_job, request in \
|
||||
item.current_build_set.node_requests.items():
|
||||
new_job = item.getJob(request_job)
|
||||
if not new_job:
|
||||
requests_to_cancel.append(
|
||||
(item.current_build_set, request))
|
||||
else:
|
||||
items_to_remove.append(item)
|
||||
for item in items_to_remove:
|
||||
self.log.warning(
|
||||
self.log.info(
|
||||
"Removing item %s during reconfiguration" % (item,))
|
||||
for build in item.current_build_set.getBuilds():
|
||||
builds_to_cancel.append(build)
|
||||
for request_job, request in \
|
||||
item.current_build_set.node_requests.items():
|
||||
requests_to_cancel.append(
|
||||
(item.current_build_set, request))
|
||||
for build in builds_to_cancel:
|
||||
self.log.warning(
|
||||
self.log.info(
|
||||
"Canceling build %s during reconfiguration" % (build,))
|
||||
try:
|
||||
self.executor.cancel(build)
|
||||
|
@ -839,6 +850,12 @@ class Scheduler(threading.Thread):
|
|||
"for change %s" % (build, build.build_set.item.change))
|
||||
tenant.semaphore_handler.release(
|
||||
build.build_set.item, build.job)
|
||||
for build_set, request in requests_to_cancel:
|
||||
self.log.info(
|
||||
"Canceling node request %s during reconfiguration",
|
||||
request)
|
||||
self.nodepool.cancelRequest(request)
|
||||
build_set.removeJobNodeRequest(request.job.name)
|
||||
|
||||
def _reconfigureTenant(self, tenant):
|
||||
# This is called from _doReconfigureEvent while holding the
|
||||
|
@ -1313,12 +1330,7 @@ class Scheduler(threading.Thread):
|
|||
self.log.warning("Item %s does not contain job %s "
|
||||
"for node request %s",
|
||||
build_set.item, request.job.name, request)
|
||||
try:
|
||||
build_set.removeJobNodeRequest(request.job.name)
|
||||
except Exception:
|
||||
self.log.exception("Unable to remove obsolete node request "
|
||||
"%s for %s job %s",
|
||||
request, build_set.item, request.job.name)
|
||||
build_set.removeJobNodeRequest(request.job.name)
|
||||
if request.fulfilled:
|
||||
self.nodepool.returnNodeSet(request.nodeset)
|
||||
return
|
||||
|
|
Loading…
Reference in New Issue