Merge "Fix missing semaphore release around dequeue"

This commit is contained in:
Zuul 2019-02-28 17:54:57 +00:00 committed by Gerrit Code Review
commit ec5af360f2
2 changed files with 95 additions and 3 deletions

View File

@ -6422,6 +6422,88 @@ class TestSemaphore(ZuulTestCase):
self.executor_server.release()
self.waitUntilSettled()
def test_semaphore_abandon_pending_node_request(self):
"Test abandon with job semaphores and pending node request"
self.executor_server.hold_jobs_in_build = True
# Pause nodepool so we can check the ordering of getting the nodes
# and aquiring the semaphore.
self.fake_nodepool.paused = True
tenant = self.sched.abide.tenants.get('tenant-one')
check_pipeline = tenant.layout.pipelines['check']
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
self.waitUntilSettled()
# The check pipeline should be empty
items = check_pipeline.getAllItems()
self.assertEqual(len(items), 0)
# The semaphore should be released
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.executor_server.hold_jobs_in_build = False
self.fake_nodepool.paused = False
self.executor_server.release()
self.waitUntilSettled()
def test_semaphore_abandon_pending_execution(self):
"Test abandon with job semaphores and pending job execution"
# Pause the executor so it doesn't take any jobs.
self.executor_server.pause()
# Pause nodepool so we can wait on the node requests and fulfill them
# in a controlled manner.
self.fake_nodepool.paused = True
tenant = self.sched.abide.tenants.get('tenant-one')
check_pipeline = tenant.layout.pipelines['check']
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(self.nodepool.requests), 2)
# Now unpause nodepool to fulfill the node requests. We cannot use
# waitUntilSettled here because the executor is paused.
self.fake_nodepool.paused = False
for _ in iterate_timeout(30, 'fulfill node requests'):
if len(self.nodepool.requests) == 0:
break
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
self.waitUntilSettled()
# The check pipeline should be empty
items = check_pipeline.getAllItems()
self.assertEqual(len(items), 0)
# The semaphore should be released
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.executor_server.release()
self.waitUntilSettled()
def test_semaphore_new_patchset(self):
"Test new patchset with job semaphores"
self.executor_server.hold_jobs_in_build = True

View File

@ -420,11 +420,16 @@ class PipelineManager(object):
def cancelJobs(self, item, prime=True):
self.log.debug("Cancel jobs for change %s" % item.change)
canceled = False
jobs_to_release = []
old_build_set = item.current_build_set
old_jobs = {job.name: job for job in item.getJobs()}
if prime and item.current_build_set.ref:
item.resetAllBuilds()
for req in old_build_set.node_requests.values():
self.sched.nodepool.cancelRequest(req)
jobs_to_release.append(req.job)
old_build_set.node_requests = {}
canceled_jobs = set()
for build in old_build_set.getBuilds():
@ -437,9 +442,7 @@ class PipelineManager(object):
except Exception:
self.log.exception("Exception while canceling build %s "
"for change %s" % (build, item.change))
tenant = old_build_set.item.pipeline.tenant
tenant.semaphore_handler.release(
old_build_set.item, build.job)
jobs_to_release.append(build.job)
if not was_running:
nodeset = build.build_set.getJobNodeSet(build.job.name)
@ -451,6 +454,13 @@ class PipelineManager(object):
if jobname in canceled_jobs:
continue
self.sched.nodepool.returnNodeSet(nodeset)
jobs_to_release.append(old_jobs[jobname])
for job in jobs_to_release:
tenant = old_build_set.item.pipeline.tenant
tenant.semaphore_handler.release(
old_build_set.item, job)
for item_behind in item.items_behind:
self.log.debug("Canceling jobs for change %s, behind change %s" %
(item_behind.change, item.change))