Merge "Fix missing semaphore release around dequeue"
This commit is contained in:
commit
ec5af360f2
|
@ -6422,6 +6422,88 @@ class TestSemaphore(ZuulTestCase):
|
|||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
|
||||
def test_semaphore_abandon_pending_node_request(self):
|
||||
"Test abandon with job semaphores and pending node request"
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
|
||||
# Pause nodepool so we can check the ordering of getting the nodes
|
||||
# and aquiring the semaphore.
|
||||
self.fake_nodepool.paused = True
|
||||
|
||||
tenant = self.sched.abide.tenants.get('tenant-one')
|
||||
check_pipeline = tenant.layout.pipelines['check']
|
||||
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
self.assertFalse('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertTrue('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
|
||||
self.waitUntilSettled()
|
||||
|
||||
# The check pipeline should be empty
|
||||
items = check_pipeline.getAllItems()
|
||||
self.assertEqual(len(items), 0)
|
||||
|
||||
# The semaphore should be released
|
||||
self.assertFalse('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.fake_nodepool.paused = False
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
|
||||
def test_semaphore_abandon_pending_execution(self):
|
||||
"Test abandon with job semaphores and pending job execution"
|
||||
|
||||
# Pause the executor so it doesn't take any jobs.
|
||||
self.executor_server.pause()
|
||||
|
||||
# Pause nodepool so we can wait on the node requests and fulfill them
|
||||
# in a controlled manner.
|
||||
self.fake_nodepool.paused = True
|
||||
|
||||
tenant = self.sched.abide.tenants.get('tenant-one')
|
||||
check_pipeline = tenant.layout.pipelines['check']
|
||||
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
self.assertFalse('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled()
|
||||
self.assertEqual(len(self.nodepool.requests), 2)
|
||||
|
||||
# Now unpause nodepool to fulfill the node requests. We cannot use
|
||||
# waitUntilSettled here because the executor is paused.
|
||||
self.fake_nodepool.paused = False
|
||||
for _ in iterate_timeout(30, 'fulfill node requests'):
|
||||
if len(self.nodepool.requests) == 0:
|
||||
break
|
||||
|
||||
self.assertTrue('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
|
||||
self.waitUntilSettled()
|
||||
|
||||
# The check pipeline should be empty
|
||||
items = check_pipeline.getAllItems()
|
||||
self.assertEqual(len(items), 0)
|
||||
|
||||
# The semaphore should be released
|
||||
self.assertFalse('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
|
||||
def test_semaphore_new_patchset(self):
|
||||
"Test new patchset with job semaphores"
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
|
|
|
@ -420,11 +420,16 @@ class PipelineManager(object):
|
|||
def cancelJobs(self, item, prime=True):
|
||||
self.log.debug("Cancel jobs for change %s" % item.change)
|
||||
canceled = False
|
||||
jobs_to_release = []
|
||||
|
||||
old_build_set = item.current_build_set
|
||||
old_jobs = {job.name: job for job in item.getJobs()}
|
||||
|
||||
if prime and item.current_build_set.ref:
|
||||
item.resetAllBuilds()
|
||||
for req in old_build_set.node_requests.values():
|
||||
self.sched.nodepool.cancelRequest(req)
|
||||
jobs_to_release.append(req.job)
|
||||
old_build_set.node_requests = {}
|
||||
canceled_jobs = set()
|
||||
for build in old_build_set.getBuilds():
|
||||
|
@ -437,9 +442,7 @@ class PipelineManager(object):
|
|||
except Exception:
|
||||
self.log.exception("Exception while canceling build %s "
|
||||
"for change %s" % (build, item.change))
|
||||
tenant = old_build_set.item.pipeline.tenant
|
||||
tenant.semaphore_handler.release(
|
||||
old_build_set.item, build.job)
|
||||
jobs_to_release.append(build.job)
|
||||
|
||||
if not was_running:
|
||||
nodeset = build.build_set.getJobNodeSet(build.job.name)
|
||||
|
@ -451,6 +454,13 @@ class PipelineManager(object):
|
|||
if jobname in canceled_jobs:
|
||||
continue
|
||||
self.sched.nodepool.returnNodeSet(nodeset)
|
||||
jobs_to_release.append(old_jobs[jobname])
|
||||
|
||||
for job in jobs_to_release:
|
||||
tenant = old_build_set.item.pipeline.tenant
|
||||
tenant.semaphore_handler.release(
|
||||
old_build_set.item, job)
|
||||
|
||||
for item_behind in item.items_behind:
|
||||
self.log.debug("Canceling jobs for change %s, behind change %s" %
|
||||
(item_behind.change, item.change))
|
||||
|
|
Loading…
Reference in New Issue