summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Ricardo Carrillo Cruz <ricardo.carrillo.cruz@gmail.com> 2018-03-02 12:55:35 +0100
committer: Paul Belanger <pabelanger@redhat.com> 2018-11-29 13:05:20 -0500
commit: 090e9d8cd5104349a4e361b5f5d9e5eadc4397cd (patch)
tree: 4524a0a25750bf99cc6d66461b3832a527e98310
parent: 71f60674b91aef8c272d91d8903ad56042cba7cd (diff)
Add support for zones in executors
Create a new config setting to allow zuul executors to be grouped into zones. By default, this setting is disabled (set to None) to preserve backward compatibility.
Story: 2001125
Task: 4817
Change-Id: I345ee2d0c004afa68858eb195189b56de3d41e97
Signed-off-by: Paul Belanger <pabelanger@redhat.com>
Notes
Notes (review): Code-Review+2: James E. Blair <corvus@inaugust.com> Code-Review+2: Tobias Henkel <tobias.henkel@bmw.de> Workflow+1: Tobias Henkel <tobias.henkel@bmw.de> Verified+2: Zuul Submitted-by: Zuul Submitted-at: Fri, 30 Nov 2018 06:00:10 +0000 Reviewed-on: https://review.openstack.org/549197 Project: openstack-infra/zuul Branch: refs/heads/master
-rw-r--r-- doc/source/admin/components.rst 19
-rw-r--r-- releasenotes/notes/executor-zones-54318b8ea2f7e195.yaml 7
-rw-r--r-- tests/base.py 4
-rw-r--r-- tests/unit/test_scheduler.py 42
-rw-r--r-- zuul/executor/client.py 27
-rw-r--r-- zuul/executor/server.py 19
-rw-r--r-- zuul/lib/gear_utils.py 42
-rw-r--r-- zuul/rpclistener.py 23
-rw-r--r-- zuul/scheduler.py 5
9 files changed, 156 insertions, 32 deletions
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index bed1f84..617ef35 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -611,6 +611,25 @@ The following sections of ``zuul.conf`` are used by the executor:
611 where it cannot determine its hostname correctly this can be overridden 611 where it cannot determine its hostname correctly this can be overridden
612 here. 612 here.
613 613
614 .. attr:: zone
615 :default: None
616
617 Name of the nodepool executor-zone to exclusively execute all jobs that
618 have nodes of the specified provider. As an example, it is possible for
619 nodepool nodes to exist in a cloud without publicly accessible
620 IP addresses. By adding an executor to a zone, nodepool nodes could be
621 configured to use private IP addresses.
622
623 To enable this in nodepool, you'll use the node-attributes setting in a
624 provider pool. For example:
625
626 .. code-block:: yaml
627
628 pools:
629 - name: main
630 node-attributes:
631 executor-zone: vpn
632
614.. attr:: merger 633.. attr:: merger
615 634
616 .. attr:: git_user_email 635 .. attr:: git_user_email
diff --git a/releasenotes/notes/executor-zones-54318b8ea2f7e195.yaml b/releasenotes/notes/executor-zones-54318b8ea2f7e195.yaml
new file mode 100644
index 0000000..42db8cc
--- /dev/null
+++ b/releasenotes/notes/executor-zones-54318b8ea2f7e195.yaml
@@ -0,0 +1,7 @@
1---
2features:
3 - |
4 One or more zuul executors can now be added to a :attr:`executor.zone`.
5 This is helpful if a cloud does not have any public IP addresses for
6 nodepool nodes. Now you'll be able to have a zuul executor on the same
7 private network as your nodepool nodes.
diff --git a/tests/base.py b/tests/base.py
index 4aa3900..9fbd1a2 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -1679,7 +1679,7 @@ class FakeGearmanServer(gear.Server):
1679 self.log.debug("releasing queued job %s (%s)" % (regex, qlen)) 1679 self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
1680 for job in self.getQueue(): 1680 for job in self.getQueue():
1681 match = False 1681 match = False
1682 if job.name == b'executor:execute': 1682 if job.name.startswith(b'executor:execute'):
1683 parameters = json.loads(job.arguments.decode('utf8')) 1683 parameters = json.loads(job.arguments.decode('utf8'))
1684 if not regex or re.match(regex, parameters.get('job')): 1684 if not regex or re.match(regex, parameters.get('job')):
1685 match = True 1685 match = True
@@ -1748,6 +1748,7 @@ class FakeNodepool(object):
1748 self.thread.start() 1748 self.thread.start()
1749 self.fail_requests = set() 1749 self.fail_requests = set()
1750 self.remote_ansible = False 1750 self.remote_ansible = False
1751 self.attributes = None
1751 1752
1752 def stop(self): 1753 def stop(self):
1753 self._running = False 1754 self._running = False
@@ -1820,6 +1821,7 @@ class FakeNodepool(object):
1820 provider='test-provider', 1821 provider='test-provider',
1821 region='test-region', 1822 region='test-region',
1822 az='test-az', 1823 az='test-az',
1824 attributes=self.attributes,
1823 interface_ip=remote_ip, 1825 interface_ip=remote_ip,
1824 public_ipv4=remote_ip, 1826 public_ipv4=remote_ip,
1825 private_ipv4=None, 1827 private_ipv4=None,
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index c4bd99d..dc34b0a 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -65,6 +65,48 @@ class TestSchedulerSSL(SSLZuulTestCase):
65 'label1') 65 'label1')
66 66
67 67
68class TestSchedulerZone(ZuulTestCase):
69 tenant_config_file = 'config/single-tenant/main.yaml'
70
71 def setUp(self):
72 super(TestSchedulerZone, self).setUp()
73 self.fake_nodepool.attributes = {'executor-zone': 'test-provider.vpn'}
74
75 def setup_config(self):
76 super(TestSchedulerZone, self).setup_config()
77 self.config.set('executor', 'zone', 'test-provider.vpn')
78
79 def test_jobs_executed(self):
80 "Test that jobs are executed and a change is merged per zone"
81 self.gearman_server.hold_jobs_in_queue = True
82 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
83 A.addApproval('Code-Review', 2)
84 self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
85 self.waitUntilSettled()
86
87 queue = self.gearman_server.getQueue()
88 self.assertEqual(len(self.builds), 0)
89 self.assertEqual(len(queue), 1)
90 self.assertEqual(b'executor:execute:test-provider.vpn', queue[0].name)
91
92 self.gearman_server.hold_jobs_in_queue = False
93 self.gearman_server.release()
94 self.waitUntilSettled()
95
96 self.assertEqual(self.getJobFromHistory('project-merge').result,
97 'SUCCESS')
98 self.assertEqual(self.getJobFromHistory('project-test1').result,
99 'SUCCESS')
100 self.assertEqual(self.getJobFromHistory('project-test2').result,
101 'SUCCESS')
102 self.assertEqual(A.data['status'], 'MERGED')
103 self.assertEqual(A.reported, 2)
104 self.assertEqual(self.getJobFromHistory('project-test1').node,
105 'label1')
106 self.assertEqual(self.getJobFromHistory('project-test2').node,
107 'label1')
108
109
68class TestScheduler(ZuulTestCase): 110class TestScheduler(ZuulTestCase):
69 tenant_config_file = 'config/single-tenant/main.yaml' 111 tenant_config_file = 'config/single-tenant/main.yaml'
70 112
diff --git a/zuul/executor/client.py b/zuul/executor/client.py
index a1d251d..db1ef48 100644
--- a/zuul/executor/client.py
+++ b/zuul/executor/client.py
@@ -22,6 +22,7 @@ from uuid import uuid4
22 22
23import zuul.model 23import zuul.model
24from zuul.lib.config import get_default 24from zuul.lib.config import get_default
25from zuul.lib.gear_utils import getGearmanFunctions
25from zuul.lib.jsonutil import json_dumps 26from zuul.lib.jsonutil import json_dumps
26from zuul.model import Build 27from zuul.model import Build
27 28
@@ -306,8 +307,30 @@ class ExecutorClient(object):
306 self.sched.onBuildCompleted(build, 'SUCCESS', {}, []) 307 self.sched.onBuildCompleted(build, 'SUCCESS', {}, [])
307 return build 308 return build
308 309
309 gearman_job = gear.TextJob('executor:execute', json_dumps(params), 310 functions = getGearmanFunctions(self.gearman)
310 unique=uuid) 311 function_name = 'executor:execute'
312 # Because all nodes belong to the same provider, region and
313 # availability zone we can get executor_zone from only the first
314 # node.
315 executor_zone = None
316 if nodes and nodes[0].get('attributes'):
317 executor_zone = nodes[0]['attributes'].get('executor-zone')
318
319 if executor_zone:
320 _fname = '%s:%s' % (
321 function_name,
322 executor_zone)
323 if _fname in functions.keys():
324 function_name = _fname
325 else:
326 self.log.warning(
327 "Job requested '%s' zuul-executor zone, but no "
328 "zuul-executors found for this zone; ignoring zone "
329 "request" % executor_zone)
330
331 gearman_job = gear.TextJob(
332 function_name, json_dumps(params), unique=uuid)
333
311 build.__gearman_job = gearman_job 334 build.__gearman_job = gearman_job
312 build.__gearman_worker = None 335 build.__gearman_worker = None
313 self.builds[uuid] = build 336 self.builds[uuid] = build
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 8e243d6..a6e4d36 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -2033,6 +2033,7 @@ class ExecutorServer(object):
2033 'default_username', 'zuul') 2033 'default_username', 'zuul')
2034 self.disk_limit_per_job = int(get_default(self.config, 'executor', 2034 self.disk_limit_per_job = int(get_default(self.config, 'executor',
2035 'disk_limit_per_job', 250)) 2035 'disk_limit_per_job', 250))
2036 self.zone = get_default(self.config, 'executor', 'zone')
2036 self.merge_email = get_default(self.config, 'merger', 'git_user_email') 2037 self.merge_email = get_default(self.config, 'merger', 'git_user_email')
2037 self.merge_name = get_default(self.config, 'merger', 'git_user_name') 2038 self.merge_name = get_default(self.config, 'merger', 'git_user_name')
2038 self.merge_speed_limit = get_default( 2039 self.merge_speed_limit = get_default(
@@ -2177,7 +2178,10 @@ class ExecutorServer(object):
2177 def register_work(self): 2178 def register_work(self):
2178 if self._running: 2179 if self._running:
2179 self.accepting_work = True 2180 self.accepting_work = True
2180 self.executor_worker.registerFunction("executor:execute") 2181 function_name = 'executor:execute'
2182 if self.zone:
2183 function_name += ':%s' % self.zone
2184 self.executor_worker.registerFunction(function_name)
2181 # TODO(jeblair): Update geard to send a noop after 2185 # TODO(jeblair): Update geard to send a noop after
2182 # registering for a job which is in the queue, then remove 2186 # registering for a job which is in the queue, then remove
2183 # this API violation. 2187 # this API violation.
@@ -2185,7 +2189,10 @@ class ExecutorServer(object):
2185 2189
2186 def unregister_work(self): 2190 def unregister_work(self):
2187 self.accepting_work = False 2191 self.accepting_work = False
2188 self.executor_worker.unRegisterFunction("executor:execute") 2192 function_name = 'executor:execute'
2193 if self.zone:
2194 function_name += ':%s' % self.zone
2195 self.executor_worker.unRegisterFunction(function_name)
2189 2196
2190 def stop(self): 2197 def stop(self):
2191 self.log.debug("Stopping") 2198 self.log.debug("Stopping")
@@ -2362,8 +2369,12 @@ class ExecutorServer(object):
2362 if not self._running: 2369 if not self._running:
2363 job.sendWorkFail() 2370 job.sendWorkFail()
2364 return 2371 return
2365 if job.name == 'executor:execute': 2372 function_name = 'executor:execute'
2366 self.log.debug("Got execute job: %s" % job.unique) 2373 if self.zone:
2374 function_name += ':%s' % self.zone
2375 if job.name == (function_name):
2376 self.log.debug("Got %s job: %s" %
2377 (function_name, job.unique))
2367 self.executeJob(job) 2378 self.executeJob(job)
2368 elif job.name.startswith('executor:resume'): 2379 elif job.name.startswith('executor:resume'):
2369 self.log.debug("Got resume job: %s" % job.unique) 2380 self.log.debug("Got resume job: %s" % job.unique)
diff --git a/zuul/lib/gear_utils.py b/zuul/lib/gear_utils.py
new file mode 100644
index 0000000..02df72d
--- /dev/null
+++ b/zuul/lib/gear_utils.py
@@ -0,0 +1,42 @@
1# Copyright 2018 Red Hat, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import gear
16import logging
17
18log = logging.getLogger("zuul.gear_utils")
19
20
21def getGearmanFunctions(gearman):
22 functions = {}
23 for connection in gearman.active_connections:
24 try:
25 req = gear.StatusAdminRequest()
26 connection.sendAdminRequest(req, timeout=300)
27 except Exception:
28 log.exception("Exception while listing functions")
29 gearman._lostConnection(connection)
30 continue
31 for line in req.response.decode('utf8').split('\n'):
32 parts = [x.strip() for x in line.split('\t')]
33 if len(parts) < 4:
34 continue
35 # parts[0] - function name
36 # parts[1] - total jobs queued (including building)
37 # parts[2] - jobs building
38 # parts[3] - workers registered
39 data = functions.setdefault(parts[0], [0, 0, 0])
40 for i in range(3):
41 data[i] += int(parts[i + 1])
42 return functions
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index 5380e93..384b9c7 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -77,29 +77,6 @@ class RPCListener(object):
77 self.worker.registerFunction("zuul:key_get") 77 self.worker.registerFunction("zuul:key_get")
78 self.worker.registerFunction("zuul:config_errors_list") 78 self.worker.registerFunction("zuul:config_errors_list")
79 79
80 def getFunctions(self):
81 functions = {}
82 for connection in self.worker.active_connections:
83 try:
84 req = gear.StatusAdminRequest()
85 connection.sendAdminRequest(req, timeout=300)
86 except Exception:
87 self.log.exception("Exception while listing functions")
88 self.worker._lostConnection(connection)
89 continue
90 for line in req.response.decode('utf8').split('\n'):
91 parts = [x.strip() for x in line.split('\t')]
92 if len(parts) < 4:
93 continue
94 # parts[0] - function name
95 # parts[1] - total jobs queued (including building)
96 # parts[2] - jobs building
97 # parts[3] - workers registered
98 data = functions.setdefault(parts[0], [0, 0, 0])
99 for i in range(3):
100 data[i] += int(parts[i + 1])
101 return functions
102
103 def stop(self): 80 def stop(self):
104 self.log.debug("Stopping") 81 self.log.debug("Stopping")
105 self._running = False 82 self._running = False
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index cba94b9..3f501fd 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -33,6 +33,7 @@ from zuul import version as zuul_version
33from zuul import rpclistener 33from zuul import rpclistener
34from zuul.lib import commandsocket 34from zuul.lib import commandsocket
35from zuul.lib.config import get_default 35from zuul.lib.config import get_default
36from zuul.lib.gear_utils import getGearmanFunctions
36from zuul.lib.statsd import get_statsd 37from zuul.lib.statsd import get_statsd
37import zuul.lib.queue 38import zuul.lib.queue
38 39
@@ -373,7 +374,7 @@ class Scheduler(threading.Thread):
373 def _runStats(self): 374 def _runStats(self):
374 if not self.statsd: 375 if not self.statsd:
375 return 376 return
376 functions = self.rpc.getFunctions() 377 functions = getGearmanFunctions(self.rpc.worker)
377 executors_accepting = 0 378 executors_accepting = 0
378 executors_online = 0 379 executors_online = 0
379 execute_queue = 0 380 execute_queue = 0
@@ -382,7 +383,7 @@ class Scheduler(threading.Thread):
382 merge_queue = 0 383 merge_queue = 0
383 merge_running = 0 384 merge_running = 0
384 for (name, (queued, running, registered)) in functions.items(): 385 for (name, (queued, running, registered)) in functions.items():
385 if name == 'executor:execute': 386 if name.startswith('executor:execute'):
386 executors_accepting = registered 387 executors_accepting = registered
387 execute_queue = queued - running 388 execute_queue = queued - running
388 execute_running = running 389 execute_running = running