author     Zuul <zuul@review.openstack.org>  2018-11-30 06:00:10 +0000
committer  Gerrit Code Review <review@openstack.org>  2018-11-30 06:00:10 +0000
commit     a012cf3da32197878da9709a6150554d171bb0be (patch)
tree       faa83439ff2b21c7538cba2c75f8d0fafc042b97
parent     fff02ace694de41da98240083588e02d4670d7e9 (diff)
parent     090e9d8cd5104349a4e361b5f5d9e5eadc4397cd (diff)
Merge "Add support for zones in executors"
-rw-r--r--  doc/source/admin/components.rst                           19
-rw-r--r--  releasenotes/notes/executor-zones-54318b8ea2f7e195.yaml    7
-rw-r--r--  tests/base.py                                               4
-rw-r--r--  tests/unit/test_scheduler.py                               42
-rw-r--r--  zuul/executor/client.py                                    27
-rw-r--r--  zuul/executor/server.py                                    19
-rw-r--r--  zuul/lib/gear_utils.py                                     42
-rw-r--r--  zuul/rpclistener.py                                        23
-rw-r--r--  zuul/scheduler.py                                           5
9 files changed, 156 insertions(+), 32 deletions(-)
diff --git a/doc/source/admin/components.rst b/doc/source/admin/components.rst
index 1129572..81a20cb 100644
--- a/doc/source/admin/components.rst
+++ b/doc/source/admin/components.rst
@@ -622,6 +622,25 @@ The following sections of ``zuul.conf`` are used by the executor:
       where it cannot determine its hostname correctly this can be overridden
       here.
 
+   .. attr:: zone
+      :default: None
+
+      Name of the nodepool executor-zone to exclusively execute all jobs
+      that have nodes of the specified provider. As an example, it is
+      possible for nodepool nodes to exist in a cloud without a publicly
+      accessible IP address. By adding an executor to a zone, nodepool
+      nodes can be configured to use private IP addresses.
+
+      To enable this in nodepool, use the node-attributes setting in a
+      provider pool. For example:
+
+      .. code-block:: yaml
+
+         pools:
+           - name: main
+             node-attributes:
+               executor-zone: vpn
+
 .. attr:: merger
 
    .. attr:: git_user_email
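
The documentation hunk above shows only the nodepool side of the pairing. As a rough illustration of how the corresponding zuul.conf option is consumed, here is a sketch (not Zuul's actual startup code; the config path is hypothetical) of reading the zone and appending it to the gearman function name the executor registers:

import configparser

config = configparser.ConfigParser()
config.read('/etc/zuul/zuul.conf')  # hypothetical path

zone = config.get('executor', 'zone', fallback=None)
function_name = 'executor:execute'
if zone:
    function_name += ':%s' % zone   # e.g. 'executor:execute:vpn'
print(function_name)
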
diff --git a/releasenotes/notes/executor-zones-54318b8ea2f7e195.yaml b/releasenotes/notes/executor-zones-54318b8ea2f7e195.yaml
new file mode 100644
index 0000000..42db8cc
--- /dev/null
+++ b/releasenotes/notes/executor-zones-54318b8ea2f7e195.yaml
@@ -0,0 +1,7 @@
+---
+features:
+  - |
+    One or more Zuul executors can now be added to an :attr:`executor.zone`.
+    This is helpful if a cloud does not expose public IP addresses for
+    nodepool nodes: a Zuul executor can then run on the same private
+    network as the nodepool nodes.
diff --git a/tests/base.py b/tests/base.py
index 2c6e675..7cc2239 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -1679,7 +1679,7 @@ class FakeGearmanServer(gear.Server):
         self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
         for job in self.getQueue():
             match = False
-            if job.name == b'executor:execute':
+            if job.name.startswith(b'executor:execute'):
                 parameters = json.loads(job.arguments.decode('utf8'))
                 if not regex or re.match(regex, parameters.get('job')):
                     match = True
@@ -1748,6 +1748,7 @@ class FakeNodepool(object):
         self.thread.start()
         self.fail_requests = set()
         self.remote_ansible = False
+        self.attributes = None
 
     def stop(self):
         self._running = False
@@ -1820,6 +1821,7 @@ class FakeNodepool(object):
             provider='test-provider',
             region='test-region',
             az='test-az',
+            attributes=self.attributes,
             interface_ip=remote_ip,
             public_ipv4=remote_ip,
             private_ipv4=None,
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index d47b408..ba7be3d 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -65,6 +65,48 @@ class TestSchedulerSSL(SSLZuulTestCase):
                          'label1')
 
 
+class TestSchedulerZone(ZuulTestCase):
+    tenant_config_file = 'config/single-tenant/main.yaml'
+
+    def setUp(self):
+        super(TestSchedulerZone, self).setUp()
+        self.fake_nodepool.attributes = {'executor-zone': 'test-provider.vpn'}
+
+    def setup_config(self):
+        super(TestSchedulerZone, self).setup_config()
+        self.config.set('executor', 'zone', 'test-provider.vpn')
+
+    def test_jobs_executed(self):
+        "Test that jobs are executed and a change is merged per zone"
+        self.gearman_server.hold_jobs_in_queue = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        A.addApproval('Code-Review', 2)
+        self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
+        self.waitUntilSettled()
+
+        queue = self.gearman_server.getQueue()
+        self.assertEqual(len(self.builds), 0)
+        self.assertEqual(len(queue), 1)
+        self.assertEqual(b'executor:execute:test-provider.vpn', queue[0].name)
+
+        self.gearman_server.hold_jobs_in_queue = False
+        self.gearman_server.release()
+        self.waitUntilSettled()
+
+        self.assertEqual(self.getJobFromHistory('project-merge').result,
+                         'SUCCESS')
+        self.assertEqual(self.getJobFromHistory('project-test1').result,
+                         'SUCCESS')
+        self.assertEqual(self.getJobFromHistory('project-test2').result,
+                         'SUCCESS')
+        self.assertEqual(A.data['status'], 'MERGED')
+        self.assertEqual(A.reported, 2)
+        self.assertEqual(self.getJobFromHistory('project-test1').node,
+                         'label1')
+        self.assertEqual(self.getJobFromHistory('project-test2').node,
+                         'label1')
+
+
 class TestScheduler(ZuulTestCase):
     tenant_config_file = 'config/single-tenant/main.yaml'
 
diff --git a/zuul/executor/client.py b/zuul/executor/client.py
index a1d251d..db1ef48 100644
--- a/zuul/executor/client.py
+++ b/zuul/executor/client.py
@@ -22,6 +22,7 @@ from uuid import uuid4
 
 import zuul.model
 from zuul.lib.config import get_default
+from zuul.lib.gear_utils import getGearmanFunctions
 from zuul.lib.jsonutil import json_dumps
 from zuul.model import Build
 
@@ -306,8 +307,30 @@ class ExecutorClient(object):
             self.sched.onBuildCompleted(build, 'SUCCESS', {}, [])
             return build
 
-        gearman_job = gear.TextJob('executor:execute', json_dumps(params),
-                                   unique=uuid)
+        functions = getGearmanFunctions(self.gearman)
+        function_name = 'executor:execute'
+        # Because all nodes belong to the same provider, region and
+        # availability zone we can get executor_zone from only the first
+        # node.
+        executor_zone = None
+        if nodes and nodes[0].get('attributes'):
+            executor_zone = nodes[0]['attributes'].get('executor-zone')
+
+        if executor_zone:
+            _fname = '%s:%s' % (
+                function_name,
+                executor_zone)
+            if _fname in functions.keys():
+                function_name = _fname
+            else:
+                self.log.warning(
+                    "Job requested '%s' zuul-executor zone, but no "
+                    "zuul-executors found for this zone; ignoring zone "
+                    "request" % executor_zone)
+
+        gearman_job = gear.TextJob(
+            function_name, json_dumps(params), unique=uuid)
+
         build.__gearman_job = gearman_job
         build.__gearman_worker = None
         self.builds[uuid] = build
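
For clarity, here is the same selection logic pulled out of execute() into a standalone sketch; pick_execute_function is a hypothetical helper name, and functions stands for the dict returned by getGearmanFunctions():

def pick_execute_function(nodes, functions):
    base = 'executor:execute'
    # All nodes in a request share provider/region/az, so the first
    # node's attributes are representative.
    zone = None
    if nodes and nodes[0].get('attributes'):
        zone = nodes[0]['attributes'].get('executor-zone')
    zoned = '%s:%s' % (base, zone) if zone else None
    # Fall back to the unzoned function if no executor registered the zone.
    return zoned if zoned in functions else base


# Example: a zoned executor is registered, so the zoned name is chosen.
nodes = [{'attributes': {'executor-zone': 'vpn'}}]
functions = {'executor:execute': [0, 0, 2], 'executor:execute:vpn': [0, 0, 1]}
assert pick_execute_function(nodes, functions) == 'executor:execute:vpn'
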
diff --git a/zuul/executor/server.py b/zuul/executor/server.py
index 2f42c0f..0c9e912 100644
--- a/zuul/executor/server.py
+++ b/zuul/executor/server.py
@@ -2049,6 +2049,7 @@ class ExecutorServer(object):
                                             'default_username', 'zuul')
         self.disk_limit_per_job = int(get_default(self.config, 'executor',
                                                   'disk_limit_per_job', 250))
+        self.zone = get_default(self.config, 'executor', 'zone')
         self.merge_email = get_default(self.config, 'merger', 'git_user_email')
         self.merge_name = get_default(self.config, 'merger', 'git_user_name')
         self.merge_speed_limit = get_default(
@@ -2193,7 +2194,10 @@ class ExecutorServer(object):
     def register_work(self):
         if self._running:
             self.accepting_work = True
-            self.executor_worker.registerFunction("executor:execute")
+            function_name = 'executor:execute'
+            if self.zone:
+                function_name += ':%s' % self.zone
+            self.executor_worker.registerFunction(function_name)
             # TODO(jeblair): Update geard to send a noop after
             # registering for a job which is in the queue, then remove
             # this API violation.
@@ -2201,7 +2205,10 @@ class ExecutorServer(object):
 
     def unregister_work(self):
         self.accepting_work = False
-        self.executor_worker.unRegisterFunction("executor:execute")
+        function_name = 'executor:execute'
+        if self.zone:
+            function_name += ':%s' % self.zone
+        self.executor_worker.unRegisterFunction(function_name)
 
     def stop(self):
         self.log.debug("Stopping")
@@ -2378,8 +2385,12 @@ class ExecutorServer(object):
         if not self._running:
             job.sendWorkFail()
             return
-        if job.name == 'executor:execute':
-            self.log.debug("Got execute job: %s" % job.unique)
+        function_name = 'executor:execute'
+        if self.zone:
+            function_name += ':%s' % self.zone
+        if job.name == (function_name):
+            self.log.debug("Got %s job: %s" %
+                           (function_name, job.unique))
             self.executeJob(job)
         elif job.name.startswith('executor:resume'):
             self.log.debug("Got resume job: %s" % job.unique)
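
On the worker side, the change boils down to suffixing the zone onto the gearman function name before registering it. A minimal, hypothetical sketch using the gear library (client id, host and port are assumptions; the real executor does far more around this):

import gear

zone = 'vpn'  # would come from the [executor] zone option
function_name = 'executor:execute'
if zone:
    function_name += ':%s' % zone

worker = gear.TextWorker('zoned-executor-example')  # hypothetical client id
worker.addServer('127.0.0.1', 4730)                 # assumed gearman address
worker.waitForServer()
worker.registerFunction(function_name)  # only zone-matched jobs arrive here
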
diff --git a/zuul/lib/gear_utils.py b/zuul/lib/gear_utils.py
new file mode 100644
index 0000000..02df72d
--- /dev/null
+++ b/zuul/lib/gear_utils.py
@@ -0,0 +1,42 @@
+# Copyright 2018 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import gear
+import logging
+
+log = logging.getLogger("zuul.gear_utils")
+
+
+def getGearmanFunctions(gearman):
+    functions = {}
+    for connection in gearman.active_connections:
+        try:
+            req = gear.StatusAdminRequest()
+            connection.sendAdminRequest(req, timeout=300)
+        except Exception:
+            log.exception("Exception while listing functions")
+            gearman._lostConnection(connection)
+            continue
+        for line in req.response.decode('utf8').split('\n'):
+            parts = [x.strip() for x in line.split('\t')]
+            if len(parts) < 4:
+                continue
+            # parts[0] - function name
+            # parts[1] - total jobs queued (including building)
+            # parts[2] - jobs building
+            # parts[3] - workers registered
+            data = functions.setdefault(parts[0], [0, 0, 0])
+            for i in range(3):
+                data[i] += int(parts[i + 1])
+    return functions
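
A usage sketch for the new helper (connection details are assumptions): any gear client or worker with active connections can be passed in, and the result maps each function name to [queued, running, registered] totals summed across servers.

import gear
from zuul.lib.gear_utils import getGearmanFunctions

client = gear.Client('stats-probe')   # hypothetical client id
client.addServer('127.0.0.1', 4730)   # assumed gearman address
client.waitForServer()

functions = getGearmanFunctions(client)
for name, (queued, running, registered) in functions.items():
    if name.startswith('executor:execute'):
        # 'queued' includes running jobs, so subtract to get the backlog.
        print(name, 'waiting=%d running=%d workers=%d'
              % (queued - running, running, registered))
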
diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py
index 5380e93..384b9c7 100644
--- a/zuul/rpclistener.py
+++ b/zuul/rpclistener.py
@@ -77,29 +77,6 @@ class RPCListener(object):
         self.worker.registerFunction("zuul:key_get")
         self.worker.registerFunction("zuul:config_errors_list")
 
-    def getFunctions(self):
-        functions = {}
-        for connection in self.worker.active_connections:
-            try:
-                req = gear.StatusAdminRequest()
-                connection.sendAdminRequest(req, timeout=300)
-            except Exception:
-                self.log.exception("Exception while listing functions")
-                self.worker._lostConnection(connection)
-                continue
-            for line in req.response.decode('utf8').split('\n'):
-                parts = [x.strip() for x in line.split('\t')]
-                if len(parts) < 4:
-                    continue
-                # parts[0] - function name
-                # parts[1] - total jobs queued (including building)
-                # parts[2] - jobs building
-                # parts[3] - workers registered
-                data = functions.setdefault(parts[0], [0, 0, 0])
-                for i in range(3):
-                    data[i] += int(parts[i + 1])
-        return functions
-
     def stop(self):
         self.log.debug("Stopping")
         self._running = False
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index cba94b9..3f501fd 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -33,6 +33,7 @@ from zuul import version as zuul_version
 from zuul import rpclistener
 from zuul.lib import commandsocket
 from zuul.lib.config import get_default
+from zuul.lib.gear_utils import getGearmanFunctions
 from zuul.lib.statsd import get_statsd
 import zuul.lib.queue
 
@@ -373,7 +374,7 @@ class Scheduler(threading.Thread):
     def _runStats(self):
         if not self.statsd:
             return
-        functions = self.rpc.getFunctions()
+        functions = getGearmanFunctions(self.rpc.worker)
         executors_accepting = 0
         executors_online = 0
         execute_queue = 0
@@ -382,7 +383,7 @@ class Scheduler(threading.Thread):
         merge_queue = 0
         merge_running = 0
         for (name, (queued, running, registered)) in functions.items():
-            if name == 'executor:execute':
+            if name.startswith('executor:execute'):
                 executors_accepting = registered
                 execute_queue = queued - running
                 execute_running = running
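
A toy illustration of why the stats loop now matches on the prefix rather than the exact name: with zoned executors there can be several executor:execute* functions, and all of them should count toward the executor gauges. The numbers below are invented, and the summing here is only for illustration; the scheduler's own bookkeeping is the _runStats code above.

functions = {
    'executor:execute': [4, 1, 2],      # [queued, running, registered]
    'executor:execute:vpn': [2, 2, 1],
    'merger:merge': [0, 0, 3],
}

executors_accepting = execute_queue = execute_running = 0
for name, (queued, running, registered) in functions.items():
    if name.startswith('executor:execute'):
        executors_accepting += registered
        execute_queue += queued - running
        execute_running += running

print(executors_accepting, execute_queue, execute_running)  # 3 3 3
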