summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.openstack.org>2018-11-30 15:24:56 +0000
committerGerrit Code Review <review@openstack.org>2018-11-30 15:24:56 +0000
commit16c55fa2670d838411e65409a95a9e505c324654 (patch)
tree964f8839e58dd8ecf089683b742c1f7bdc432036
parent51ede31e84335aa3acccacdd636b44396afccea8 (diff)
parent0b00c4685b1883dcb6d9ac1814acc68c89eeae26 (diff)
Merge "Set relative priority of node requests"
-rw-r--r--doc/source/admin/components.rst26
-rw-r--r--releasenotes/notes/relative_priority-dee014da5977da36.yaml6
-rw-r--r--tests/base.py15
-rw-r--r--tests/fixtures/layouts/two-projects-integrated.yaml79
-rw-r--r--tests/fixtures/zuul.conf1
-rw-r--r--tests/nodepool/test_nodepool_integration.py8
-rw-r--r--tests/unit/test_nodepool.py34
-rw-r--r--tests/unit/test_scheduler.py113
-rw-r--r--zuul/manager/__init__.py17
-rw-r--r--zuul/manager/dependent.py5
-rw-r--r--zuul/model.py21
-rw-r--r--zuul/nodepool.py38
-rw-r--r--zuul/scheduler.py4
-rw-r--r--zuul/zk.py106
14 files changed, 427 insertions, 46 deletions
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index 81a20cb..cf61b75 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -276,6 +276,32 @@ The following sections of ``zuul.conf`` are used by the scheduler:
276 276
277 Path to directory in which Zuul should save its state. 277 Path to directory in which Zuul should save its state.
278 278
279 .. attr:: relative_priority
280 :default: False
281
282 A boolean which indicates whether the scheduler should supply
283 relative priority information for node requests.
284
285 In all cases, each pipeline may specify a precedence value which
286 is used by Nodepool to satisfy requests from higher-precedence
287 pipelines first. If ``relative_priority`` is set to ``True``,
288 then Zuul will additionally group items in the same pipeline by
289 project and weight each request by its position in that
290 project's group. A request for the first change of a given
291 project will have the highest relative priority, and the second
292 change a lower relative priority. The first change of each
293 project in a pipeline has the same relative priority, regardless
294 of the order of submission or how many other changes are in the
295 pipeline. This can be used to make node allocations complete
296 faster for projects with fewer changes in a system dominated by
297 projects with more changes.
298
299 If this value is ``False`` (the default), then node requests are
300 sorted by pipeline precedence followed by the order in which
301 they were submitted. If this is ``True``, they are sorted by
302 pipeline precedence, followed by relative priority, and finally
303 the order in which they were submitted.
304
279Operation 305Operation
280~~~~~~~~~ 306~~~~~~~~~
281 307
diff --git a/releasenotes/notes/relative_priority-dee014da5977da36.yaml b/releasenotes/notes/relative_priority-dee014da5977da36.yaml
new file mode 100644
index 0000000..e072de0
--- /dev/null
+++ b/releasenotes/notes/relative_priority-dee014da5977da36.yaml
@@ -0,0 +1,6 @@
1---
2features:
3 - |
4 A new scheduler option, :attr:`scheduler.relative_priority`, can
5 be used to instruct Nodepool to fulfill requests from less-busy
6 projects more quickly.
diff --git a/tests/base.py b/tests/base.py
index 7cc2239..7e6fa46 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -1737,6 +1737,7 @@ class FakeNodepool(object):
1737 log = logging.getLogger("zuul.test.FakeNodepool") 1737 log = logging.getLogger("zuul.test.FakeNodepool")
1738 1738
1739 def __init__(self, host, port, chroot): 1739 def __init__(self, host, port, chroot):
1740 self.complete_event = threading.Event()
1740 self.host_keys = None 1741 self.host_keys = None
1741 self.client = kazoo.client.KazooClient( 1742 self.client = kazoo.client.KazooClient(
1742 hosts='%s:%s%s' % (host, port, chroot)) 1743 hosts='%s:%s%s' % (host, port, chroot))
@@ -1756,12 +1757,21 @@ class FakeNodepool(object):
1756 self.client.stop() 1757 self.client.stop()
1757 self.client.close() 1758 self.client.close()
1758 1759
1760 def pause(self):
1761 self.complete_event.wait()
1762 self.paused = True
1763
1764 def unpause(self):
1765 self.paused = False
1766
1759 def run(self): 1767 def run(self):
1760 while self._running: 1768 while self._running:
1769 self.complete_event.clear()
1761 try: 1770 try:
1762 self._run() 1771 self._run()
1763 except Exception: 1772 except Exception:
1764 self.log.exception("Error in fake nodepool:") 1773 self.log.exception("Error in fake nodepool:")
1774 self.complete_event.set()
1765 time.sleep(0.1) 1775 time.sleep(0.1)
1766 1776
1767 def _run(self): 1777 def _run(self):
@@ -1776,7 +1786,7 @@ class FakeNodepool(object):
1776 except kazoo.exceptions.NoNodeError: 1786 except kazoo.exceptions.NoNodeError:
1777 return [] 1787 return []
1778 reqs = [] 1788 reqs = []
1779 for oid in sorted(reqids): 1789 for oid in reqids:
1780 path = self.REQUEST_ROOT + '/' + oid 1790 path = self.REQUEST_ROOT + '/' + oid
1781 try: 1791 try:
1782 data, stat = self.client.get(path) 1792 data, stat = self.client.get(path)
@@ -1785,6 +1795,9 @@ class FakeNodepool(object):
1785 reqs.append(data) 1795 reqs.append(data)
1786 except kazoo.exceptions.NoNodeError: 1796 except kazoo.exceptions.NoNodeError:
1787 pass 1797 pass
1798 reqs.sort(key=lambda r: (r['_oid'].split('-')[0],
1799 r['relative_priority'],
1800 r['_oid'].split('-')[1]))
1788 return reqs 1801 return reqs
1789 1802
1790 def getNodes(self): 1803 def getNodes(self):
diff --git a/tests/fixtures/layouts/two-projects-integrated.yaml b/tests/fixtures/layouts/two-projects-integrated.yaml
new file mode 100644
index 0000000..e927aca
--- /dev/null
+++ b/tests/fixtures/layouts/two-projects-integrated.yaml
@@ -0,0 +1,79 @@
1- pipeline:
2 name: check
3 manager: independent
4 trigger:
5 gerrit:
6 - event: patchset-created
7 success:
8 gerrit:
9 Verified: 1
10 failure:
11 gerrit:
12 Verified: -1
13
14- pipeline:
15 name: gate
16 manager: dependent
17 success-message: Build succeeded (gate).
18 trigger:
19 gerrit:
20 - event: comment-added
21 approval:
22 - Approved: 1
23 success:
24 gerrit:
25 Verified: 2
26 submit: true
27 failure:
28 gerrit:
29 Verified: -2
30 start:
31 gerrit:
32 Verified: 0
33 precedence: high
34
35- job:
36 name: base
37 parent: null
38 run: playbooks/base.yaml
39 nodeset:
40 nodes:
41 - name: controller
42 label: ubuntu-xenial
43
44- job:
45 name: test
46 run: playbooks/test.yaml
47
48- job:
49 name: integration
50 run: playbooks/integration.yaml
51
52- project:
53 name: org/project
54 check:
55 jobs:
56 - test
57 gate:
58 jobs:
59 - test
60
61- project:
62 name: org/project1
63 check:
64 jobs:
65 - integration
66 gate:
67 queue: integrated
68 jobs:
69 - integration
70
71- project:
72 name: org/project2
73 check:
74 jobs:
75 - integration
76 gate:
77 queue: integrated
78 jobs:
79 - integration
diff --git a/tests/fixtures/zuul.conf b/tests/fixtures/zuul.conf
index e6f997c..5d79378 100644
--- a/tests/fixtures/zuul.conf
+++ b/tests/fixtures/zuul.conf
@@ -8,6 +8,7 @@ server=127.0.0.1
8 8
9[scheduler] 9[scheduler]
10tenant_config=main.yaml 10tenant_config=main.yaml
11relative_priority=true
11 12
12[merger] 13[merger]
13git_dir=/tmp/zuul-test/merger-git 14git_dir=/tmp/zuul-test/merger-git
diff --git a/tests/nodepool/test_nodepool_integration.py b/tests/nodepool/test_nodepool_integration.py
index 2f36154..b608bba 100644
--- a/tests/nodepool/test_nodepool_integration.py
+++ b/tests/nodepool/test_nodepool_integration.py
@@ -58,7 +58,7 @@ class TestNodepoolIntegration(BaseTestCase):
58 nodeset.addNode(model.Node(['controller'], 'fake-label')) 58 nodeset.addNode(model.Node(['controller'], 'fake-label'))
59 job = model.Job('testjob') 59 job = model.Job('testjob')
60 job.nodeset = nodeset 60 job.nodeset = nodeset
61 request = self.nodepool.requestNodes(None, job) 61 request = self.nodepool.requestNodes(None, job, 0)
62 self.waitForRequests() 62 self.waitForRequests()
63 self.assertEqual(len(self.provisioned_requests), 1) 63 self.assertEqual(len(self.provisioned_requests), 1)
64 self.assertEqual(request.state, model.STATE_FULFILLED) 64 self.assertEqual(request.state, model.STATE_FULFILLED)
@@ -88,7 +88,7 @@ class TestNodepoolIntegration(BaseTestCase):
88 nodeset.addNode(model.Node(['controller'], 'invalid-label')) 88 nodeset.addNode(model.Node(['controller'], 'invalid-label'))
89 job = model.Job('testjob') 89 job = model.Job('testjob')
90 job.nodeset = nodeset 90 job.nodeset = nodeset
91 request = self.nodepool.requestNodes(None, job) 91 request = self.nodepool.requestNodes(None, job, 0)
92 self.waitForRequests() 92 self.waitForRequests()
93 self.assertEqual(len(self.provisioned_requests), 1) 93 self.assertEqual(len(self.provisioned_requests), 1)
94 self.assertEqual(request.state, model.STATE_FAILED) 94 self.assertEqual(request.state, model.STATE_FAILED)
@@ -103,7 +103,7 @@ class TestNodepoolIntegration(BaseTestCase):
103 job = model.Job('testjob') 103 job = model.Job('testjob')
104 job.nodeset = nodeset 104 job.nodeset = nodeset
105 self.fake_nodepool.paused = True 105 self.fake_nodepool.paused = True
106 request = self.nodepool.requestNodes(None, job) 106 request = self.nodepool.requestNodes(None, job, 0)
107 self.zk.client.stop() 107 self.zk.client.stop()
108 self.zk.client.start() 108 self.zk.client.start()
109 self.fake_nodepool.paused = False 109 self.fake_nodepool.paused = False
@@ -121,7 +121,7 @@ class TestNodepoolIntegration(BaseTestCase):
121 job = model.Job('testjob') 121 job = model.Job('testjob')
122 job.nodeset = nodeset 122 job.nodeset = nodeset
123 self.fake_nodepool.paused = True 123 self.fake_nodepool.paused = True
124 request = self.nodepool.requestNodes(None, job) 124 request = self.nodepool.requestNodes(None, job, 0)
125 self.nodepool.cancelRequest(request) 125 self.nodepool.cancelRequest(request)
126 126
127 self.waitForRequests() 127 self.waitForRequests()
diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py
index aa0f082..e822a10 100644
--- a/tests/unit/test_nodepool.py
+++ b/tests/unit/test_nodepool.py
@@ -71,7 +71,7 @@ class TestNodepool(BaseTestCase):
71 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial')) 71 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
72 job = model.Job('testjob') 72 job = model.Job('testjob')
73 job.nodeset = nodeset 73 job.nodeset = nodeset
74 request = self.nodepool.requestNodes(None, job) 74 request = self.nodepool.requestNodes(None, job, 0)
75 self.waitForRequests() 75 self.waitForRequests()
76 self.assertEqual(len(self.provisioned_requests), 1) 76 self.assertEqual(len(self.provisioned_requests), 1)
77 self.assertEqual(request.state, 'fulfilled') 77 self.assertEqual(request.state, 'fulfilled')
@@ -103,11 +103,11 @@ class TestNodepool(BaseTestCase):
103 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial')) 103 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
104 job = model.Job('testjob') 104 job = model.Job('testjob')
105 job.nodeset = nodeset 105 job.nodeset = nodeset
106 self.fake_nodepool.paused = True 106 self.fake_nodepool.pause()
107 request = self.nodepool.requestNodes(None, job) 107 request = self.nodepool.requestNodes(None, job, 0)
108 self.zk.client.stop() 108 self.zk.client.stop()
109 self.zk.client.start() 109 self.zk.client.start()
110 self.fake_nodepool.paused = False 110 self.fake_nodepool.unpause()
111 self.waitForRequests() 111 self.waitForRequests()
112 self.assertEqual(len(self.provisioned_requests), 1) 112 self.assertEqual(len(self.provisioned_requests), 1)
113 self.assertEqual(request.state, 'fulfilled') 113 self.assertEqual(request.state, 'fulfilled')
@@ -120,8 +120,8 @@ class TestNodepool(BaseTestCase):
120 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial')) 120 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
121 job = model.Job('testjob') 121 job = model.Job('testjob')
122 job.nodeset = nodeset 122 job.nodeset = nodeset
123 self.fake_nodepool.paused = True 123 self.fake_nodepool.pause()
124 request = self.nodepool.requestNodes(None, job) 124 request = self.nodepool.requestNodes(None, job, 0)
125 self.nodepool.cancelRequest(request) 125 self.nodepool.cancelRequest(request)
126 126
127 self.waitForRequests() 127 self.waitForRequests()
@@ -135,7 +135,7 @@ class TestNodepool(BaseTestCase):
135 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial')) 135 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
136 job = model.Job('testjob') 136 job = model.Job('testjob')
137 job.nodeset = nodeset 137 job.nodeset = nodeset
138 request = self.nodepool.requestNodes(None, job) 138 request = self.nodepool.requestNodes(None, job, 0)
139 self.waitForRequests() 139 self.waitForRequests()
140 self.assertEqual(len(self.provisioned_requests), 1) 140 self.assertEqual(len(self.provisioned_requests), 1)
141 self.assertEqual(request.state, 'fulfilled') 141 self.assertEqual(request.state, 'fulfilled')
@@ -156,7 +156,7 @@ class TestNodepool(BaseTestCase):
156 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial')) 156 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
157 job = model.Job('testjob') 157 job = model.Job('testjob')
158 job.nodeset = nodeset 158 job.nodeset = nodeset
159 request = self.nodepool.requestNodes(None, job) 159 request = self.nodepool.requestNodes(None, job, 0)
160 self.waitForRequests() 160 self.waitForRequests()
161 self.assertEqual(len(self.provisioned_requests), 1) 161 self.assertEqual(len(self.provisioned_requests), 1)
162 self.assertEqual(request.state, 'fulfilled') 162 self.assertEqual(request.state, 'fulfilled')
@@ -170,3 +170,21 @@ class TestNodepool(BaseTestCase):
170 for node in nodeset.getNodes(): 170 for node in nodeset.getNodes():
171 self.assertIsNone(node.lock) 171 self.assertIsNone(node.lock)
172 self.assertEqual(node.state, 'ready') 172 self.assertEqual(node.state, 'ready')
173
174 def test_node_request_priority(self):
175 # Test that requests are satisfied in priority order
176
177 nodeset = model.NodeSet()
178 nodeset.addNode(model.Node(['controller', 'foo'], 'ubuntu-xenial'))
179 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
180 job = model.Job('testjob')
181 job.nodeset = nodeset
182 self.fake_nodepool.pause()
183 request1 = self.nodepool.requestNodes(None, job, 1)
184 request2 = self.nodepool.requestNodes(None, job, 0)
185 self.fake_nodepool.unpause()
186 self.waitForRequests()
187 self.assertEqual(len(self.provisioned_requests), 2)
188 self.assertEqual(request1.state, 'fulfilled')
189 self.assertEqual(request2.state, 'fulfilled')
190 self.assertTrue(request2.state_time < request1.state_time)
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index ba7be3d..81ca3b8 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -5025,7 +5025,7 @@ For CI problems and help debugging, contact ci@example.org"""
5025 def test_zookeeper_disconnect(self): 5025 def test_zookeeper_disconnect(self):
5026 "Test that jobs are executed after a zookeeper disconnect" 5026 "Test that jobs are executed after a zookeeper disconnect"
5027 5027
5028 self.fake_nodepool.paused = True 5028 self.fake_nodepool.pause()
5029 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') 5029 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
5030 A.addApproval('Code-Review', 2) 5030 A.addApproval('Code-Review', 2)
5031 self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) 5031 self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@@ -5033,7 +5033,7 @@ For CI problems and help debugging, contact ci@example.org"""
5033 5033
5034 self.zk.client.stop() 5034 self.zk.client.stop()
5035 self.zk.client.start() 5035 self.zk.client.start()
5036 self.fake_nodepool.paused = False 5036 self.fake_nodepool.unpause()
5037 self.waitUntilSettled() 5037 self.waitUntilSettled()
5038 5038
5039 self.assertEqual(A.data['status'], 'MERGED') 5039 self.assertEqual(A.data['status'], 'MERGED')
@@ -5044,7 +5044,7 @@ For CI problems and help debugging, contact ci@example.org"""
5044 5044
5045 # This tests receiving a ZK disconnect between the arrival of 5045 # This tests receiving a ZK disconnect between the arrival of
5046 # a fulfilled request and when we accept its nodes. 5046 # a fulfilled request and when we accept its nodes.
5047 self.fake_nodepool.paused = True 5047 self.fake_nodepool.pause()
5048 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') 5048 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
5049 A.addApproval('Code-Review', 2) 5049 A.addApproval('Code-Review', 2)
5050 self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) 5050 self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@@ -5056,7 +5056,7 @@ For CI problems and help debugging, contact ci@example.org"""
5056 self.sched.run_handler_lock.acquire() 5056 self.sched.run_handler_lock.acquire()
5057 5057
5058 # Fulfill the nodepool request. 5058 # Fulfill the nodepool request.
5059 self.fake_nodepool.paused = False 5059 self.fake_nodepool.unpause()
5060 requests = list(self.sched.nodepool.requests.values()) 5060 requests = list(self.sched.nodepool.requests.values())
5061 self.assertEqual(1, len(requests)) 5061 self.assertEqual(1, len(requests))
5062 request = requests[0] 5062 request = requests[0]
@@ -5090,7 +5090,7 @@ For CI problems and help debugging, contact ci@example.org"""
5090 def test_nodepool_failure(self): 5090 def test_nodepool_failure(self):
5091 "Test that jobs are reported after a nodepool failure" 5091 "Test that jobs are reported after a nodepool failure"
5092 5092
5093 self.fake_nodepool.paused = True 5093 self.fake_nodepool.pause()
5094 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') 5094 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
5095 A.addApproval('Code-Review', 2) 5095 A.addApproval('Code-Review', 2)
5096 self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) 5096 self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@@ -5099,7 +5099,7 @@ For CI problems and help debugging, contact ci@example.org"""
5099 req = self.fake_nodepool.getNodeRequests()[0] 5099 req = self.fake_nodepool.getNodeRequests()[0]
5100 self.fake_nodepool.addFailRequest(req) 5100 self.fake_nodepool.addFailRequest(req)
5101 5101
5102 self.fake_nodepool.paused = False 5102 self.fake_nodepool.unpause()
5103 self.waitUntilSettled() 5103 self.waitUntilSettled()
5104 5104
5105 self.assertEqual(A.data['status'], 'NEW') 5105 self.assertEqual(A.data['status'], 'NEW')
@@ -5108,10 +5108,10 @@ For CI problems and help debugging, contact ci@example.org"""
5108 self.assertIn('project-test1 : SKIPPED', A.messages[1]) 5108 self.assertIn('project-test1 : SKIPPED', A.messages[1])
5109 self.assertIn('project-test2 : SKIPPED', A.messages[1]) 5109 self.assertIn('project-test2 : SKIPPED', A.messages[1])
5110 5110
5111 def test_nodepool_priority(self): 5111 def test_nodepool_pipeline_priority(self):
5112 "Test that nodes are requested at the correct priority" 5112 "Test that nodes are requested at the correct pipeline priority"
5113 5113
5114 self.fake_nodepool.paused = True 5114 self.fake_nodepool.pause()
5115 5115
5116 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') 5116 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
5117 self.fake_gerrit.addEvent(A.getRefUpdatedEvent()) 5117 self.fake_gerrit.addEvent(A.getRefUpdatedEvent())
@@ -5128,10 +5128,11 @@ For CI problems and help debugging, contact ci@example.org"""
5128 5128
5129 reqs = self.fake_nodepool.getNodeRequests() 5129 reqs = self.fake_nodepool.getNodeRequests()
5130 5130
5131 # The requests come back sorted by oid. Since we have three requests 5131 # The requests come back sorted by priority. Since we have
5132 # for the three changes each with a different priority. 5132 # three requests for the three changes each with a different
5133 # Also they get a serial number based on order they were received 5133 # priority. Also they get a serial number based on order they
5134 # so the number on the endof the oid should map to order submitted. 5134 # were received so the number on the endof the oid should map
5135 # to order submitted.
5135 5136
5136 # * gate first - high priority - change C 5137 # * gate first - high priority - change C
5137 self.assertEqual(reqs[0]['_oid'], '100-0000000002') 5138 self.assertEqual(reqs[0]['_oid'], '100-0000000002')
@@ -5145,13 +5146,93 @@ For CI problems and help debugging, contact ci@example.org"""
5145 self.assertEqual(reqs[2]['_oid'], '300-0000000000') 5146 self.assertEqual(reqs[2]['_oid'], '300-0000000000')
5146 self.assertEqual(reqs[2]['node_types'], ['ubuntu-xenial']) 5147 self.assertEqual(reqs[2]['node_types'], ['ubuntu-xenial'])
5147 5148
5148 self.fake_nodepool.paused = False 5149 self.fake_nodepool.unpause()
5150 self.waitUntilSettled()
5151
5152 def test_nodepool_relative_priority_check(self):
5153 "Test that nodes are requested at the relative priority"
5154
5155 self.fake_nodepool.pause()
5156
5157 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
5158 self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
5159 self.waitUntilSettled()
5160
5161 B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
5162 self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
5163 self.waitUntilSettled()
5164
5165 C = self.fake_gerrit.addFakeChange('org/project1', 'master', 'C')
5166 self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
5167 self.waitUntilSettled()
5168
5169 reqs = self.fake_nodepool.getNodeRequests()
5170
5171 # The requests come back sorted by priority.
5172
5173 # Change A, first change for project, high relative priority.
5174 self.assertEqual(reqs[0]['_oid'], '200-0000000000')
5175 self.assertEqual(reqs[0]['relative_priority'], 0)
5176
5177 # Change C, first change for project1, high relative priority.
5178 self.assertEqual(reqs[1]['_oid'], '200-0000000002')
5179 self.assertEqual(reqs[1]['relative_priority'], 0)
5180
5181 # Change B, second change for project, lower relative priority.
5182 self.assertEqual(reqs[2]['_oid'], '200-0000000001')
5183 self.assertEqual(reqs[2]['relative_priority'], 1)
5184
5185 self.fake_nodepool.unpause()
5186 self.waitUntilSettled()
5187
5188 @simple_layout('layouts/two-projects-integrated.yaml')
5189 def test_nodepool_relative_priority_gate(self):
5190 "Test that nodes are requested at the relative priority"
5191
5192 self.fake_nodepool.pause()
5193
5194 A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
5195 A.addApproval('Code-Review', 2)
5196 self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
5197 self.waitUntilSettled()
5198
5199 B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
5200 B.addApproval('Code-Review', 2)
5201 self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
5202 self.waitUntilSettled()
5203
5204 # project does not share a queue with project1 and project2.
5205 C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
5206 C.addApproval('Code-Review', 2)
5207 self.fake_gerrit.addEvent(C.addApproval('Approved', 1))
5208 self.waitUntilSettled()
5209
5210 reqs = self.fake_nodepool.getNodeRequests()
5211
5212 # The requests come back sorted by priority.
5213
5214 # Change A, first change for shared queue, high relative
5215 # priority.
5216 self.assertEqual(reqs[0]['_oid'], '100-0000000000')
5217 self.assertEqual(reqs[0]['relative_priority'], 0)
5218
5219 # Change C, first change for independent project, high
5220 # relative priority.
5221 self.assertEqual(reqs[1]['_oid'], '100-0000000002')
5222 self.assertEqual(reqs[1]['relative_priority'], 0)
5223
5224 # Change B, second change for shared queue, lower relative
5225 # priority.
5226 self.assertEqual(reqs[2]['_oid'], '100-0000000001')
5227 self.assertEqual(reqs[2]['relative_priority'], 1)
5228
5229 self.fake_nodepool.unpause()
5149 self.waitUntilSettled() 5230 self.waitUntilSettled()
5150 5231
5151 def test_nodepool_job_removal(self): 5232 def test_nodepool_job_removal(self):
5152 "Test that nodes are returned unused after job removal" 5233 "Test that nodes are returned unused after job removal"
5153 5234
5154 self.fake_nodepool.paused = True 5235 self.fake_nodepool.pause()
5155 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') 5236 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
5156 A.addApproval('Code-Review', 2) 5237 A.addApproval('Code-Review', 2)
5157 self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) 5238 self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@@ -5161,7 +5242,7 @@ For CI problems and help debugging, contact ci@example.org"""
5161 self.sched.reconfigure(self.config) 5242 self.sched.reconfigure(self.config)
5162 self.waitUntilSettled() 5243 self.waitUntilSettled()
5163 5244
5164 self.fake_nodepool.paused = False 5245 self.fake_nodepool.unpause()
5165 self.waitUntilSettled() 5246 self.waitUntilSettled()
5166 5247
5167 self.assertEqual(A.data['status'], 'MERGED') 5248 self.assertEqual(A.data['status'], 'MERGED')
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index da38818..353c470 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -85,6 +85,11 @@ class PipelineManager(object):
85 return True 85 return True
86 return False 86 return False
87 87
88 def getNodePriority(self, item):
89 items = self.pipeline.getAllItems()
90 items = [i for i in items if i.change.project == item.change.project]
91 return items.index(item)
92
88 def isChangeAlreadyInPipeline(self, change): 93 def isChangeAlreadyInPipeline(self, change):
89 # Checks live items in the pipeline 94 # Checks live items in the pipeline
90 for item in self.pipeline.getAllItems(): 95 for item in self.pipeline.getAllItems():
@@ -327,8 +332,12 @@ class PipelineManager(object):
327 return False 332 return False
328 build_set = item.current_build_set 333 build_set = item.current_build_set
329 self.log.debug("Requesting nodes for change %s" % item.change) 334 self.log.debug("Requesting nodes for change %s" % item.change)
335 if self.sched.use_relative_priority:
336 priority = item.getNodePriority()
337 else:
338 priority = 0
330 for job in jobs: 339 for job in jobs:
331 req = self.sched.nodepool.requestNodes(build_set, job) 340 req = self.sched.nodepool.requestNodes(build_set, job, priority)
332 self.log.debug("Adding node request %s for job %s to item %s" % 341 self.log.debug("Adding node request %s for job %s to item %s" %
333 (req, job, item)) 342 (req, job, item))
334 build_set.setJobNodeRequest(job.name, req) 343 build_set.setJobNodeRequest(job.name, req)
@@ -687,6 +696,12 @@ class PipelineManager(object):
687 if failing_reasons: 696 if failing_reasons:
688 self.log.debug("%s is a failing item because %s" % 697 self.log.debug("%s is a failing item because %s" %
689 (item, failing_reasons)) 698 (item, failing_reasons))
699 if not dequeued and self.sched.use_relative_priority:
700 priority = item.getNodePriority()
701 for node_request in item.current_build_set.node_requests.values():
702 if node_request.relative_priority != priority:
703 self.sched.nodepool.reviseNodeRequest(
704 node_request, priority)
690 return (changed, nnfi) 705 return (changed, nnfi)
691 706
692 def processQueue(self): 707 def processQueue(self):
diff --git a/zuul/manager/dependent.py b/zuul/manager/dependent.py
index f7a73de..3fcbafe 100644
--- a/zuul/manager/dependent.py
+++ b/zuul/manager/dependent.py
@@ -93,6 +93,11 @@ class DependentPipelineManager(PipelineManager):
93 self.log.debug("Dynamically created queue %s", change_queue) 93 self.log.debug("Dynamically created queue %s", change_queue)
94 return DynamicChangeQueueContextManager(change_queue) 94 return DynamicChangeQueueContextManager(change_queue)
95 95
96 def getNodePriority(self, item):
97 with self.getChangeQueue(item.change) as change_queue:
98 items = change_queue.queue
99 return items.index(item)
100
96 def isChangeReadyToBeEnqueued(self, change): 101 def isChangeReadyToBeEnqueued(self, change):
97 source = change.project.source 102 source = change.project.source
98 if not source.canMerge(change, self.getSubmitAllowNeeds()): 103 if not source.canMerge(change, self.getSubmitAllowNeeds()):
diff --git a/zuul/model.py b/zuul/model.py
index 85b5a54..1c0a541 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -688,7 +688,7 @@ class NodeSet(ConfigObject):
688class NodeRequest(object): 688class NodeRequest(object):
689 """A request for a set of nodes.""" 689 """A request for a set of nodes."""
690 690
691 def __init__(self, requestor, build_set, job, nodeset): 691 def __init__(self, requestor, build_set, job, nodeset, relative_priority):
692 self.requestor = requestor 692 self.requestor = requestor
693 self.build_set = build_set 693 self.build_set = build_set
694 self.job = job 694 self.job = job
@@ -696,9 +696,12 @@ class NodeRequest(object):
696 self._state = STATE_REQUESTED 696 self._state = STATE_REQUESTED
697 self.requested_time = time.time() 697 self.requested_time = time.time()
698 self.state_time = time.time() 698 self.state_time = time.time()
699 self.created_time = None
699 self.stat = None 700 self.stat = None
700 self.uid = uuid4().hex 701 self.uid = uuid4().hex
702 self.relative_priority = relative_priority
701 self.id = None 703 self.id = None
704 self._zk_data = {} # Data that we read back from ZK
702 # Zuul internal flags (not stored in ZK so they are not 705 # Zuul internal flags (not stored in ZK so they are not
703 # overwritten). 706 # overwritten).
704 self.failed = False 707 self.failed = False
@@ -731,17 +734,24 @@ class NodeRequest(object):
731 return '<NodeRequest %s %s>' % (self.id, self.nodeset) 734 return '<NodeRequest %s %s>' % (self.id, self.nodeset)
732 735
733 def toDict(self): 736 def toDict(self):
734 d = {} 737 # Start with any previously read data
738 d = self._zk_data.copy()
735 nodes = [n.label for n in self.nodeset.getNodes()] 739 nodes = [n.label for n in self.nodeset.getNodes()]
736 d['node_types'] = nodes 740 # These are immutable once set
737 d['requestor'] = self.requestor 741 d.setdefault('node_types', nodes)
742 d.setdefault('requestor', self.requestor)
743 d.setdefault('created_time', self.created_time)
744 # We might change these
738 d['state'] = self.state 745 d['state'] = self.state
739 d['state_time'] = self.state_time 746 d['state_time'] = self.state_time
747 d['relative_priority'] = self.relative_priority
740 return d 748 return d
741 749
742 def updateFromDict(self, data): 750 def updateFromDict(self, data):
751 self._zk_data = data
743 self._state = data['state'] 752 self._state = data['state']
744 self.state_time = data['state_time'] 753 self.state_time = data['state_time']
754 self.relative_priority = data['relative_priority']
745 755
746 756
747class Secret(ConfigObject): 757class Secret(ConfigObject):
@@ -2268,6 +2278,9 @@ class QueueItem(object):
2268 fakebuild.result = 'SKIPPED' 2278 fakebuild.result = 'SKIPPED'
2269 self.addBuild(fakebuild) 2279 self.addBuild(fakebuild)
2270 2280
2281 def getNodePriority(self):
2282 return self.pipeline.manager.getNodePriority(self)
2283
2271 def formatUrlPattern(self, url_pattern, job=None, build=None): 2284 def formatUrlPattern(self, url_pattern, job=None, build=None):
2272 url = None 2285 url = None
2273 # Produce safe versions of objects which may be useful in 2286 # Produce safe versions of objects which may be useful in
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 68f9629..56ee6c1 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -13,6 +13,7 @@
13import logging 13import logging
14 14
15from zuul import model 15from zuul import model
16from zuul.zk import LockException
16 17
17 18
18class Nodepool(object): 19class Nodepool(object):
@@ -51,11 +52,12 @@ class Nodepool(object):
51 statsd.timing(key + '.size.%s' % len(request.nodeset.nodes), dt) 52 statsd.timing(key + '.size.%s' % len(request.nodeset.nodes), dt)
52 statsd.gauge('zuul.nodepool.current_requests', len(self.requests)) 53 statsd.gauge('zuul.nodepool.current_requests', len(self.requests))
53 54
54 def requestNodes(self, build_set, job): 55 def requestNodes(self, build_set, job, relative_priority):
55 # Create a copy of the nodeset to represent the actual nodes 56 # Create a copy of the nodeset to represent the actual nodes
56 # returned by nodepool. 57 # returned by nodepool.
57 nodeset = job.nodeset.copy() 58 nodeset = job.nodeset.copy()
58 req = model.NodeRequest(self.sched.hostname, build_set, job, nodeset) 59 req = model.NodeRequest(self.sched.hostname, build_set, job,
60 nodeset, relative_priority)
59 self.requests[req.uid] = req 61 self.requests[req.uid] = req
60 62
61 if nodeset.nodes: 63 if nodeset.nodes:
@@ -79,6 +81,38 @@ class Nodepool(object):
79 except Exception: 81 except Exception:
80 self.log.exception("Error deleting node request:") 82 self.log.exception("Error deleting node request:")
81 83
def reviseRequest(self, request, relative_priority=None):
    '''Attempt to update the node request, if it is not currently being
    processed.

    :param NodeRequest request: The request to update.
    :param int relative_priority: If supplied, the new relative
        priority to set on the request.

    :returns: False if the request was locked (likely by nodepool)
        and could not be revised; None otherwise.
    '''
    if relative_priority is None:
        return
    try:
        # Non-blocking: if nodepool is already processing the request
        # there is no point waiting for it.
        self.sched.zk.lockNodeRequest(request, blocking=False)
    except LockException:
        # It may be locked by nodepool, which is fine.
        self.log.debug("Unable to revise locked node request %s", request)
        return False
    try:
        old_priority = request.relative_priority
        request.relative_priority = relative_priority
        self.sched.zk.storeNodeRequest(request)
        self.log.debug("Revised relative priority of "
                       "node request %s from %s to %s",
                       request, old_priority, relative_priority)
    except Exception:
        # Best effort: a failed revision only means nodepool serves
        # the request at its old priority.
        self.log.exception("Unable to update node request %s", request)
    finally:
        try:
            self.sched.zk.unlockNodeRequest(request)
        except Exception:
            self.log.exception("Unable to unlock node request %s", request)
