Fix items stuck in queue pending node requests
Given the following sequence: * An item is within the active window * A node request is submitted for a job for that item * The active window shrinks * A tenant reconfiguration is performed * The node request completes The nodes would be returned, however the record of the node request was not removed from the item's buildset. Once the item entered the active window again, no further node requests would happen for that job because of the retained node request record, and therefore the item would be stuck in the queue. To correct this, discard the node request record on the buildset if we are going to return the nodes unused in that case. Also clarify a debug message "job set" -> "job nodeset", and correct some typos from a related unit test which were inadvertently simply asserting true. Change-Id: Ia6259b08ecda49f3123dae1c7d0e35dc46004561
This commit is contained in:
parent
3ddce68728
commit
31a7ddc791
|
@ -4484,7 +4484,7 @@ class TestScheduler(ZuulTestCase):
|
|||
tenant = self.sched.abide.tenants.get('tenant-one')
|
||||
queue = tenant.layout.pipelines['gate'].queues[0]
|
||||
self.assertEqual(queue.window, 2)
|
||||
self.assertTrue(len(self.builds), 4)
|
||||
self.assertEqual(len(self.builds), 4)
|
||||
|
||||
self.waitUntilSettled()
|
||||
self.commitConfigUpdate('org/common-config',
|
||||
|
@ -4499,7 +4499,7 @@ class TestScheduler(ZuulTestCase):
|
|||
# B is outside the window, but still marked active until the
|
||||
# next pass through the queue processor, so its builds haven't
|
||||
# been canceled.
|
||||
self.assertTrue(len(self.builds), 4)
|
||||
self.assertEqual(len(self.builds), 4)
|
||||
|
||||
self.sched.reconfigure(self.config)
|
||||
tenant = self.sched.abide.tenants.get('tenant-one')
|
||||
|
@ -4507,7 +4507,7 @@ class TestScheduler(ZuulTestCase):
|
|||
self.assertEqual(queue.window, 1)
|
||||
self.waitUntilSettled()
|
||||
# B's builds have been canceled now
|
||||
self.assertTrue(len(self.builds), 2)
|
||||
self.assertEqual(len(self.builds), 2)
|
||||
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
|
@ -4524,6 +4524,75 @@ class TestScheduler(ZuulTestCase):
|
|||
dict(name='job2', result='SUCCESS', changes='1,1 2,1'),
|
||||
], ordered=False)
|
||||
|
||||
@simple_layout('layouts/reconfigure-window-fixed.yaml')
|
||||
def test_reconfigure_window_fixed_requests(self):
|
||||
# Test the active window shrinking during reconfiguration with
|
||||
# outstanding node requests
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
|
||||
# Start the jobs for A
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
A.addApproval('Code-Review', 2)
|
||||
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
|
||||
self.waitUntilSettled()
|
||||
self.log.debug("A complete")
|
||||
|
||||
# Hold node requests for B
|
||||
self.fake_nodepool.pause()
|
||||
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
|
||||
B.addApproval('Code-Review', 2)
|
||||
self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
|
||||
self.waitUntilSettled()
|
||||
self.log.debug("B complete")
|
||||
|
||||
tenant = self.sched.abide.tenants.get('tenant-one')
|
||||
queue = tenant.layout.pipelines['gate'].queues[0]
|
||||
self.assertEqual(queue.window, 2)
|
||||
self.assertEqual(len(self.builds), 2)
|
||||
|
||||
self.waitUntilSettled()
|
||||
self.commitConfigUpdate('org/common-config',
|
||||
'layouts/reconfigure-window-fixed2.yaml')
|
||||
self.sched.reconfigure(self.config)
|
||||
self.waitUntilSettled()
|
||||
self.log.debug("Reconfiguration complete")
|
||||
|
||||
tenant = self.sched.abide.tenants.get('tenant-one')
|
||||
queue = tenant.layout.pipelines['gate'].queues[0]
|
||||
# Because we have configured a static window, it should
|
||||
# be allowed to shrink on reconfiguration.
|
||||
self.assertEqual(queue.window, 1)
|
||||
self.assertEqual(len(self.builds), 2)
|
||||
|
||||
# After the previous reconfig, the queue processor will have
|
||||
# run and marked B inactive; run another reconfiguration so
|
||||
# that we're testing what happens when we reconfigure after
|
||||
# the active window having shrunk.
|
||||
self.sched.reconfigure(self.config)
|
||||
|
||||
# Unpause the node requests now
|
||||
self.fake_nodepool.unpause()
|
||||
self.waitUntilSettled()
|
||||
self.log.debug("Nodepool unpause complete")
|
||||
|
||||
# Allow A to merge and B to enter the active window and complete
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
self.log.debug("Executor unpause complete")
|
||||
|
||||
tenant = self.sched.abide.tenants.get('tenant-one')
|
||||
queue = tenant.layout.pipelines['gate'].queues[0]
|
||||
self.assertEqual(queue.window, 1)
|
||||
|
||||
self.waitUntilSettled()
|
||||
self.assertHistory([
|
||||
dict(name='job1', result='SUCCESS', changes='1,1'),
|
||||
dict(name='job2', result='SUCCESS', changes='1,1'),
|
||||
dict(name='job1', result='SUCCESS', changes='1,1 2,1'),
|
||||
dict(name='job2', result='SUCCESS', changes='1,1 2,1'),
|
||||
], ordered=False)
|
||||
|
||||
@simple_layout('layouts/reconfigure-remove-add.yaml')
|
||||
def test_reconfigure_remove_add(self):
|
||||
# Test removing, then adding a job while in queue
|
||||
|
|
|
@ -1849,7 +1849,7 @@ class BuildSet(object):
|
|||
|
||||
def removeJobNodeSet(self, job_name):
|
||||
if job_name not in self.nodesets:
|
||||
raise Exception("No job set for %s" % (job_name))
|
||||
raise Exception("No job nodeset for %s" % (job_name))
|
||||
del self.nodesets[job_name]
|
||||
|
||||
def setJobNodeRequest(self, job_name, req):
|
||||
|
@ -1860,6 +1860,11 @@ class BuildSet(object):
|
|||
def getJobNodeRequest(self, job_name):
|
||||
return self.node_requests.get(job_name)
|
||||
|
||||
def removeJobNodeRequest(self, job_name):
|
||||
if job_name not in self.node_requests:
|
||||
raise Exception("No node request for %s" % (job_name))
|
||||
del self.node_requests[job_name]
|
||||
|
||||
def jobNodeRequestComplete(self, job_name, req, nodeset):
|
||||
if job_name in self.nodesets:
|
||||
raise Exception("Prior node request for %s" % (job_name))
|
||||
|
|
|
@ -1313,6 +1313,12 @@ class Scheduler(threading.Thread):
|
|||
self.log.warning("Item %s does not contain job %s "
|
||||
"for node request %s",
|
||||
build_set.item, request.job.name, request)
|
||||
try:
|
||||
build_set.removeJobNodeRequest(request.job.name)
|
||||
except Exception:
|
||||
self.log.exception("Unable to remove obsolete node request "
|
||||
"%s for %s job %s",
|
||||
request, build_set.item, request.job.name)
|
||||
if request.fulfilled:
|
||||
self.nodepool.returnNodeSet(request.nodeset)
|
||||
return
|
||||
|
|
Loading…
Reference in New Issue