summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.openstack.org>2019-02-28 17:54:57 +0000
committerGerrit Code Review <review@openstack.org>2019-02-28 17:54:57 +0000
commitec5af360f2b48a4fcdb0e70555a75796857efad9 (patch)
tree575f72a0458f91ff85f6620e051a93e4dff85870
parentbc83702789450da568f4979e746731eb9529f524 (diff)
parentf1cd9a539f01df7d586f04b7773a914a4bcf579b (diff)
Merge "Fix missing semaphore release around dequeue"
-rw-r--r--tests/unit/test_scheduler.py82
-rw-r--r--zuul/manager/__init__.py16
2 files changed, 95 insertions, 3 deletions
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 6258398..690971e 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -6422,6 +6422,88 @@ class TestSemaphore(ZuulTestCase):
6422 self.executor_server.release() 6422 self.executor_server.release()
6423 self.waitUntilSettled() 6423 self.waitUntilSettled()
6424 6424
6425 def test_semaphore_abandon_pending_node_request(self):
6426 "Test abandon with job semaphores and pending node request"
6427 self.executor_server.hold_jobs_in_build = True
6428
6429 # Pause nodepool so we can check the ordering of getting the nodes
6430 # and aquiring the semaphore.
6431 self.fake_nodepool.paused = True
6432
6433 tenant = self.sched.abide.tenants.get('tenant-one')
6434 check_pipeline = tenant.layout.pipelines['check']
6435
6436 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
6437 self.assertFalse('test-semaphore' in
6438 tenant.semaphore_handler.semaphores)
6439
6440 self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
6441 self.waitUntilSettled()
6442
6443 self.assertTrue('test-semaphore' in
6444 tenant.semaphore_handler.semaphores)
6445
6446 self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
6447 self.waitUntilSettled()
6448
6449 # The check pipeline should be empty
6450 items = check_pipeline.getAllItems()
6451 self.assertEqual(len(items), 0)
6452
6453 # The semaphore should be released
6454 self.assertFalse('test-semaphore' in
6455 tenant.semaphore_handler.semaphores)
6456
6457 self.executor_server.hold_jobs_in_build = False
6458 self.fake_nodepool.paused = False
6459 self.executor_server.release()
6460 self.waitUntilSettled()
6461
6462 def test_semaphore_abandon_pending_execution(self):
6463 "Test abandon with job semaphores and pending job execution"
6464
6465 # Pause the executor so it doesn't take any jobs.
6466 self.executor_server.pause()
6467
6468 # Pause nodepool so we can wait on the node requests and fulfill them
6469 # in a controlled manner.
6470 self.fake_nodepool.paused = True
6471
6472 tenant = self.sched.abide.tenants.get('tenant-one')
6473 check_pipeline = tenant.layout.pipelines['check']
6474
6475 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
6476 self.assertFalse('test-semaphore' in
6477 tenant.semaphore_handler.semaphores)
6478
6479 self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
6480 self.waitUntilSettled()
6481 self.assertEqual(len(self.nodepool.requests), 2)
6482
6483 # Now unpause nodepool to fulfill the node requests. We cannot use
6484 # waitUntilSettled here because the executor is paused.
6485 self.fake_nodepool.paused = False
6486 for _ in iterate_timeout(30, 'fulfill node requests'):
6487 if len(self.nodepool.requests) == 0:
6488 break
6489
6490 self.assertTrue('test-semaphore' in
6491 tenant.semaphore_handler.semaphores)
6492
6493 self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
6494 self.waitUntilSettled()
6495
6496 # The check pipeline should be empty
6497 items = check_pipeline.getAllItems()
6498 self.assertEqual(len(items), 0)
6499
6500 # The semaphore should be released
6501 self.assertFalse('test-semaphore' in
6502 tenant.semaphore_handler.semaphores)
6503
6504 self.executor_server.release()
6505 self.waitUntilSettled()
6506
6425 def test_semaphore_new_patchset(self): 6507 def test_semaphore_new_patchset(self):
6426 "Test new patchset with job semaphores" 6508 "Test new patchset with job semaphores"
6427 self.executor_server.hold_jobs_in_build = True 6509 self.executor_server.hold_jobs_in_build = True
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index d3ad0d6..6b5abc1 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -420,11 +420,16 @@ class PipelineManager(object):
420 def cancelJobs(self, item, prime=True): 420 def cancelJobs(self, item, prime=True):
421 self.log.debug("Cancel jobs for change %s" % item.change) 421 self.log.debug("Cancel jobs for change %s" % item.change)
422 canceled = False 422 canceled = False
423 jobs_to_release = []
424
423 old_build_set = item.current_build_set 425 old_build_set = item.current_build_set
426 old_jobs = {job.name: job for job in item.getJobs()}
427
424 if prime and item.current_build_set.ref: 428 if prime and item.current_build_set.ref:
425 item.resetAllBuilds() 429 item.resetAllBuilds()
426 for req in old_build_set.node_requests.values(): 430 for req in old_build_set.node_requests.values():
427 self.sched.nodepool.cancelRequest(req) 431 self.sched.nodepool.cancelRequest(req)
432 jobs_to_release.append(req.job)
428 old_build_set.node_requests = {} 433 old_build_set.node_requests = {}
429 canceled_jobs = set() 434 canceled_jobs = set()
430 for build in old_build_set.getBuilds(): 435 for build in old_build_set.getBuilds():
@@ -437,9 +442,7 @@ class PipelineManager(object):
437 except Exception: 442 except Exception:
438 self.log.exception("Exception while canceling build %s " 443 self.log.exception("Exception while canceling build %s "
439 "for change %s" % (build, item.change)) 444 "for change %s" % (build, item.change))
440 tenant = old_build_set.item.pipeline.tenant 445 jobs_to_release.append(build.job)
441 tenant.semaphore_handler.release(
442 old_build_set.item, build.job)
443 446
444 if not was_running: 447 if not was_running:
445 nodeset = build.build_set.getJobNodeSet(build.job.name) 448 nodeset = build.build_set.getJobNodeSet(build.job.name)
@@ -451,6 +454,13 @@ class PipelineManager(object):
451 if jobname in canceled_jobs: 454 if jobname in canceled_jobs:
452 continue 455 continue
453 self.sched.nodepool.returnNodeSet(nodeset) 456 self.sched.nodepool.returnNodeSet(nodeset)
457 jobs_to_release.append(old_jobs[jobname])
458
459 for job in jobs_to_release:
460 tenant = old_build_set.item.pipeline.tenant
461 tenant.semaphore_handler.release(
462 old_build_set.item, job)
463
454 for item_behind in item.items_behind: 464 for item_behind in item.items_behind:
455 self.log.debug("Canceling jobs for change %s, behind change %s" % 465 self.log.debug("Canceling jobs for change %s, behind change %s" %
456 (item_behind.change, item.change)) 466 (item_behind.change, item.change))