115
82 def holdNodeSet(self, nodeset, autohold_key): 116 def holdNodeSet(self, nodeset, autohold_key):
83 ''' 117 '''
84 Perform a hold on the given set of nodes. 118 Perform a hold on the given set of nodes.
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 3f501fd..9cbc8a5 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -306,6 +306,10 @@ class Scheduler(threading.Thread):
306 self.last_reconfigured = None 306 self.last_reconfigured = None
307 self.tenant_last_reconfigured = {} 307 self.tenant_last_reconfigured = {}
308 self.autohold_requests = {} 308 self.autohold_requests = {}
309 self.use_relative_priority = False
310 if self.config.has_option('scheduler', 'relative_priority'):
311 if self.config.getboolean('scheduler', 'relative_priority'):
312 self.use_relative_priority = True
309 313
310 def start(self): 314 def start(self):
311 super(Scheduler, self).start() 315 super(Scheduler, self).start()
diff --git a/zuul/zk.py b/zuul/zk.py
index 852df87..f1ff4c8 100644
--- a/zuul/zk.py
+++ b/zuul/zk.py
@@ -41,6 +41,7 @@ class ZooKeeper(object):
41 log = logging.getLogger("zuul.zk.ZooKeeper") 41 log = logging.getLogger("zuul.zk.ZooKeeper")
42 42
43 REQUEST_ROOT = '/nodepool/requests' 43 REQUEST_ROOT = '/nodepool/requests'
44 REQUEST_LOCK_ROOT = "/nodepool/requests-lock"
44 NODE_ROOT = '/nodepool/nodes' 45 NODE_ROOT = '/nodepool/nodes'
45 46
46 # Log zookeeper retry every 10 seconds 47 # Log zookeeper retry every 10 seconds
@@ -162,8 +163,8 @@ class ZooKeeper(object):
162 from ZooKeeper). The watcher should return False when 163 from ZooKeeper). The watcher should return False when
163 further updates are no longer necessary. 164 further updates are no longer necessary.
164 ''' 165 '''
166 node_request.created_time = time.time()
165 data = node_request.toDict() 167 data = node_request.toDict()
166 data['created_time'] = time.time()
167 168
168 path = '%s/%s-' % (self.REQUEST_ROOT, node_request.priority) 169 path = '%s/%s-' % (self.REQUEST_ROOT, node_request.priority)
169 path = self.client.create(path, self._dictToStr(data), 170 path = self.client.create(path, self._dictToStr(data),
@@ -174,15 +175,7 @@ class ZooKeeper(object):
174 175
175 def callback(data, stat): 176 def callback(data, stat):
176 if data: 177 if data:
177 data = self._strToDict(data) 178 self.updateNodeRequest(node_request, data)
178 request_nodes = list(node_request.nodeset.getNodes())
179 for i, nodeid in enumerate(data.get('nodes', [])):
180 node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
181 node_data, node_stat = self.client.get(node_path)
182 node_data = self._strToDict(node_data)
183 request_nodes[i].id = nodeid
184 request_nodes[i].updateFromDict(node_data)
185 node_request.updateFromDict(data)
186 deleted = (data is None) # data *are* none 179 deleted = (data is None) # data *are* none
187 return watcher(node_request, deleted) 180 return watcher(node_request, deleted)
188 181
@@ -215,6 +208,34 @@ class ZooKeeper(object):
215 return True 208 return True
216 return False 209 return False
217 210
def storeNodeRequest(self, node_request):
    '''Store the node request.

    The request is expected to already exist and is updated in its
    entirety.

    :param NodeRequest node_request: The request to update.
    '''
    # Requests live under REQUEST_ROOT; the class defines no
    # NODE_REQUEST_ROOT attribute (using it raised AttributeError and
    # is inconsistent with updateNodeRequest below).
    path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
    self.client.set(path, self._dictToStr(node_request.toDict()))
222
def updateNodeRequest(self, node_request, data=None):
    '''Refresh an existing node request.

    :param NodeRequest node_request: The request to update.
    :param dict data: The data to use; query ZK if absent.
    '''
    if data is None:
        # No data supplied; read the current request state from ZooKeeper.
        request_path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
        raw, _stat = self.client.get(request_path)
        data = self._strToDict(raw)
    # Refresh each node nodepool has assigned so far, pairing them
    # positionally with the nodes of the request's nodeset.
    requested_nodes = list(node_request.nodeset.getNodes())
    for index, node_id in enumerate(data.get('nodes', [])):
        node = requested_nodes[index]
        node.id = node_id
        self.updateNode(node, node_id)
    node_request.updateFromDict(data)
238
218 def storeNode(self, node): 239 def storeNode(self, node):
219 '''Store the node. 240 '''Store the node.
220 241
@@ -227,6 +248,18 @@ class ZooKeeper(object):
227 path = '%s/%s' % (self.NODE_ROOT, node.id) 248 path = '%s/%s' % (self.NODE_ROOT, node.id)
228 self.client.set(path, self._dictToStr(node.toDict())) 249 self.client.set(path, self._dictToStr(node.toDict()))
229 250
def updateNode(self, node, nodeid):
    '''Refresh an existing node.

    :param Node node: The node to update.
    :param str nodeid: The ZooKeeper node ID.
    '''
    node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
    node_data, node_stat = self.client.get(node_path)
    node_data = self._strToDict(node_data)
    node.updateFromDict(node_data)
262
230 def lockNode(self, node, blocking=True, timeout=None): 263 def lockNode(self, node, blocking=True, timeout=None):
231 ''' 264 '''
232 Lock a node. 265 Lock a node.
@@ -268,6 +301,59 @@ class ZooKeeper(object):
268 node.lock.release() 301 node.lock.release()
269 node.lock = None 302 node.lock = None
270 303
def lockNodeRequest(self, request, blocking=True, timeout=None):
    '''
    Lock a node request.

    This will set the `lock` attribute of the request object when the
    lock is successfully acquired.

    :param NodeRequest request: The request to lock.
    :param bool blocking: Whether or not to block on trying to
        acquire the lock.
    :param int timeout: When blocking, how long to wait for the lock
        to get acquired. None, the default, waits forever.

    :raises: LockException if we failed to acquire the lock when
        blocking with a timeout, if we are not blocking and could not
        get the lock, or if the request node does not exist.
    '''
    path = "%s/%s" % (self.REQUEST_LOCK_ROOT, request.id)
    try:
        lock = Lock(self.client, path)
        have_lock = lock.acquire(blocking, timeout)
    except kze.LockTimeout:
        raise LockException(
            "Timeout trying to acquire lock %s" % path)
    except kze.NoNodeError:
        have_lock = False
        self.log.error("Request not found for locking: %s", request)

    # If we aren't blocking, it's possible we didn't get the lock
    # because someone else has it.
    if not have_lock:
        raise LockException("Did not get lock on %s" % path)

    request.lock = lock
    # Refresh the request so the lock holder starts from current data.
    self.updateNodeRequest(request)
340
def unlockNodeRequest(self, request):
    '''
    Unlock a node request.

    The request must already have been locked.

    :param NodeRequest request: The request to unlock.

    :raises: LockException if the request is not currently locked.
    '''
    if request.lock is None:
        raise LockException(
            "Request %s does not hold a lock" % request)
    request.lock.release()
    request.lock = None
356
271 def heldNodeCount(self, autohold_key): 357 def heldNodeCount(self, autohold_key):
272 ''' 358 '''
273 Count the number of nodes being held for the given tenant/project/job. 359 Count the number of nodes being held for the given tenant/project/job.