summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames E. Blair <jeblair@redhat.com>2018-11-02 16:21:34 -0700
committerTobias Henkel <tobias.henkel@bmw.de>2018-11-30 12:50:34 +0100
commit0b00c4685b1883dcb6d9ac1814acc68c89eeae26 (patch)
treecb1acdc9cdd6d431bcbd6d2308380a15ce82b196
parent9c2b0a9bbede80138c47a21450af13fd32b5c475 (diff)
Set relative priority of node requests
Add a relative_priority field to node requests and continuously adjust it for each queue item based on the contents of queues. This allows for a more fair distribution of build resources between different projects. The first item in a pipeline from a given project (or, in the case of a dependent pipeline, group of projects) has equal priority to all other first-items of other projcets in the same pipeline. Second items have a lower priority, etc. Depends-On: https://review.openstack.org/620954 Change-Id: Id3799aeb2cec6d96a662bfa394a538050f7ea947
Notes
Notes (review): Code-Review+2: Tobias Henkel <tobias.henkel@bmw.de> Workflow+1: Tobias Henkel <tobias.henkel@bmw.de> Verified+2: Zuul Submitted-by: Zuul Submitted-at: Fri, 30 Nov 2018 15:24:56 +0000 Reviewed-on: https://review.openstack.org/615356 Project: openstack-infra/zuul Branch: refs/heads/master
-rw-r--r--doc/source/admin/components.rst26
-rw-r--r--releasenotes/notes/relative_priority-dee014da5977da36.yaml6
-rw-r--r--tests/base.py15
-rw-r--r--tests/fixtures/layouts/two-projects-integrated.yaml79
-rw-r--r--tests/fixtures/zuul.conf1
-rw-r--r--tests/nodepool/test_nodepool_integration.py8
-rw-r--r--tests/unit/test_nodepool.py34
-rw-r--r--tests/unit/test_scheduler.py113
-rw-r--r--zuul/manager/__init__.py17
-rw-r--r--zuul/manager/dependent.py5
-rw-r--r--zuul/model.py21
-rw-r--r--zuul/nodepool.py38
-rw-r--r--zuul/scheduler.py4
-rw-r--r--zuul/zk.py106
14 files changed, 427 insertions, 46 deletions
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index bed1f84..5c89bb8 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -276,6 +276,32 @@ The following sections of ``zuul.conf`` are used by the scheduler:
276 276
277 Path to directory in which Zuul should save its state. 277 Path to directory in which Zuul should save its state.
278 278
279 .. attr:: relative_priority
280 :default: False
281
282 A boolean which indicates whether the scheduler should supply
283 relative priority information for node requests.
284
285 In all cases, each pipeline may specify a precedence value which
286 is used by Nodepool to satisfy requests from higher-precedence
287 pipelines first. If ``relative_priority`` is set to ``True``,
288 then Zuul will additionally group items in the same pipeline by
289 project and weight each request by its position in that
290 project's group. A request for the first change of a given
291 project will have the highest relative priority, and the second
292 change a lower relative priority. The first change of each
293 project in a pipeline has the same relative priority, regardless
294 of the order of submission or how many other changes are in the
295 pipeline. This can be used to make node allocations complete
296 faster for projects with fewer changes in a system dominated by
297 projects with more changes.
298
299 If this value is ``False`` (the default), then node requests are
300 sorted by pipeline precedence followed by the order in which
301 they were submitted. If this is ``True``, they are sorted by
302 pipeline precedence, followed by relative priority, and finally
303 the order in which they were submitted.
304
279Operation 305Operation
280~~~~~~~~~ 306~~~~~~~~~
281 307
diff --git a/releasenotes/notes/relative_priority-dee014da5977da36.yaml b/releasenotes/notes/relative_priority-dee014da5977da36.yaml
new file mode 100644
index 0000000..e072de0
--- /dev/null
+++ b/releasenotes/notes/relative_priority-dee014da5977da36.yaml
@@ -0,0 +1,6 @@
1---
2features:
3 - |
4 A new scheduler option, :attr:`scheduler.relative_priority`, can
5 be used to instruct Nodepool to fulfull requests from less-busy
6 projects more quickly.
diff --git a/tests/base.py b/tests/base.py
index 7579951..1824c55 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -1734,6 +1734,7 @@ class FakeNodepool(object):
1734 log = logging.getLogger("zuul.test.FakeNodepool") 1734 log = logging.getLogger("zuul.test.FakeNodepool")
1735 1735
1736 def __init__(self, host, port, chroot): 1736 def __init__(self, host, port, chroot):
1737 self.complete_event = threading.Event()
1737 self.host_keys = None 1738 self.host_keys = None
1738 self.client = kazoo.client.KazooClient( 1739 self.client = kazoo.client.KazooClient(
1739 hosts='%s:%s%s' % (host, port, chroot)) 1740 hosts='%s:%s%s' % (host, port, chroot))
@@ -1752,12 +1753,21 @@ class FakeNodepool(object):
1752 self.client.stop() 1753 self.client.stop()
1753 self.client.close() 1754 self.client.close()
1754 1755
1756 def pause(self):
1757 self.complete_event.wait()
1758 self.paused = True
1759
1760 def unpause(self):
1761 self.paused = False
1762
1755 def run(self): 1763 def run(self):
1756 while self._running: 1764 while self._running:
1765 self.complete_event.clear()
1757 try: 1766 try:
1758 self._run() 1767 self._run()
1759 except Exception: 1768 except Exception:
1760 self.log.exception("Error in fake nodepool:") 1769 self.log.exception("Error in fake nodepool:")
1770 self.complete_event.set()
1761 time.sleep(0.1) 1771 time.sleep(0.1)
1762 1772
1763 def _run(self): 1773 def _run(self):
@@ -1772,7 +1782,7 @@ class FakeNodepool(object):
1772 except kazoo.exceptions.NoNodeError: 1782 except kazoo.exceptions.NoNodeError:
1773 return [] 1783 return []
1774 reqs = [] 1784 reqs = []
1775 for oid in sorted(reqids): 1785 for oid in reqids:
1776 path = self.REQUEST_ROOT + '/' + oid 1786 path = self.REQUEST_ROOT + '/' + oid
1777 try: 1787 try:
1778 data, stat = self.client.get(path) 1788 data, stat = self.client.get(path)
@@ -1781,6 +1791,9 @@ class FakeNodepool(object):
1781 reqs.append(data) 1791 reqs.append(data)
1782 except kazoo.exceptions.NoNodeError: 1792 except kazoo.exceptions.NoNodeError:
1783 pass 1793 pass
1794 reqs.sort(key=lambda r: (r['_oid'].split('-')[0],
1795 r['relative_priority'],
1796 r['_oid'].split('-')[1]))
1784 return reqs 1797 return reqs
1785 1798
1786 def getNodes(self): 1799 def getNodes(self):
diff --git a/tests/fixtures/layouts/two-projects-integrated.yaml b/tests/fixtures/layouts/two-projects-integrated.yaml
new file mode 100644
index 0000000..e927aca
--- /dev/null
+++ b/tests/fixtures/layouts/two-projects-integrated.yaml
@@ -0,0 +1,79 @@
1- pipeline:
2 name: check
3 manager: independent
4 trigger:
5 gerrit:
6 - event: patchset-created
7 success:
8 gerrit:
9 Verified: 1
10 failure:
11 gerrit:
12 Verified: -1
13
14- pipeline:
15 name: gate
16 manager: dependent
17 success-message: Build succeeded (gate).
18 trigger:
19 gerrit:
20 - event: comment-added
21 approval:
22 - Approved: 1
23 success:
24 gerrit:
25 Verified: 2
26 submit: true
27 failure:
28 gerrit:
29 Verified: -2
30 start:
31 gerrit:
32 Verified: 0
33 precedence: high
34
35- job:
36 name: base
37 parent: null
38 run: playbooks/base.yaml
39 nodeset:
40 nodes:
41 - name: controller
42 label: ubuntu-xenial
43
44- job:
45 name: test
46 run: playbooks/test.yaml
47
48- job:
49 name: integration
50 run: playbooks/integration.yaml
51
52- project:
53 name: org/project
54 check:
55 jobs:
56 - test
57 gate:
58 jobs:
59 - test
60
61- project:
62 name: org/project1
63 check:
64 jobs:
65 - integration
66 gate:
67 queue: integrated
68 jobs:
69 - integration
70
71- project:
72 name: org/project2
73 check:
74 jobs:
75 - integration
76 gate:
77 queue: integrated
78 jobs:
79 - integration
diff --git a/tests/fixtures/zuul.conf b/tests/fixtures/zuul.conf
index e6f997c..5d79378 100644
--- a/tests/fixtures/zuul.conf
+++ b/tests/fixtures/zuul.conf
@@ -8,6 +8,7 @@ server=127.0.0.1
8 8
9[scheduler] 9[scheduler]
10tenant_config=main.yaml 10tenant_config=main.yaml
11relative_priority=true
11 12
12[merger] 13[merger]
13git_dir=/tmp/zuul-test/merger-git 14git_dir=/tmp/zuul-test/merger-git
diff --git a/tests/nodepool/test_nodepool_integration.py b/tests/nodepool/test_nodepool_integration.py
index 2f36154..b608bba 100644
--- a/tests/nodepool/test_nodepool_integration.py
+++ b/tests/nodepool/test_nodepool_integration.py
@@ -58,7 +58,7 @@ class TestNodepoolIntegration(BaseTestCase):
58 nodeset.addNode(model.Node(['controller'], 'fake-label')) 58 nodeset.addNode(model.Node(['controller'], 'fake-label'))
59 job = model.Job('testjob') 59 job = model.Job('testjob')
60 job.nodeset = nodeset 60 job.nodeset = nodeset
61 request = self.nodepool.requestNodes(None, job) 61 request = self.nodepool.requestNodes(None, job, 0)
62 self.waitForRequests() 62 self.waitForRequests()
63 self.assertEqual(len(self.provisioned_requests), 1) 63 self.assertEqual(len(self.provisioned_requests), 1)
64 self.assertEqual(request.state, model.STATE_FULFILLED) 64 self.assertEqual(request.state, model.STATE_FULFILLED)
@@ -88,7 +88,7 @@ class TestNodepoolIntegration(BaseTestCase):
88 nodeset.addNode(model.Node(['controller'], 'invalid-label')) 88 nodeset.addNode(model.Node(['controller'], 'invalid-label'))
89 job = model.Job('testjob') 89 job = model.Job('testjob')
90 job.nodeset = nodeset 90 job.nodeset = nodeset
91 request = self.nodepool.requestNodes(None, job) 91 request = self.nodepool.requestNodes(None, job, 0)
92 self.waitForRequests() 92 self.waitForRequests()
93 self.assertEqual(len(self.provisioned_requests), 1) 93 self.assertEqual(len(self.provisioned_requests), 1)
94 self.assertEqual(request.state, model.STATE_FAILED) 94 self.assertEqual(request.state, model.STATE_FAILED)
@@ -103,7 +103,7 @@ class TestNodepoolIntegration(BaseTestCase):
103 job = model.Job('testjob') 103 job = model.Job('testjob')
104 job.nodeset = nodeset 104 job.nodeset = nodeset
105 self.fake_nodepool.paused = True 105 self.fake_nodepool.paused = True
106 request = self.nodepool.requestNodes(None, job) 106 request = self.nodepool.requestNodes(None, job, 0)
107 self.zk.client.stop() 107 self.zk.client.stop()
108 self.zk.client.start() 108 self.zk.client.start()
109 self.fake_nodepool.paused = False 109 self.fake_nodepool.paused = False
@@ -121,7 +121,7 @@ class TestNodepoolIntegration(BaseTestCase):
121 job = model.Job('testjob') 121 job = model.Job('testjob')
122 job.nodeset = nodeset 122 job.nodeset = nodeset
123 self.fake_nodepool.paused = True 123 self.fake_nodepool.paused = True
124 request = self.nodepool.requestNodes(None, job) 124 request = self.nodepool.requestNodes(None, job, 0)
125 self.nodepool.cancelRequest(request) 125 self.nodepool.cancelRequest(request)
126 126
127 self.waitForRequests() 127 self.waitForRequests()
diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py
index aa0f082..e822a10 100644
--- a/tests/unit/test_nodepool.py
+++ b/tests/unit/test_nodepool.py
@@ -71,7 +71,7 @@ class TestNodepool(BaseTestCase):
71 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial')) 71 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
72 job = model.Job('testjob') 72 job = model.Job('testjob')
73 job.nodeset = nodeset 73 job.nodeset = nodeset
74 request = self.nodepool.requestNodes(None, job) 74 request = self.nodepool.requestNodes(None, job, 0)
75 self.waitForRequests() 75 self.waitForRequests()
76 self.assertEqual(len(self.provisioned_requests), 1) 76 self.assertEqual(len(self.provisioned_requests), 1)
77 self.assertEqual(request.state, 'fulfilled') 77 self.assertEqual(request.state, 'fulfilled')
@@ -103,11 +103,11 @@ class TestNodepool(BaseTestCase):
103 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial')) 103 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
104 job = model.Job('testjob') 104 job = model.Job('testjob')
105 job.nodeset = nodeset 105 job.nodeset = nodeset
106 self.fake_nodepool.paused = True 106 self.fake_nodepool.pause()
107 request = self.nodepool.requestNodes(None, job) 107 request = self.nodepool.requestNodes(None, job, 0)
108 self.zk.client.stop() 108 self.zk.client.stop()
109 self.zk.client.start() 109 self.zk.client.start()
110 self.fake_nodepool.paused = False 110 self.fake_nodepool.unpause()
111 self.waitForRequests() 111 self.waitForRequests()
112 self.assertEqual(len(self.provisioned_requests), 1) 112 self.assertEqual(len(self.provisioned_requests), 1)
113 self.assertEqual(request.state, 'fulfilled') 113 self.assertEqual(request.state, 'fulfilled')
@@ -120,8 +120,8 @@ class TestNodepool(BaseTestCase):
120 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial')) 120 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
121 job = model.Job('testjob') 121 job = model.Job('testjob')
122 job.nodeset = nodeset 122 job.nodeset = nodeset
123 self.fake_nodepool.paused = True 123 self.fake_nodepool.pause()
124 request = self.nodepool.requestNodes(None, job) 124 request = self.nodepool.requestNodes(None, job, 0)
125 self.nodepool.cancelRequest(request) 125 self.nodepool.cancelRequest(request)
126 126
127 self.waitForRequests() 127 self.waitForRequests()
@@ -135,7 +135,7 @@ class TestNodepool(BaseTestCase):
135 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial')) 135 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
136 job = model.Job('testjob') 136 job = model.Job('testjob')
137 job.nodeset = nodeset 137 job.nodeset = nodeset
138 request = self.nodepool.requestNodes(None, job) 138 request = self.nodepool.requestNodes(None, job, 0)
139 self.waitForRequests() 139 self.waitForRequests()
140 self.assertEqual(len(self.provisioned_requests), 1) 140 self.assertEqual(len(self.provisioned_requests), 1)
141 self.assertEqual(request.state, 'fulfilled') 141 self.assertEqual(request.state, 'fulfilled')
@@ -156,7 +156,7 @@ class TestNodepool(BaseTestCase):
156 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial')) 156 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
157 job = model.Job('testjob') 157 job = model.Job('testjob')
158 job.nodeset = nodeset 158 job.nodeset = nodeset
159 request = self.nodepool.requestNodes(None, job) 159 request = self.nodepool.requestNodes(None, job, 0)
160 self.waitForRequests() 160 self.waitForRequests()
161 self.assertEqual(len(self.provisioned_requests), 1) 161 self.assertEqual(len(self.provisioned_requests), 1)
162 self.assertEqual(request.state, 'fulfilled') 162 self.assertEqual(request.state, 'fulfilled')
@@ -170,3 +170,21 @@ class TestNodepool(BaseTestCase):
170 for node in nodeset.getNodes(): 170 for node in nodeset.getNodes():
171 self.assertIsNone(node.lock) 171 self.assertIsNone(node.lock)
172 self.assertEqual(node.state, 'ready') 172 self.assertEqual(node.state, 'ready')
173
174 def test_node_request_priority(self):
175 # Test that requests are satisfied in priority order
176
177 nodeset = model.NodeSet()
178 nodeset.addNode(model.Node(['controller', 'foo'], 'ubuntu-xenial'))
179 nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
180 job = model.Job('testjob')
181 job.nodeset = nodeset
182 self.fake_nodepool.pause()
183 request1 = self.nodepool.requestNodes(None, job, 1)
184 request2 = self.nodepool.requestNodes(None, job, 0)
185 self.fake_nodepool.unpause()
186 self.waitForRequests()
187 self.assertEqual(len(self.provisioned_requests), 2)
188 self.assertEqual(request1.state, 'fulfilled')
189 self.assertEqual(request2.state, 'fulfilled')
190 self.assertTrue(request2.state_time < request1.state_time)
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index c4bd99d..7c0b448 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -4972,7 +4972,7 @@ For CI problems and help debugging, contact ci@example.org"""
4972 def test_zookeeper_disconnect(self): 4972 def test_zookeeper_disconnect(self):
4973 "Test that jobs are executed after a zookeeper disconnect" 4973 "Test that jobs are executed after a zookeeper disconnect"
4974 4974
4975 self.fake_nodepool.paused = True 4975 self.fake_nodepool.pause()
4976 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') 4976 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
4977 A.addApproval('Code-Review', 2) 4977 A.addApproval('Code-Review', 2)
4978 self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) 4978 self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@@ -4980,7 +4980,7 @@ For CI problems and help debugging, contact ci@example.org"""
4980 4980
4981 self.zk.client.stop() 4981 self.zk.client.stop()
4982 self.zk.client.start() 4982 self.zk.client.start()
4983 self.fake_nodepool.paused = False 4983 self.fake_nodepool.unpause()
4984 self.waitUntilSettled() 4984 self.waitUntilSettled()
4985 4985
4986 self.assertEqual(A.data['status'], 'MERGED') 4986 self.assertEqual(A.data['status'], 'MERGED')
@@ -4991,7 +4991,7 @@ For CI problems and help debugging, contact ci@example.org"""
4991 4991
4992 # This tests receiving a ZK disconnect between the arrival of 4992 # This tests receiving a ZK disconnect between the arrival of
4993 # a fulfilled request and when we accept its nodes. 4993 # a fulfilled request and when we accept its nodes.
4994 self.fake_nodepool.paused = True 4994 self.fake_nodepool.pause()
4995 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') 4995 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
4996 A.addApproval('Code-Review', 2) 4996 A.addApproval('Code-Review', 2)
4997 self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) 4997 self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@@ -5003,7 +5003,7 @@ For CI problems and help debugging, contact ci@example.org"""
5003 self.sched.run_handler_lock.acquire() 5003 self.sched.run_handler_lock.acquire()
5004 5004
5005 # Fulfill the nodepool request. 5005 # Fulfill the nodepool request.
5006 self.fake_nodepool.paused = False 5006 self.fake_nodepool.unpause()
5007 requests = list(self.sched.nodepool.requests.values()) 5007 requests = list(self.sched.nodepool.requests.values())
5008 self.assertEqual(1, len(requests)) 5008 self.assertEqual(1, len(requests))
5009 request = requests[0] 5009 request = requests[0]
@@ -5037,7 +5037,7 @@ For CI problems and help debugging, contact ci@example.org"""
5037 def test_nodepool_failure(self): 5037 def test_nodepool_failure(self):
5038 "Test that jobs are reported after a nodepool failure" 5038 "Test that jobs are reported after a nodepool failure"
5039 5039
5040 self.fake_nodepool.paused = True 5040 self.fake_nodepool.pause()
5041 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') 5041 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
5042 A.addApproval('Code-Review', 2) 5042 A.addApproval('Code-Review', 2)
5043 self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) 5043 self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@@ -5046,7 +5046,7 @@ For CI problems and help debugging, contact ci@example.org"""
5046 req = self.fake_nodepool.getNodeRequests()[0] 5046 req = self.fake_nodepool.getNodeRequests()[0]
5047 self.fake_nodepool.addFailRequest(req) 5047 self.fake_nodepool.addFailRequest(req)
5048 5048
5049 self.fake_nodepool.paused = False 5049 self.fake_nodepool.unpause()
5050 self.waitUntilSettled() 5050 self.waitUntilSettled()
5051 5051
5052 self.assertEqual(A.data['status'], 'NEW') 5052 self.assertEqual(A.data['status'], 'NEW')
@@ -5055,10 +5055,10 @@ For CI problems and help debugging, contact ci@example.org"""
5055 self.assertIn('project-test1 : SKIPPED', A.messages[1]) 5055 self.assertIn('project-test1 : SKIPPED', A.messages[1])
5056 self.assertIn('project-test2 : SKIPPED', A.messages[1]) 5056 self.assertIn('project-test2 : SKIPPED', A.messages[1])
5057 5057
5058 def test_nodepool_priority(self): 5058 def test_nodepool_pipeline_priority(self):
5059 "Test that nodes are requested at the correct priority" 5059 "Test that nodes are requested at the correct pipeline priority"
5060 5060
5061 self.fake_nodepool.paused = True 5061 self.fake_nodepool.pause()
5062 5062
5063 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') 5063 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
5064 self.fake_gerrit.addEvent(A.getRefUpdatedEvent()) 5064 self.fake_gerrit.addEvent(A.getRefUpdatedEvent())
@@ -5075,10 +5075,11 @@ For CI problems and help debugging, contact ci@example.org"""
5075 5075
5076 reqs = self.fake_nodepool.getNodeRequests() 5076 reqs = self.fake_nodepool.getNodeRequests()
5077 5077
5078 # The requests come back sorted by oid. Since we have three requests 5078 # The requests come back sorted by priority. Since we have
5079 # for the three changes each with a different priority. 5079 # three requests for the three changes each with a different
5080 # Also they get a serial number based on order they were received 5080 # priority. Also they get a serial number based on order they
5081 # so the number on the endof the oid should map to order submitted. 5081 # were received so the number on the endof the oid should map
5082 # to order submitted.
5082 5083
5083 # * gate first - high priority - change C 5084 # * gate first - high priority - change C
5084 self.assertEqual(reqs[0]['_oid'], '100-0000000002') 5085 self.assertEqual(reqs[0]['_oid'], '100-0000000002')
@@ -5092,13 +5093,93 @@ For CI problems and help debugging, contact ci@example.org"""
5092 self.assertEqual(reqs[2]['_oid'], '300-0000000000') 5093 self.assertEqual(reqs[2]['_oid'], '300-0000000000')
5093 self.assertEqual(reqs[2]['node_types'], ['ubuntu-xenial']) 5094 self.assertEqual(reqs[2]['node_types'], ['ubuntu-xenial'])
5094 5095
5095 self.fake_nodepool.paused = False 5096 self.fake_nodepool.unpause()
5097 self.waitUntilSettled()
5098
5099 def test_nodepool_relative_priority_check(self):
5100 "Test that nodes are requested at the relative priority"
5101
5102 self.fake_nodepool.pause()
5103
5104 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
5105 self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
5106 self.waitUntilSettled()
5107
5108 B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
5109 self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
5110 self.waitUntilSettled()
5111
5112 C = self.fake_gerrit.addFakeChange('org/project1', 'master', 'C')
5113 self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
5114 self.waitUntilSettled()
5115
5116 reqs = self.fake_nodepool.getNodeRequests()
5117
5118 # The requests come back sorted by priority.
5119
5120 # Change A, first change for project, high relative priority.
5121 self.assertEqual(reqs[0]['_oid'], '200-0000000000')
5122 self.assertEqual(reqs[0]['relative_priority'], 0)
5123
5124 # Change C, first change for project1, high relative priority.
5125 self.assertEqual(reqs[1]['_oid'], '200-0000000002')
5126 self.assertEqual(reqs[1]['relative_priority'], 0)
5127
5128 # Change B, second change for project, lower relative priority.
5129 self.assertEqual(reqs[2]['_oid'], '200-0000000001')
5130 self.assertEqual(reqs[2]['relative_priority'], 1)
5131
5132 self.fake_nodepool.unpause()
5133 self.waitUntilSettled()
5134
5135 @simple_layout('layouts/two-projects-integrated.yaml')
5136 def test_nodepool_relative_priority_gate(self):
5137 "Test that nodes are requested at the relative priority"
5138
5139 self.fake_nodepool.pause()
5140
5141 A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
5142 A.addApproval('Code-Review', 2)
5143 self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
5144 self.waitUntilSettled()
5145
5146 B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
5147 B.addApproval('Code-Review', 2)
5148 self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
5149 self.waitUntilSettled()
5150
5151 # project does not share a queue with project1 and project2.
5152 C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
5153 C.addApproval('Code-Review', 2)
5154 self.fake_gerrit.addEvent(C.addApproval('Approved', 1))
5155 self.waitUntilSettled()
5156
5157 reqs = self.fake_nodepool.getNodeRequests()
5158
5159 # The requests come back sorted by priority.
5160
5161 # Change A, first change for shared queue, high relative
5162 # priority.
5163 self.assertEqual(reqs[0]['_oid'], '100-0000000000')
5164 self.assertEqual(reqs[0]['relative_priority'], 0)
5165
5166 # Change C, first change for independent project, high
5167 # relative priority.
5168 self.assertEqual(reqs[1]['_oid'], '100-0000000002')
5169 self.assertEqual(reqs[1]['relative_priority'], 0)
5170
5171 # Change B, second change for shared queue, lower relative
5172 # priority.
5173 self.assertEqual(reqs[2]['_oid'], '100-0000000001')
5174 self.assertEqual(reqs[2]['relative_priority'], 1)
5175
5176 self.fake_nodepool.unpause()
5096 self.waitUntilSettled() 5177 self.waitUntilSettled()
5097 5178
5098 def test_nodepool_job_removal(self): 5179 def test_nodepool_job_removal(self):
5099 "Test that nodes are returned unused after job removal" 5180 "Test that nodes are returned unused after job removal"
5100 5181
5101 self.fake_nodepool.paused = True 5182 self.fake_nodepool.pause()
5102 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') 5183 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
5103 A.addApproval('Code-Review', 2) 5184 A.addApproval('Code-Review', 2)
5104 self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) 5185 self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@@ -5108,7 +5189,7 @@ For CI problems and help debugging, contact ci@example.org"""
5108 self.sched.reconfigure(self.config) 5189 self.sched.reconfigure(self.config)
5109 self.waitUntilSettled() 5190 self.waitUntilSettled()
5110 5191
5111 self.fake_nodepool.paused = False 5192 self.fake_nodepool.unpause()
5112 self.waitUntilSettled() 5193 self.waitUntilSettled()
5113 5194
5114 self.assertEqual(A.data['status'], 'MERGED') 5195 self.assertEqual(A.data['status'], 'MERGED')
diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py
index 728117b..b76dc8d 100644
--- a/zuul/manager/__init__.py
+++ b/zuul/manager/__init__.py
@@ -85,6 +85,11 @@ class PipelineManager(object):
85 return True 85 return True
86 return False 86 return False
87 87
88 def getNodePriority(self, item):
89 items = self.pipeline.getAllItems()
90 items = [i for i in items if i.change.project == item.change.project]
91 return items.index(item)
92
88 def isChangeAlreadyInPipeline(self, change): 93 def isChangeAlreadyInPipeline(self, change):
89 # Checks live items in the pipeline 94 # Checks live items in the pipeline
90 for item in self.pipeline.getAllItems(): 95 for item in self.pipeline.getAllItems():
@@ -327,8 +332,12 @@ class PipelineManager(object):
327 return False 332 return False
328 build_set = item.current_build_set 333 build_set = item.current_build_set
329 self.log.debug("Requesting nodes for change %s" % item.change) 334 self.log.debug("Requesting nodes for change %s" % item.change)
335 if self.sched.use_relative_priority:
336 priority = item.getNodePriority()
337 else:
338 priority = 0
330 for job in jobs: 339 for job in jobs:
331 req = self.sched.nodepool.requestNodes(build_set, job) 340 req = self.sched.nodepool.requestNodes(build_set, job, priority)
332 self.log.debug("Adding node request %s for job %s to item %s" % 341 self.log.debug("Adding node request %s for job %s to item %s" %
333 (req, job, item)) 342 (req, job, item))
334 build_set.setJobNodeRequest(job.name, req) 343 build_set.setJobNodeRequest(job.name, req)
@@ -687,6 +696,12 @@ class PipelineManager(object):
687 if failing_reasons: 696 if failing_reasons:
688 self.log.debug("%s is a failing item because %s" % 697 self.log.debug("%s is a failing item because %s" %
689 (item, failing_reasons)) 698 (item, failing_reasons))
699 if not dequeued and self.sched.use_relative_priority:
700 priority = item.getNodePriority()
701 for node_request in item.current_build_set.node_requests.values():
702 if node_request.relative_priority != priority:
703 self.sched.nodepool.reviseNodeRequest(
704 node_request, priority)
690 return (changed, nnfi) 705 return (changed, nnfi)
691 706
692 def processQueue(self): 707 def processQueue(self):
diff --git a/zuul/manager/dependent.py b/zuul/manager/dependent.py
index f7a73de..3fcbafe 100644
--- a/zuul/manager/dependent.py
+++ b/zuul/manager/dependent.py
@@ -93,6 +93,11 @@ class DependentPipelineManager(PipelineManager):
93 self.log.debug("Dynamically created queue %s", change_queue) 93 self.log.debug("Dynamically created queue %s", change_queue)
94 return DynamicChangeQueueContextManager(change_queue) 94 return DynamicChangeQueueContextManager(change_queue)
95 95
96 def getNodePriority(self, item):
97 with self.getChangeQueue(item.change) as change_queue:
98 items = change_queue.queue
99 return items.index(item)
100
96 def isChangeReadyToBeEnqueued(self, change): 101 def isChangeReadyToBeEnqueued(self, change):
97 source = change.project.source 102 source = change.project.source
98 if not source.canMerge(change, self.getSubmitAllowNeeds()): 103 if not source.canMerge(change, self.getSubmitAllowNeeds()):
diff --git a/zuul/model.py b/zuul/model.py
index 5de9cd6..6362124 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -688,7 +688,7 @@ class NodeSet(ConfigObject):
688class NodeRequest(object): 688class NodeRequest(object):
689 """A request for a set of nodes.""" 689 """A request for a set of nodes."""
690 690
691 def __init__(self, requestor, build_set, job, nodeset): 691 def __init__(self, requestor, build_set, job, nodeset, relative_priority):
692 self.requestor = requestor 692 self.requestor = requestor
693 self.build_set = build_set 693 self.build_set = build_set
694 self.job = job 694 self.job = job
@@ -696,9 +696,12 @@ class NodeRequest(object):
696 self._state = STATE_REQUESTED 696 self._state = STATE_REQUESTED
697 self.requested_time = time.time() 697 self.requested_time = time.time()
698 self.state_time = time.time() 698 self.state_time = time.time()
699 self.created_time = None
699 self.stat = None 700 self.stat = None
700 self.uid = uuid4().hex 701 self.uid = uuid4().hex
702 self.relative_priority = relative_priority
701 self.id = None 703 self.id = None
704 self._zk_data = {} # Data that we read back from ZK
702 # Zuul internal flags (not stored in ZK so they are not 705 # Zuul internal flags (not stored in ZK so they are not
703 # overwritten). 706 # overwritten).
704 self.failed = False 707 self.failed = False
@@ -731,17 +734,24 @@ class NodeRequest(object):
731 return '<NodeRequest %s %s>' % (self.id, self.nodeset) 734 return '<NodeRequest %s %s>' % (self.id, self.nodeset)
732 735
733 def toDict(self): 736 def toDict(self):
734 d = {} 737 # Start with any previously read data
738 d = self._zk_data.copy()
735 nodes = [n.label for n in self.nodeset.getNodes()] 739 nodes = [n.label for n in self.nodeset.getNodes()]
736 d['node_types'] = nodes 740 # These are immutable once set
737 d['requestor'] = self.requestor 741 d.setdefault('node_types', nodes)
742 d.setdefault('requestor', self.requestor)
743 d.setdefault('created_time', self.created_time)
744 # We might change these
738 d['state'] = self.state 745 d['state'] = self.state
739 d['state_time'] = self.state_time 746 d['state_time'] = self.state_time
747 d['relative_priority'] = self.relative_priority
740 return d 748 return d
741 749
742 def updateFromDict(self, data): 750 def updateFromDict(self, data):
751 self._zk_data = data
743 self._state = data['state'] 752 self._state = data['state']
744 self.state_time = data['state_time'] 753 self.state_time = data['state_time']
754 self.relative_priority = data['relative_priority']
745 755
746 756
747class Secret(ConfigObject): 757class Secret(ConfigObject):
@@ -2245,6 +2255,9 @@ class QueueItem(object):
2245 fakebuild.result = 'SKIPPED' 2255 fakebuild.result = 'SKIPPED'
2246 self.addBuild(fakebuild) 2256 self.addBuild(fakebuild)
2247 2257
2258 def getNodePriority(self):
2259 return self.pipeline.manager.getNodePriority(self)
2260
2248 def formatUrlPattern(self, url_pattern, job=None, build=None): 2261 def formatUrlPattern(self, url_pattern, job=None, build=None):
2249 url = None 2262 url = None
2250 # Produce safe versions of objects which may be useful in 2263 # Produce safe versions of objects which may be useful in
diff --git a/zuul/nodepool.py b/zuul/nodepool.py
index 68f9629..56ee6c1 100644
--- a/zuul/nodepool.py
+++ b/zuul/nodepool.py
@@ -13,6 +13,7 @@
13import logging 13import logging
14 14
15from zuul import model 15from zuul import model
16from zuul.zk import LockException
16 17
17 18
18class Nodepool(object): 19class Nodepool(object):
@@ -51,11 +52,12 @@ class Nodepool(object):
51 statsd.timing(key + '.size.%s' % len(request.nodeset.nodes), dt) 52 statsd.timing(key + '.size.%s' % len(request.nodeset.nodes), dt)
52 statsd.gauge('zuul.nodepool.current_requests', len(self.requests)) 53 statsd.gauge('zuul.nodepool.current_requests', len(self.requests))
53 54
54 def requestNodes(self, build_set, job): 55 def requestNodes(self, build_set, job, relative_priority):
55 # Create a copy of the nodeset to represent the actual nodes 56 # Create a copy of the nodeset to represent the actual nodes
56 # returned by nodepool. 57 # returned by nodepool.
57 nodeset = job.nodeset.copy() 58 nodeset = job.nodeset.copy()
58 req = model.NodeRequest(self.sched.hostname, build_set, job, nodeset) 59 req = model.NodeRequest(self.sched.hostname, build_set, job,
60 nodeset, relative_priority)
59 self.requests[req.uid] = req 61 self.requests[req.uid] = req
60 62
61 if nodeset.nodes: 63 if nodeset.nodes:
@@ -79,6 +81,38 @@ class Nodepool(object):
79 except Exception: 81 except Exception:
80 self.log.exception("Error deleting node request:") 82 self.log.exception("Error deleting node request:")
81 83
84 def reviseRequest(self, request, relative_priority=None):
85 '''Attempt to update the node request, if it is not currently being
86 processed.
87
88 :param: NodeRequest request: The request to update.
89 :param relative_priority int: If supplied, the new relative
90 priority to set on the request.
91
92 '''
93 if relative_priority is None:
94 return
95 try:
96 self.sched.zk.lockNodeRequest(request, blocking=False)
97 except LockException:
98 # It may be locked by nodepool, which is fine.
99 self.log.debug("Unable to revise locked node request %s", request)
100 return False
101 try:
102 old_priority = request.relative_priority
103 request.relative_priority = relative_priority
104 self.sched.zk.storeNodeRequest(request)
105 self.log.debug("Revised relative priority of "
106 "node request %s from %s to %s",
107 request, old_priority, relative_priority)
108 except Exception:
109 self.log.exception("Unable to update node request %s", request)
110 finally:
111 try:
112 self.sched.zk.unlockNodeRequest(request)
113 except Exception:
114 self.log.exception("Unable to unlock node request %s", request)
115
82 def holdNodeSet(self, nodeset, autohold_key): 116 def holdNodeSet(self, nodeset, autohold_key):
83 ''' 117 '''
84 Perform a hold on the given set of nodes. 118 Perform a hold on the given set of nodes.
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index cba94b9..61bedeb 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -305,6 +305,10 @@ class Scheduler(threading.Thread):
305 self.last_reconfigured = None 305 self.last_reconfigured = None
306 self.tenant_last_reconfigured = {} 306 self.tenant_last_reconfigured = {}
307 self.autohold_requests = {} 307 self.autohold_requests = {}
308 self.use_relative_priority = False
309 if self.config.has_option('scheduler', 'relative_priority'):
310 if self.config.getboolean('scheduler', 'relative_priority'):
311 self.use_relative_priority = True
308 312
309 def start(self): 313 def start(self):
310 super(Scheduler, self).start() 314 super(Scheduler, self).start()
diff --git a/zuul/zk.py b/zuul/zk.py
index 852df87..f1ff4c8 100644
--- a/zuul/zk.py
+++ b/zuul/zk.py
@@ -41,6 +41,7 @@ class ZooKeeper(object):
41 log = logging.getLogger("zuul.zk.ZooKeeper") 41 log = logging.getLogger("zuul.zk.ZooKeeper")
42 42
43 REQUEST_ROOT = '/nodepool/requests' 43 REQUEST_ROOT = '/nodepool/requests'
44 REQUEST_LOCK_ROOT = "/nodepool/requests-lock"
44 NODE_ROOT = '/nodepool/nodes' 45 NODE_ROOT = '/nodepool/nodes'
45 46
46 # Log zookeeper retry every 10 seconds 47 # Log zookeeper retry every 10 seconds
@@ -162,8 +163,8 @@ class ZooKeeper(object):
162 from ZooKeeper). The watcher should return False when 163 from ZooKeeper). The watcher should return False when
163 further updates are no longer necessary. 164 further updates are no longer necessary.
164 ''' 165 '''
166 node_request.created_time = time.time()
165 data = node_request.toDict() 167 data = node_request.toDict()
166 data['created_time'] = time.time()
167 168
168 path = '%s/%s-' % (self.REQUEST_ROOT, node_request.priority) 169 path = '%s/%s-' % (self.REQUEST_ROOT, node_request.priority)
169 path = self.client.create(path, self._dictToStr(data), 170 path = self.client.create(path, self._dictToStr(data),
@@ -174,15 +175,7 @@ class ZooKeeper(object):
174 175
175 def callback(data, stat): 176 def callback(data, stat):
176 if data: 177 if data:
177 data = self._strToDict(data) 178 self.updateNodeRequest(node_request, data)
178 request_nodes = list(node_request.nodeset.getNodes())
179 for i, nodeid in enumerate(data.get('nodes', [])):
180 node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
181 node_data, node_stat = self.client.get(node_path)
182 node_data = self._strToDict(node_data)
183 request_nodes[i].id = nodeid
184 request_nodes[i].updateFromDict(node_data)
185 node_request.updateFromDict(data)
186 deleted = (data is None) # data *are* none 179 deleted = (data is None) # data *are* none
187 return watcher(node_request, deleted) 180 return watcher(node_request, deleted)
188 181
@@ -215,6 +208,34 @@ class ZooKeeper(object):
215 return True 208 return True
216 return False 209 return False
217 210
211 def storeNodeRequest(self, node_request):
212 '''Store the node request.
213
214 The request is expected to already exist and is updated in its
215 entirety.
216
217 :param NodeRequest node_request: The request to update.
218 '''
219
220 path = '%s/%s' % (self.NODE_REQUEST_ROOT, node_request.id)
221 self.client.set(path, self._dictToStr(node_request.toDict()))
222
223 def updateNodeRequest(self, node_request, data=None):
224 '''Refresh an existing node request.
225
226 :param NodeRequest node_request: The request to update.
227 :param dict data: The data to use; query ZK if absent.
228 '''
229 if data is None:
230 path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
231 data, stat = self.client.get(path)
232 data = self._strToDict(data)
233 request_nodes = list(node_request.nodeset.getNodes())
234 for i, nodeid in enumerate(data.get('nodes', [])):
235 request_nodes[i].id = nodeid
236 self.updateNode(request_nodes[i], nodeid)
237 node_request.updateFromDict(data)
238
218 def storeNode(self, node): 239 def storeNode(self, node):
219 '''Store the node. 240 '''Store the node.
220 241
@@ -227,6 +248,18 @@ class ZooKeeper(object):
227 path = '%s/%s' % (self.NODE_ROOT, node.id) 248 path = '%s/%s' % (self.NODE_ROOT, node.id)
228 self.client.set(path, self._dictToStr(node.toDict())) 249 self.client.set(path, self._dictToStr(node.toDict()))
229 250
251 def updateNode(self, node, nodeid):
252 '''Refresh an existing node.
253
254 :param Node node: The node to update.
255 :param Node nodeid: The zookeeper node ID.
256 '''
257
258 node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
259 node_data, node_stat = self.client.get(node_path)
260 node_data = self._strToDict(node_data)
261 node.updateFromDict(node_data)
262
230 def lockNode(self, node, blocking=True, timeout=None): 263 def lockNode(self, node, blocking=True, timeout=None):
231 ''' 264 '''
232 Lock a node. 265 Lock a node.
@@ -268,6 +301,59 @@ class ZooKeeper(object):
268 node.lock.release() 301 node.lock.release()
269 node.lock = None 302 node.lock = None
270 303
304 def lockNodeRequest(self, request, blocking=True, timeout=None):
305 '''
306 Lock a node request.
307
308 This will set the `lock` attribute of the request object when the
309 lock is successfully acquired.
310
311 :param NodeRequest request: The request to lock.
312 :param bool blocking: Whether or not to block on trying to
313 acquire the lock
314 :param int timeout: When blocking, how long to wait for the lock
315 to get acquired. None, the default, waits forever.
316
317 :raises: TimeoutException if we failed to acquire the lock when
318 blocking with a timeout. ZKLockException if we are not blocking
319 and could not get the lock, or a lock is already held.
320 '''
321
322 path = "%s/%s" % (self.REQUEST_LOCK_ROOT, request.id)
323 try:
324 lock = Lock(self.client, path)
325 have_lock = lock.acquire(blocking, timeout)
326 except kze.LockTimeout:
327 raise LockException(
328 "Timeout trying to acquire lock %s" % path)
329 except kze.NoNodeError:
330 have_lock = False
331 self.log.error("Request not found for locking: %s", request)
332
333 # If we aren't blocking, it's possible we didn't get the lock
334 # because someone else has it.
335 if not have_lock:
336 raise LockException("Did not get lock on %s" % path)
337
338 request.lock = lock
339 self.updateNodeRequest(request)
340
341 def unlockNodeRequest(self, request):
342 '''
343 Unlock a node request.
344
345 The request must already have been locked.
346
347 :param NodeRequest request: The request to unlock.
348
349 :raises: ZKLockException if the request is not currently locked.
350 '''
351 if request.lock is None:
352 raise LockException(
353 "Request %s does not hold a lock" % request)
354 request.lock.release()
355 request.lock = None
356
271 def heldNodeCount(self, autohold_key): 357 def heldNodeCount(self, autohold_key):
272 ''' 358 '''
273 Count the number of nodes being held for the given tenant/project/job. 359 Count the number of nodes being held for the given tenant/project/job.