Merge "Fix items stuck in queue pending node requests"
This commit is contained in:
commit
c1006610fd
|
@ -4484,7 +4484,7 @@ class TestScheduler(ZuulTestCase):
|
|||
tenant = self.sched.abide.tenants.get('tenant-one')
|
||||
queue = tenant.layout.pipelines['gate'].queues[0]
|
||||
self.assertEqual(queue.window, 2)
|
||||
self.assertTrue(len(self.builds), 4)
|
||||
self.assertEqual(len(self.builds), 4)
|
||||
|
||||
self.waitUntilSettled()
|
||||
self.commitConfigUpdate('org/common-config',
|
||||
|
@ -4499,7 +4499,7 @@ class TestScheduler(ZuulTestCase):
|
|||
# B is outside the window, but still marked active until the
|
||||
# next pass through the queue processor, so its builds haven't
|
||||
# been canceled.
|
||||
self.assertTrue(len(self.builds), 4)
|
||||
self.assertEqual(len(self.builds), 4)
|
||||
|
||||
self.sched.reconfigure(self.config)
|
||||
tenant = self.sched.abide.tenants.get('tenant-one')
|
||||
|
@ -4507,7 +4507,7 @@ class TestScheduler(ZuulTestCase):
|
|||
self.assertEqual(queue.window, 1)
|
||||
self.waitUntilSettled()
|
||||
# B's builds have been canceled now
|
||||
self.assertTrue(len(self.builds), 2)
|
||||
self.assertEqual(len(self.builds), 2)
|
||||
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
|
@ -4524,6 +4524,75 @@ class TestScheduler(ZuulTestCase):
|
|||
dict(name='job2', result='SUCCESS', changes='1,1 2,1'),
|
||||
], ordered=False)
|
||||
|
||||
@simple_layout('layouts/reconfigure-window-fixed.yaml')
|
||||
def test_reconfigure_window_fixed_requests(self):
|
||||
# Test the active window shrinking during reconfiguration with
|
||||
# outstanding node requests
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
|
||||
# Start the jobs for A
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
A.addApproval('Code-Review', 2)
|
||||
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
|
||||
self.waitUntilSettled()
|
||||
self.log.debug("A complete")
|
||||
|
||||
# Hold node requests for B
|
||||
self.fake_nodepool.pause()
|
||||
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
|
||||
B.addApproval('Code-Review', 2)
|
||||
self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
|
||||
self.waitUntilSettled()
|
||||
self.log.debug("B complete")
|
||||
|
||||
tenant = self.sched.abide.tenants.get('tenant-one')
|
||||
queue = tenant.layout.pipelines['gate'].queues[0]
|
||||
self.assertEqual(queue.window, 2)
|
||||
self.assertEqual(len(self.builds), 2)
|
||||
|
||||
self.waitUntilSettled()
|
||||
self.commitConfigUpdate('org/common-config',
|
||||
'layouts/reconfigure-window-fixed2.yaml')
|
||||
self.sched.reconfigure(self.config)
|
||||
self.waitUntilSettled()
|
||||
self.log.debug("Reconfiguration complete")
|
||||
|
||||
tenant = self.sched.abide.tenants.get('tenant-one')
|
||||
queue = tenant.layout.pipelines['gate'].queues[0]
|
||||
# Because we have configured a static window, it should
|
||||
# be allowed to shrink on reconfiguration.
|
||||
self.assertEqual(queue.window, 1)
|
||||
self.assertEqual(len(self.builds), 2)
|
||||
|
||||
# After the previous reconfig, the queue processor will have
|
||||
# run and marked B inactive; run another reconfiguration so
|
||||
# that we're testing what happens when we reconfigure after
|
||||
# the active window having shrunk.
|
||||
self.sched.reconfigure(self.config)
|
||||
|
||||
# Unpause the node requests now
|
||||
self.fake_nodepool.unpause()
|
||||
self.waitUntilSettled()
|
||||
self.log.debug("Nodepool unpause complete")
|
||||
|
||||
# Allow A to merge and B to enter the active window and complete
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
self.log.debug("Executor unpause complete")
|
||||
|
||||
tenant = self.sched.abide.tenants.get('tenant-one')
|
||||
queue = tenant.layout.pipelines['gate'].queues[0]
|
||||
self.assertEqual(queue.window, 1)
|
||||
|
||||
self.waitUntilSettled()
|
||||
self.assertHistory([
|
||||
dict(name='job1', result='SUCCESS', changes='1,1'),
|
||||
dict(name='job2', result='SUCCESS', changes='1,1'),
|
||||
dict(name='job1', result='SUCCESS', changes='1,1 2,1'),
|
||||
dict(name='job2', result='SUCCESS', changes='1,1 2,1'),
|
||||
], ordered=False)
|
||||
|
||||
@simple_layout('layouts/reconfigure-remove-add.yaml')
|
||||
def test_reconfigure_remove_add(self):
|
||||
# Test removing, then adding a job while in queue
|
||||
|
|
|
@ -1849,7 +1849,7 @@ class BuildSet(object):
|
|||
|
||||
def removeJobNodeSet(self, job_name):
|
||||
if job_name not in self.nodesets:
|
||||
raise Exception("No job set for %s" % (job_name))
|
||||
raise Exception("No job nodeset for %s" % (job_name))
|
||||
del self.nodesets[job_name]
|
||||
|
||||
def setJobNodeRequest(self, job_name, req):
|
||||
|
@ -1860,6 +1860,11 @@ class BuildSet(object):
|
|||
def getJobNodeRequest(self, job_name):
|
||||
return self.node_requests.get(job_name)
|
||||
|
||||
def removeJobNodeRequest(self, job_name):
|
||||
if job_name not in self.node_requests:
|
||||
raise Exception("No node request for %s" % (job_name))
|
||||
del self.node_requests[job_name]
|
||||
|
||||
def jobNodeRequestComplete(self, job_name, req, nodeset):
|
||||
if job_name in self.nodesets:
|
||||
raise Exception("Prior node request for %s" % (job_name))
|
||||
|
|
|
@ -1313,6 +1313,12 @@ class Scheduler(threading.Thread):
|
|||
self.log.warning("Item %s does not contain job %s "
|
||||
"for node request %s",
|
||||
build_set.item, request.job.name, request)
|
||||
try:
|
||||
build_set.removeJobNodeRequest(request.job.name)
|
||||
except Exception:
|
||||
self.log.exception("Unable to remove obsolete node request "
|
||||
"%s for %s job %s",
|
||||
request, build_set.item, request.job.name)
|
||||
if request.fulfilled:
|
||||
self.nodepool.returnNodeSet(request.nodeset)
|
||||
return
|
||||
|
|
Loading…
Reference in New Issue