Rename zuul-launcher to zuul-executor

To avoid confusion with nodepool-launcher, we've decided to rename
zuul-launcher to zuul-executor.

Change-Id: I7d03cf0f0093400f4ba2e4beb1c92694224a3e8c
Signed-off-by: Paul Belanger <pabelanger@redhat.com>
This commit is contained in:
Paul Belanger 2017-03-14 13:20:10 -04:00
parent df8b742356
commit 174a8274d0
34 changed files with 492 additions and 491 deletions

View File

@ -118,7 +118,7 @@ the following:
Construct a test to fully simulate the series of events you want to
see, then run it in the foreground. For example::
.tox/py27/bin/python -m testtools.run tests.test_scheduler.TestScheduler.test_jobs_launched
.tox/py27/bin/python -m testtools.run tests.test_scheduler.TestScheduler.test_jobs_executed
See TESTING.rst for more information.

View File

@ -64,12 +64,12 @@ To run individual tests with tox::
For example, to *run the basic Zuul test*::
tox -e py27 -- tests.unit.test_scheduler.TestScheduler.test_jobs_launched
tox -e py27 -- tests.unit.test_scheduler.TestScheduler.test_jobs_executed
To *run one test in the foreground* (after previously having run tox
to set up the virtualenv)::
.tox/py27/bin/python -m testtools.run tests.unit.test_scheduler.TestScheduler.test_jobs_launched
.tox/py27/bin/python -m testtools.run tests.unit.test_scheduler.TestScheduler.test_jobs_executed
List Failing Tests
------------------

View File

@ -1,4 +1,4 @@
:title: Launchers
:title: Executors
.. _Gearman: http://gearman.org/
@ -11,27 +11,27 @@
.. _`Turbo-Hipster Documentation`:
http://turbo-hipster.rtfd.org/
.. _launchers:
.. _executors:
Launchers
Executors
=========
Zuul has a modular architecture for launching jobs. Currently, the
Zuul has a modular architecture for executing jobs. Currently, the
only supported module interfaces with Gearman_. This design allows
any system to run jobs for Zuul simply by interfacing with a Gearman
server. The recommended way of integrating a new job-runner with Zuul
is via this method.
If Gearman is unsuitable, Zuul may be extended with a new launcher
If Gearman is unsuitable, Zuul may be extended with a new executor
module. Zuul makes very few assumptions about the interface to a
launcher -- if it can trigger jobs, cancel them, and receive success
executor -- if it can trigger jobs, cancel them, and receive success
or failure reports, it should be able to be used with Zuul. Patches
to this effect are welcome.
Zuul Parameters
---------------
Zuul will pass some parameters with every job it launches. These are
Zuul will pass some parameters with every job it executes. These are
for workers to be able to get the repositories into the state they are
intended to be tested in. Builds can be triggered either by an action
on a change or by a reference update. Both events share a common set

View File

@ -21,7 +21,7 @@ Contents:
zuul
merger
cloner
launchers
executors
statsd
client
developer

View File

@ -30,7 +30,7 @@ cherry-picking changes as required and identifies the result with a
Git reference of the form ``refs/zuul/<branch>/Z<random sha1>``.
Preparing the workspace is then a simple matter of fetching that ref
and checking it out. The parameters that provide this information are
described in :ref:`launchers`.
described in :ref:`executors`.
These references need to be made available via a Git repository that
is available to workers (such as Jenkins). This is accomplished by

View File

@ -19,7 +19,7 @@ Zuul Components
Zuul provides the following components:
- **zuul-server**: scheduler daemon which communicates with Gerrit and
Gearman. Handles receiving events, launching jobs, collecting results
Gearman. Handles receiving events, executing jobs, collecting results
and postingreports.
- **zuul-merger**: speculative-merger which communicates with Gearman.
Prepares Git repositories for jobs to test against. This additionally

View File

@ -17,7 +17,7 @@ the statsd python module, so an existing Zuul installation may be missing it.
The configuration is done via environment variables STATSD_HOST and
STATSD_PORT. They are interpreted by the statsd module directly and there is no
such parameter in zuul.conf yet. Your init script will have to initialize both
of them before launching Zuul.
of them before executing Zuul.
Your init script most probably loads a configuration file named
``/etc/default/zuul`` which would contain the environment variables::
@ -61,7 +61,7 @@ The metrics are emitted by the Zuul scheduler (`zuul/scheduler.py`):
#. **job.<jobname>** subtree detailing per job statistics:
#. **wait_time** counter and timer of the wait time, with the
difference of the job start time and the launch time, in
difference of the job start time and the execute time, in
milliseconds.
**zuul.pipeline.**
@ -88,7 +88,7 @@ The metrics are emitted by the Zuul scheduler (`zuul/scheduler.py`):
#. **total_changes** counter of the number of change proceeding since
Zuul started.
#. **wait_time** counter and timer of the wait time, with the difference
of the job start time and the launch time, in milliseconds.
of the job start time and the execute time, in milliseconds.
Additionally, the `zuul.pipeline.<pipeline name>` hierarchy contains
`current_changes` (gauge), `resident_time` (timing) and `total_changes`

View File

@ -635,9 +635,9 @@ each job as it builds a list from the project specification.
**hold-following-changes (optional)**
This is a boolean that indicates that changes that follow this
change in a dependent change pipeline should wait until this job
succeeds before launching. If this is applied to a very short job
succeeds before executing. If this is applied to a very short job
that can predict whether longer jobs will fail early, this can be
used to reduce the number of jobs that Zuul will launch and
used to reduce the number of jobs that Zuul will execute and
ultimately have to cancel. In that case, a small amount of
parallelization of jobs is traded for more efficient use of testing
resources. On the other hand, to apply this to a long running job
@ -709,7 +709,7 @@ each job as it builds a list from the project specification.
a job is voting or not. Default: ``true``.
**attempts (optional)**
Number of attempts zuul will launch a job. Once reached, zuul will report
Number of attempts zuul will execute a job. Once reached, zuul will report
RETRY_LIMIT as the job result.
Defaults to 3.

View File

@ -2,12 +2,12 @@
tasks:
- name: Collect console log.
synchronize:
dest: "{{ zuul.launcher.log_root }}"
dest: "{{ zuul.executor.log_root }}"
mode: pull
src: "/tmp/console.log"
- name: Publish logs.
copy:
dest: "/opt/zuul-logs/{{ zuul.uuid}}"
src: "{{ zuul.launcher.log_root }}/"
src: "{{ zuul.executor.log_root }}/"
delegate_to: 127.0.0.1

View File

@ -18,7 +18,7 @@
- name: Synchronize src repos to workspace directory.
synchronize:
dest: "{{ prepare_workspace_root }}"
src: "{{ zuul.launcher.src_root }}"
src: "{{ zuul.executor.src_root }}"
- name: Run configure_mirror.sh
shell: /opt/nodepool-scripts/configure_mirror.sh

View File

@ -11,7 +11,7 @@
- name: Collect tox logs.
synchronize:
dest: "{{ zuul.launcher.log_root }}/tox"
dest: "{{ zuul.executor.log_root }}/tox"
mode: pull
src: "{{ item.path }}/log/"
with_items: "{{ result.files }}"

View File

@ -25,7 +25,7 @@ console_scripts =
zuul-merger = zuul.cmd.merger:main
zuul = zuul.cmd.client:main
zuul-cloner = zuul.cmd.cloner:main
zuul-launcher = zuul.cmd.launcher:main
zuul-executor = zuul.cmd.executor:main
[build_sphinx]
source-dir = doc/source

View File

@ -57,8 +57,8 @@ import zuul.connection.sql
import zuul.scheduler
import zuul.webapp
import zuul.rpclistener
import zuul.launcher.server
import zuul.launcher.client
import zuul.executor.server
import zuul.executor.client
import zuul.lib.connections
import zuul.merger.client
import zuul.merger.merger
@ -570,9 +570,9 @@ class FakeStatsd(threading.Thread):
class FakeBuild(object):
log = logging.getLogger("zuul.test")
def __init__(self, launch_server, job):
def __init__(self, executor_server, job):
self.daemon = True
self.launch_server = launch_server
self.executor_server = executor_server
self.job = job
self.jobdir = None
self.uuid = job.unique
@ -638,7 +638,7 @@ class FakeBuild(object):
def run(self):
self.log.debug('Running build %s' % self.unique)
if self.launch_server.hold_jobs_in_build:
if self.executor_server.hold_jobs_in_build:
self.log.debug('Holding build %s' % self.unique)
self._wait()
self.log.debug("Build %s continuing" % self.unique)
@ -654,7 +654,7 @@ class FakeBuild(object):
return result
def shouldFail(self):
changes = self.launch_server.fail_tests.get(self.name, [])
changes = self.executor_server.fail_tests.get(self.name, [])
for change in changes:
if self.hasChanges(change):
return True
@ -691,21 +691,21 @@ class FakeBuild(object):
return True
class RecordingLaunchServer(zuul.launcher.server.LaunchServer):
"""An Ansible launcher to be used in tests.
class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
"""An Ansible executor to be used in tests.
:ivar bool hold_jobs_in_build: If true, when jobs are launched
:ivar bool hold_jobs_in_build: If true, when jobs are executed
they will report that they have started but then pause until
released before reporting completion. This attribute may be
changed at any time and will take effect for subsequently
launched builds, but previously held builds will still need to
executed builds, but previously held builds will still need to
be explicitly released.
"""
def __init__(self, *args, **kw):
self._run_ansible = kw.pop('_run_ansible', False)
self._test_root = kw.pop('_test_root', False)
super(RecordingLaunchServer, self).__init__(*args, **kw)
super(RecordingExecutorServer, self).__init__(*args, **kw)
self.hold_jobs_in_build = False
self.lock = threading.Lock()
self.running_builds = []
@ -714,7 +714,7 @@ class RecordingLaunchServer(zuul.launcher.server.LaunchServer):
self.job_builds = {}
def failJob(self, name, change):
"""Instruct the launcher to report matching builds as failures.
"""Instruct the executor to report matching builds as failures.
:arg str name: The name of the job to fail.
:arg Change change: The :py:class:`~tests.base.FakeChange`
@ -748,7 +748,7 @@ class RecordingLaunchServer(zuul.launcher.server.LaunchServer):
self.log.debug("Done releasing builds %s (%s)" %
(regex, len(self.running_builds)))
def launchJob(self, job):
def executeJob(self, job):
build = FakeBuild(self, job)
job.build = build
self.running_builds.append(build)
@ -767,32 +767,32 @@ class RecordingLaunchServer(zuul.launcher.server.LaunchServer):
if build.unique == uuid:
build.aborted = True
build.release()
super(RecordingLaunchServer, self).stopJob(job)
super(RecordingExecutorServer, self).stopJob(job)
class RecordingAnsibleJob(zuul.launcher.server.AnsibleJob):
class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
def runPlaybooks(self, args):
build = self.launcher_server.job_builds[self.job.unique]
build = self.executor_server.job_builds[self.job.unique]
build.jobdir = self.jobdir
result = super(RecordingAnsibleJob, self).runPlaybooks(args)
self.launcher_server.lock.acquire()
self.launcher_server.build_history.append(
self.executor_server.lock.acquire()
self.executor_server.build_history.append(
BuildHistory(name=build.name, result=result, changes=build.changes,
node=build.node, uuid=build.unique,
parameters=build.parameters, jobdir=build.jobdir,
pipeline=build.parameters['ZUUL_PIPELINE'])
)
self.launcher_server.running_builds.remove(build)
del self.launcher_server.job_builds[self.job.unique]
self.launcher_server.lock.release()
self.executor_server.running_builds.remove(build)
del self.executor_server.job_builds[self.job.unique]
self.executor_server.lock.release()
return result
def runAnsible(self, cmd, timeout, trusted=False):
build = self.launcher_server.job_builds[self.job.unique]
build = self.executor_server.job_builds[self.job.unique]
if self.launcher_server._run_ansible:
if self.executor_server._run_ansible:
result = super(RecordingAnsibleJob, self).runAnsible(
cmd, timeout, trusted=trusted)
else:
@ -828,7 +828,7 @@ class FakeGearmanServer(gear.Server):
for queue in [self.high_queue, self.normal_queue, self.low_queue]:
for job in queue:
if not hasattr(job, 'waiting'):
if job.name.startswith('launcher:launch'):
if job.name.startswith('executor:execute'):
job.waiting = self.hold_jobs_in_queue
else:
job.waiting = False
@ -855,7 +855,7 @@ class FakeGearmanServer(gear.Server):
len(self.low_queue))
self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
for job in self.getQueue():
if job.name != 'launcher:launch':
if job.name != 'executor:execute':
continue
parameters = json.loads(job.arguments)
if not regex or re.match(regex, parameters.get('job')):
@ -991,7 +991,7 @@ class FakeNodepool(object):
created_time=now,
updated_time=now,
image_id=None,
launcher='fake-nodepool')
executor='fake-nodepool')
data = json.dumps(data)
path = self.client.create(path, data,
makepath=True,
@ -1223,13 +1223,13 @@ class ZuulTestCase(BaseTestCase):
server that all of the Zuul components in this test use to
communicate with each other.
:ivar RecordingLaunchServer launch_server: An instance of
:py:class:`~tests.base.RecordingLaunchServer` which is the
Ansible launch server used to run jobs for this test.
:ivar RecordingExecutorServer executor_server: An instance of
:py:class:`~tests.base.RecordingExecutorServer` which is the
Ansible execute server used to run jobs for this test.
:ivar list builds: A list of :py:class:`~tests.base.FakeBuild` objects
representing currently running builds. They are appended to
the list in the order they are launched, and removed from this
the list in the order they are executed, and removed from this
list upon completion.
:ivar list history: A list of :py:class:`~tests.base.BuildHistory`
@ -1261,7 +1261,7 @@ class ZuulTestCase(BaseTestCase):
self.test_root = os.path.join(tmp_root, "zuul-test")
self.upstream_root = os.path.join(self.test_root, "upstream")
self.merger_src_root = os.path.join(self.test_root, "merger-git")
self.launcher_src_root = os.path.join(self.test_root, "launcher-git")
self.executor_src_root = os.path.join(self.test_root, "executor-git")
self.state_root = os.path.join(self.test_root, "lib")
if os.path.exists(self.test_root):
@ -1276,7 +1276,7 @@ class ZuulTestCase(BaseTestCase):
os.path.join(FIXTURE_DIR,
self.config.get('zuul', 'tenant_config')))
self.config.set('merger', 'git_dir', self.merger_src_root)
self.config.set('launcher', 'git_dir', self.launcher_src_root)
self.config.set('executor', 'git_dir', self.executor_src_root)
self.config.set('zuul', 'state_dir', self.state_root)
# For each project in config:
@ -1337,17 +1337,17 @@ class ZuulTestCase(BaseTestCase):
self._startMerger()
self.launch_server = RecordingLaunchServer(
self.executor_server = RecordingExecutorServer(
self.config, self.connections,
jobdir_root=self.test_root,
_run_ansible=self.run_ansible,
_test_root=self.test_root,
keep_jobdir=KEEP_TEMPDIRS)
self.launch_server.start()
self.history = self.launch_server.build_history
self.builds = self.launch_server.running_builds
self.executor_server.start()
self.history = self.executor_server.build_history
self.builds = self.executor_server.running_builds
self.launch_client = zuul.launcher.client.LaunchClient(
self.executor_client = zuul.executor.client.ExecutorClient(
self.config, self.sched)
self.merge_client = zuul.merger.client.MergeClient(
self.config, self.sched)
@ -1360,7 +1360,7 @@ class ZuulTestCase(BaseTestCase):
self.zk_chroot_fixture.zookeeper_port,
self.zk_chroot_fixture.zookeeper_chroot)
self.sched.setLauncher(self.launch_client)
self.sched.setExecutor(self.executor_client)
self.sched.setMerger(self.merge_client)
self.sched.setNodepool(self.nodepool)
self.sched.setZooKeeper(self.zk)
@ -1372,7 +1372,7 @@ class ZuulTestCase(BaseTestCase):
self.sched.start()
self.webapp.start()
self.rpc.start()
self.launch_client.gearman.waitForServer()
self.executor_client.gearman.waitForServer()
self.addCleanup(self.shutdown)
self.sched.reconfigure(self.config)
@ -1488,11 +1488,11 @@ class ZuulTestCase(BaseTestCase):
def shutdown(self):
self.log.debug("Shutting down after tests")
self.launch_client.stop()
self.executor_client.stop()
self.merge_server.stop()
self.merge_server.join()
self.merge_client.stop()
self.launch_server.stop()
self.executor_server.stop()
self.sched.stop()
self.sched.join()
self.statsd.stop()
@ -1579,29 +1579,29 @@ class ZuulTestCase(BaseTestCase):
def haveAllBuildsReported(self):
# See if Zuul is waiting on a meta job to complete
if self.launch_client.meta_jobs:
if self.executor_client.meta_jobs:
return False
# Find out if every build that the worker has completed has been
# reported back to Zuul. If it hasn't then that means a Gearman
# event is still in transit and the system is not stable.
for build in self.history:
zbuild = self.launch_client.builds.get(build.uuid)
zbuild = self.executor_client.builds.get(build.uuid)
if not zbuild:
# It has already been reported
continue
# It hasn't been reported yet.
return False
# Make sure that none of the worker connections are in GRAB_WAIT
for connection in self.launch_server.worker.active_connections:
for connection in self.executor_server.worker.active_connections:
if connection.state == 'GRAB_WAIT':
return False
return True
def areAllBuildsWaiting(self):
builds = self.launch_client.builds.values()
builds = self.executor_client.builds.values()
for build in builds:
client_job = None
for conn in self.launch_client.gearman.active_connections:
for conn in self.executor_client.gearman.active_connections:
for j in conn.related_jobs.values():
if j.unique == build.uuid:
client_job = j
@ -1626,7 +1626,8 @@ class ZuulTestCase(BaseTestCase):
if build.url is None:
self.log.debug("%s has not reported start" % build)
return False
worker_build = self.launch_server.job_builds.get(server_job.unique)
worker_build = self.executor_server.job_builds.get(
server_job.unique)
if worker_build:
if worker_build.isWaiting():
continue
@ -1673,7 +1674,7 @@ class ZuulTestCase(BaseTestCase):
raise Exception("Timeout waiting for Zuul to settle")
# Make sure no new events show up while we're checking
self.launch_server.lock.acquire()
self.executor_server.lock.acquire()
# have all build states propogated to zuul?
if self.haveAllBuildsReported():
# Join ensures that the queue is empty _and_ events have been
@ -1691,11 +1692,11 @@ class ZuulTestCase(BaseTestCase):
# components were stable, we don't erroneously
# report that we are settled.
self.sched.run_handler_lock.release()
self.launch_server.lock.release()
self.executor_server.lock.release()
self.log.debug("...settled.")
return
self.sched.run_handler_lock.release()
self.launch_server.lock.release()
self.executor_server.lock.release()
self.sched.wake_event.wait(0.1)
def countJobResults(self, jobs, result):
@ -1912,7 +1913,7 @@ class ZuulTestCase(BaseTestCase):
class AnsibleZuulTestCase(ZuulTestCase):
"""ZuulTestCase but with an actual ansible launcher running"""
"""ZuulTestCase but with an actual ansible executor running"""
run_ansible = True

View File

@ -12,8 +12,8 @@ git_user_email=zuul@example.com
git_user_name=zuul
zuul_url=http://zuul.example.com/p
[launcher]
git_dir=/tmp/zuul-test/launcher-git
[executor]
git_dir=/tmp/zuul-test/executor-git
[connection review_gerrit]
driver=gerrit

View File

@ -12,8 +12,8 @@ git_user_email=zuul@example.com
git_user_name=zuul
zuul_url=http://zuul.example.com/p
[launcher]
git_dir=/tmp/zuul-test/launcher-git
[executor]
git_dir=/tmp/zuul-test/executor-git
[connection review_gerrit]
driver=gerrit

View File

@ -12,8 +12,8 @@ git_user_email=zuul@example.com
git_user_name=zuul
zuul_url=http://zuul.example.com/p
[launcher]
git_dir=/tmp/zuul-test/launcher-git
[executor]
git_dir=/tmp/zuul-test/executor-git
[swift]
authurl=https://identity.api.example.org/v2.0/

View File

@ -12,8 +12,8 @@ git_user_email=zuul@example.com
git_user_name=zuul
zuul_url=http://zuul.example.com/p
[launcher]
git_dir=/tmp/zuul-test/launcher-git
[executor]
git_dir=/tmp/zuul-test/executor-git
[swift]
authurl=https://identity.api.example.org/v2.0/

View File

@ -610,7 +610,7 @@ class TestCloner(ZuulTestCase):
# Start a periodic job
self.worker.hold_jobs_in_build = True
self.launcher.negative_function_cache_ttl = 0
self.executor.negative_function_cache_ttl = 0
self.config.set('zuul', 'layout_config',
'tests/fixtures/layout-timer.yaml')
self.sched.reconfigure(self.config)

View File

@ -46,7 +46,7 @@ class TestConnections(ZuulTestCase):
'jenkins')
B = self.fake_review_gerrit.addFakeChange('org/project', 'master', 'B')
self.launch_server.failJob('project-test2', B)
self.executor_server.failJob('project-test2', B)
self.addEvent('review_gerrit', B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
@ -239,7 +239,7 @@ class TestMultipleGerrits(ZuulTestCase):
tenant_config_file = 'config/zuul-connections-multiple-gerrits/main.yaml'
def test_multiple_project_separate_gerrits(self):
self.launch_server.hold_jobs_in_build = True
self.executor_server.hold_jobs_in_build = True
A = self.fake_another_gerrit.addFakeChange(
'org/project1', 'master', 'A')
@ -276,6 +276,6 @@ class TestMultipleGerrits(ZuulTestCase):
pipeline='review_check'),
])
self.launch_server.hold_jobs_in_build = False
self.launch_server.release()
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()

View File

@ -58,7 +58,7 @@ class TestOpenStack(AnsibleZuulTestCase):
'ubuntu-trusty')
def test_dsvm_keystone_repo(self):
self.launch_server.keep_jobdir = True
self.executor_server.keep_jobdir = True
A = self.fake_gerrit.addFakeChange('openstack/nova', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
@ -68,9 +68,9 @@ class TestOpenStack(AnsibleZuulTestCase):
build = self.getJobFromHistory('dsvm')
# Check that a change to nova triggered a keystone clone
launcher_git_dir = os.path.join(self.launcher_src_root,
executor_git_dir = os.path.join(self.executor_src_root,
'openstack', 'keystone', '.git')
self.assertTrue(os.path.exists(launcher_git_dir),
self.assertTrue(os.path.exists(executor_git_dir),
msg='openstack/keystone should be cloned.')
jobdir_git_dir = os.path.join(build.jobdir.src_root,
@ -79,7 +79,7 @@ class TestOpenStack(AnsibleZuulTestCase):
msg='openstack/keystone should be cloned.')
def test_dsvm_nova_repo(self):
self.launch_server.keep_jobdir = True
self.executor_server.keep_jobdir = True
A = self.fake_gerrit.addFakeChange('openstack/keystone', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
@ -89,9 +89,9 @@ class TestOpenStack(AnsibleZuulTestCase):
build = self.getJobFromHistory('dsvm')
# Check that a change to keystone triggered a nova clone
launcher_git_dir = os.path.join(self.launcher_src_root,
executor_git_dir = os.path.join(self.executor_src_root,
'openstack', 'nova', '.git')
self.assertTrue(os.path.exists(launcher_git_dir),
self.assertTrue(os.path.exists(executor_git_dir),
msg='openstack/nova should be cloned.')
jobdir_git_dir = os.path.join(build.jobdir.src_root,

File diff suppressed because it is too large Load Diff

View File

@ -27,7 +27,7 @@ class TestWebapp(ZuulTestCase):
def setUp(self):
super(TestWebapp, self).setUp()
self.launch_server.hold_jobs_in_build = True
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('code-review', 2)
self.fake_gerrit.addEvent(A.addApproval('approved', 1))
@ -38,8 +38,8 @@ class TestWebapp(ZuulTestCase):
self.port = self.webapp.server.socket.getsockname()[1]
def tearDown(self):
self.launch_server.hold_jobs_in_build = False
self.launch_server.release()
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
super(TestWebapp, self).tearDown()

View File

@ -27,7 +27,7 @@ class TestZuulTriggerParentChangeEnqueued(ZuulTestCase):
# When A is enqueued in the gate, B1 and B2 should both attempt
# to be enqueued in both pipelines. B1 should end up in check
# and B2 in gate because of differing pipeline requirements.
self.launch_server.hold_jobs_in_build = True
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B1 = self.fake_gerrit.addFakeChange('org/project', 'master', 'B1')
B2 = self.fake_gerrit.addFakeChange('org/project', 'master', 'B2')
@ -46,8 +46,8 @@ class TestZuulTriggerParentChangeEnqueued(ZuulTestCase):
# to enqueue behind 1,1 so that the test is more
# deterministic.
self.waitUntilSettled()
self.launch_server.hold_jobs_in_build = False
self.launch_server.release()
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertEqual(len(self.history), 3)

View File

@ -239,8 +239,8 @@ class Client(zuul.cmd.ZuulApp):
'uuid': {
'title': 'UUID'
},
'launch_time': {
'title': 'Launch Time',
'execute_time': {
'title': 'Execute Time',
'transform': self._epoch_to_relative_time,
'append': ' ago'
},

View File

@ -29,7 +29,7 @@ import sys
import signal
import zuul.cmd
import zuul.launcher.server
import zuul.executor.server
# No zuul imports that pull in paramiko here; it must not be
# imported until after the daemonization.
@ -37,10 +37,10 @@ import zuul.launcher.server
# Similar situation with gear and statsd.
class Launcher(zuul.cmd.ZuulApp):
class Executor(zuul.cmd.ZuulApp):
def parse_arguments(self):
parser = argparse.ArgumentParser(description='Zuul launch worker.')
parser = argparse.ArgumentParser(description='Zuul executor.')
parser.add_argument('-c', dest='config',
help='specify the config file')
parser.add_argument('-d', dest='nodaemon', action='store_true',
@ -52,7 +52,7 @@ class Launcher(zuul.cmd.ZuulApp):
action='store_true',
help='keep local jobdirs after run completes')
parser.add_argument('command',
choices=zuul.launcher.server.COMMANDS,
choices=zuul.executor.server.COMMANDS,
nargs='?')
self.args = parser.parse_args()
@ -63,55 +63,55 @@ class Launcher(zuul.cmd.ZuulApp):
self.config.get('zuul', 'state_dir'))
else:
state_dir = '/var/lib/zuul'
path = os.path.join(state_dir, 'launcher.socket')
path = os.path.join(state_dir, 'executor.socket')
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect(path)
s.sendall('%s\n' % cmd)
def exit_handler(self):
self.launcher.stop()
self.launcher.join()
self.executor.stop()
self.executor.join()
def main(self, daemon=True):
# See comment at top of file about zuul imports
self.setup_logging('launcher', 'log_config')
self.setup_logging('executor', 'log_config')
self.log = logging.getLogger("zuul.Launcher")
self.log = logging.getLogger("zuul.Executor")
LaunchServer = zuul.launcher.server.LaunchServer
self.launcher = LaunchServer(self.config, self.connections,
LaunchServer = zuul.executor.server.LaunchServer
self.executor = LaunchServer(self.config, self.connections,
keep_jobdir=self.args.keep_jobdir)
self.launcher.start()
self.executor.start()
signal.signal(signal.SIGUSR2, zuul.cmd.stack_dump_handler)
if daemon:
self.launcher.join()
self.executor.join()
else:
while True:
try:
signal.pause()
except KeyboardInterrupt:
print("Ctrl + C: asking launcher to exit nicely...\n")
print("Ctrl + C: asking executor to exit nicely...\n")
self.exit_handler()
sys.exit(0)
def main():
server = Launcher()
server = Executor()
server.parse_arguments()
server.read_config()
if server.args.command in zuul.launcher.server.COMMANDS:
if server.args.command in zuul.executor.server.COMMANDS:
server.send_command(server.args.command)
sys.exit(0)
server.configure_connections()
if server.config.has_option('launcher', 'pidfile'):
pid_fn = os.path.expanduser(server.config.get('launcher', 'pidfile'))
if server.config.has_option('executor', 'pidfile'):
pid_fn = os.path.expanduser(server.config.get('executor', 'pidfile'))
else:
pid_fn = '/var/run/zuul-launcher/zuul-launcher.pid'
pid_fn = '/var/run/zuul-executor/zuul-executor.pid'
pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10)
if server.args.nodaemon:

View File

@ -78,7 +78,7 @@ class Scheduler(zuul.cmd.ZuulApp):
def test_config(self):
# See comment at top of file about zuul imports
import zuul.scheduler
import zuul.launcher.client
import zuul.executor.client
logging.basicConfig(level=logging.DEBUG)
try:
@ -124,7 +124,7 @@ class Scheduler(zuul.cmd.ZuulApp):
def main(self):
# See comment at top of file about zuul imports
import zuul.scheduler
import zuul.launcher.client
import zuul.executor.client
import zuul.merger.client
import zuul.nodepool
import zuul.webapp
@ -141,7 +141,7 @@ class Scheduler(zuul.cmd.ZuulApp):
self.sched = zuul.scheduler.Scheduler(self.config)
gearman = zuul.launcher.client.LaunchClient(self.config, self.sched)
gearman = zuul.executor.client.LaunchClient(self.config, self.sched)
merger = zuul.merger.client.MergeClient(self.config, self.sched)
nodepool = zuul.nodepool.Nodepool(self.sched)
@ -174,7 +174,7 @@ class Scheduler(zuul.cmd.ZuulApp):
rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
self.configure_connections()
self.sched.setLauncher(gearman)
self.sched.setExecutor(gearman)
self.sched.setMerger(merger)
self.sched.setNodepool(nodepool)
self.sched.setZooKeeper(zookeeper)

View File

@ -782,7 +782,7 @@ class TenantParser(object):
for job in jobs:
# Note: this is an ordered list -- we wait for cat jobs to
# complete in the order they were launched which is the
# complete in the order they were executed which is the
# same order they were defined in the main config file.
# This is important for correct inheritance.
TenantParser.log.debug("Waiting for cat job %s" % (job,))

View File

@ -102,7 +102,7 @@ def getJobData(job):
class ZuulGearmanClient(gear.Client):
def __init__(self, zuul_gearman):
super(ZuulGearmanClient, self).__init__('Zuul Launch Client')
super(ZuulGearmanClient, self).__init__('Zuul Executor Client')
self.__zuul_gearman = zuul_gearman
def handleWorkComplete(self, packet):
@ -144,8 +144,8 @@ class ZuulGearmanClient(gear.Client):
self.__zuul_gearman.onUnknownJob(job)
class LaunchClient(object):
log = logging.getLogger("zuul.LaunchClient")
class ExecutorClient(object):
log = logging.getLogger("zuul.ExecutorClient")
negative_function_cache_ttl = 5
def __init__(self, config, sched):
@ -209,10 +209,10 @@ class LaunchClient(object):
self.log.debug("Function %s is not registered" % name)
return False
def launch(self, job, item, pipeline, dependent_items=[]):
def execute(self, job, item, pipeline, dependent_items=[]):
uuid = str(uuid4().hex)
self.log.info(
"Launch job %s (uuid: %s) on nodes %s for change %s "
"Execute job %s (uuid: %s) on nodes %s for change %s "
"with dependent changes %s" % (
job, uuid,
item.current_build_set.getJobNodeSet(job.name),
@ -339,7 +339,7 @@ class LaunchClient(object):
self.sched.onBuildCompleted(build, 'SUCCESS')
return build
gearman_job = gear.Job('launcher:launch', json.dumps(params),
gearman_job = gear.Job('executor:execute', json.dumps(params),
unique=uuid)
build.__gearman_job = gearman_job
build.__gearman_manager = None
@ -433,7 +433,7 @@ class LaunchClient(object):
# internal dict after it's added to the report queue.
del self.builds[job.unique]
else:
if not job.name.startswith("launcher:stop:"):
if not job.name.startswith("executor:stop:"):
self.log.error("Unable to find build %s" % job.unique)
def onWorkStatus(self, job):
@ -483,7 +483,7 @@ class LaunchClient(object):
(build,))
stop_uuid = str(uuid4().hex)
data = dict(uuid=build.__gearman_job.unique)
stop_job = gear.Job("launcher:stop:%s" % build.__gearman_manager,
stop_job = gear.Job("executor:stop:%s" % build.__gearman_manager,
json.dumps(data), unique=stop_uuid)
self.meta_jobs[stop_uuid] = stop_job
self.log.debug("Submitting stop job: %s", stop_job)

View File

@ -211,15 +211,15 @@ class DeduplicateQueue(object):
self.condition.release()
class LaunchServer(object):
log = logging.getLogger("zuul.LaunchServer")
class ExecutorServer(object):
log = logging.getLogger("zuul.ExecutorServer")
def __init__(self, config, connections={}, jobdir_root=None,
keep_jobdir=False):
self.config = config
self.keep_jobdir = keep_jobdir
self.jobdir_root = jobdir_root
# TODOv3(mordred): make the launcher name more unique --
# TODOv3(mordred): make the executor name more unique --
# perhaps hostname+pid.
self.hostname = socket.gethostname()
self.zuul_url = config.get('merger', 'zuul_url')
@ -232,10 +232,10 @@ class LaunchServer(object):
unverbose=self.verboseOff,
)
if self.config.has_option('launcher', 'git_dir'):
self.merge_root = self.config.get('launcher', 'git_dir')
if self.config.has_option('executor', 'git_dir'):
self.merge_root = self.config.get('executor', 'git_dir')
else:
self.merge_root = '/var/lib/zuul/launcher-git'
self.merge_root = '/var/lib/zuul/executor-git'
if self.config.has_option('merger', 'git_user_email'):
self.merge_email = self.config.get('merger', 'git_user_email')
@ -260,7 +260,7 @@ class LaunchServer(object):
self.config.get('zuul', 'state_dir'))
else:
state_dir = '/var/lib/zuul'
path = os.path.join(state_dir, 'launcher.socket')
path = os.path.join(state_dir, 'executor.socket')
self.command_socket = commandsocket.CommandSocket(path)
ansible_dir = os.path.join(state_dir, 'ansible')
self.library_dir = os.path.join(ansible_dir, 'library')
@ -303,7 +303,7 @@ class LaunchServer(object):
port = self.config.get('gearman', 'port')
else:
port = 4730
self.worker = gear.Worker('Zuul Launch Server')
self.worker = gear.Worker('Zuul Executor Server')
self.worker.addServer(server, port)
self.log.debug("Waiting for server")
self.worker.waitForServer()
@ -325,8 +325,8 @@ class LaunchServer(object):
self.thread.start()
def register(self):
self.worker.registerFunction("launcher:launch")
self.worker.registerFunction("launcher:stop:%s" % self.hostname)
self.worker.registerFunction("executor:execute")
self.worker.registerFunction("executor:stop:%s" % self.hostname)
self.worker.registerFunction("merger:merge")
self.worker.registerFunction("merger:cat")
@ -398,15 +398,15 @@ class LaunchServer(object):
return task
def run(self):
self.log.debug("Starting launch listener")
self.log.debug("Starting executor listener")
while self._running:
try:
job = self.worker.getJob()
try:
if job.name == 'launcher:launch':
self.log.debug("Got launch job: %s" % job.unique)
self.launchJob(job)
elif job.name.startswith('launcher:stop'):
if job.name == 'executor:execute':
self.log.debug("Got execute job: %s" % job.unique)
self.executeJob(job)
elif job.name.startswith('executor:stop'):
self.log.debug("Got stop job: %s" % job.unique)
self.stopJob(job)
elif job.name == 'merger:cat':
@ -426,7 +426,7 @@ class LaunchServer(object):
except Exception:
self.log.exception("Exception while getting job")
def launchJob(self, job):
def executeJob(self, job):
self.job_workers[job.unique] = AnsibleJob(self, job)
self.job_workers[job.unique].run()
@ -481,8 +481,8 @@ class AnsibleJob(object):
RESULT_UNREACHABLE = 3
RESULT_ABORTED = 4
def __init__(self, launcher_server, job):
self.launcher_server = launcher_server
def __init__(self, executor_server, job):
self.executor_server = executor_server
self.job = job
self.jobdir = None
self.proc = None
@ -490,16 +490,16 @@ class AnsibleJob(object):
self.running = False
self.aborted = False
if self.launcher_server.config.has_option(
'launcher', 'private_key_file'):
self.private_key_file = self.launcher_server.config.get(
'launcher', 'private_key_file')
if self.executor_server.config.has_option(
'executor', 'private_key_file'):
self.private_key_file = self.executor_server.config.get(
'executor', 'private_key_file')
else:
self.private_key_file = '~/.ssh/id_rsa'
def run(self):
self.running = True
self.thread = threading.Thread(target=self.launch)
self.thread = threading.Thread(target=self.execute)
self.thread.start()
def stop(self):
@ -507,13 +507,13 @@ class AnsibleJob(object):
self.abortRunningProc()
self.thread.join()
def launch(self):
def execute(self):
try:
self.jobdir = JobDir(root=self.launcher_server.jobdir_root,
keep=self.launcher_server.keep_jobdir)
self._launch()
self.jobdir = JobDir(root=self.executor_server.jobdir_root,
keep=self.executor_server.keep_jobdir)
self._execute()
except Exception:
self.log.exception("Exception while launching job")
self.log.exception("Exception while executing job")
self.job.sendWorkException(traceback.format_exc())
finally:
self.running = False
@ -522,11 +522,11 @@ class AnsibleJob(object):
except Exception:
self.log.exception("Error cleaning up jobdir:")
try:
self.launcher_server.finishJob(self.job.unique)
self.executor_server.finishJob(self.job.unique)
except Exception:
self.log.exception("Error finalizing job thread:")
def _launch(self):
def _execute(self):
self.log.debug("Job %s: beginning" % (self.job.unique,))
self.log.debug("Job %s: args: %s" % (self.job.unique,
self.job.arguments,))
@ -537,7 +537,7 @@ class AnsibleJob(object):
for project in args['projects']:
self.log.debug("Job %s: updating project %s" %
(self.job.unique, project['name']))
tasks.append(self.launcher_server.update(
tasks.append(self.executor_server.update(
project['name'], project['url']))
for task in tasks:
task.wait()
@ -546,14 +546,14 @@ class AnsibleJob(object):
for project in args['projects']:
self.log.debug("Cloning %s" % (project['name'],))
repo = git.Repo.clone_from(
os.path.join(self.launcher_server.merge_root,
os.path.join(self.executor_server.merge_root,
project['name']),
os.path.join(self.jobdir.src_root,
project['name']))
repo.remotes.origin.config_writer.set('url', project['url'])
# Get a merger in order to update the repos involved in this job.
merger = self.launcher_server._getMerger(self.jobdir.src_root)
merger = self.executor_server._getMerger(self.jobdir.src_root)
merge_items = [i for i in args['items'] if i.get('refspec')]
if merge_items:
commit = merger.mergeChanges(merge_items) # noqa
@ -569,14 +569,14 @@ class AnsibleJob(object):
self.prepareAnsibleFiles(args)
data = {
'manager': self.launcher_server.hostname,
'manager': self.executor_server.hostname,
'url': 'https://server/job/{}/0/'.format(args['job']),
'worker_name': 'My Worker',
}
# TODOv3:
# 'name': self.name,
# 'manager': self.launch_server.hostname,
# 'manager': self.executor_server.hostname,
# 'worker_name': 'My Worker',
# 'worker_hostname': 'localhost',
# 'worker_ips': ['127.0.0.1', '192.168.1.1'],
@ -696,7 +696,7 @@ class AnsibleJob(object):
# Check out the playbook repo if needed and set the path to
# the playbook that should be run.
jobdir_playbook.trusted = playbook['trusted']
source = self.launcher_server.connections.getSource(
source = self.executor_server.connections.getSource(
playbook['connection'])
project = source.getProject(playbook['project'])
# TODO(jeblair): construct the url in the merger itself
@ -721,7 +721,7 @@ class AnsibleJob(object):
# the stack of changes we are testing, so check out the branch
# tip into a dedicated space.
merger = self.launcher_server._getMerger(jobdir_playbook.root)
merger = self.executor_server._getMerger(jobdir_playbook.root)
merger.checkoutBranch(project.name, url, playbook['branch'])
path = os.path.join(jobdir_playbook.root,
@ -762,7 +762,7 @@ class AnsibleJob(object):
def prepareZuulRole(self, args, role, root):
self.log.debug("Prepare zuul role for %s" % (role,))
# Check out the role repo if needed
source = self.launcher_server.connections.getSource(
source = self.executor_server.connections.getSource(
role['connection'])
project = source.getProject(role['project'])
# TODO(jeblair): construct the url in the merger itself
@ -791,7 +791,7 @@ class AnsibleJob(object):
# tip into a dedicated space.
if not role_repo:
merger = self.launcher_server._getMerger(root)
merger = self.executor_server._getMerger(root)
merger.checkoutBranch(project.name, url, 'master')
role_repo = os.path.join(root, project.name)
@ -816,7 +816,7 @@ class AnsibleJob(object):
with open(self.jobdir.vars, 'w') as vars_yaml:
zuul_vars = dict(args['vars'])
zuul_vars['zuul']['launcher'] = dict(src_root=self.jobdir.src_root,
zuul_vars['zuul']['executor'] = dict(src_root=self.jobdir.src_root,
log_root=self.jobdir.log_root)
vars_yaml.write(
yaml.safe_dump(zuul_vars, default_flow_style=False))
@ -836,19 +836,19 @@ class AnsibleJob(object):
config.write('log_path = %s\n' % self.jobdir.ansible_log)
config.write('gathering = explicit\n')
config.write('library = %s\n'
% self.launcher_server.library_dir)
% self.executor_server.library_dir)
if self.jobdir.roles_path:
config.write('roles_path = %s\n' %
':'.join(self.jobdir.roles_path))
config.write('callback_plugins = %s\n'
% self.launcher_server.callback_dir)
% self.executor_server.callback_dir)
config.write('stdout_callback = zuul_stream\n')
# bump the timeout because busy nodes may take more than
# 10s to respond
config.write('timeout = 30\n')
if not trusted:
config.write('action_plugins = %s\n'
% self.launcher_server.action_dir)
% self.executor_server.action_dir)
# On trusted jobs, we want to prevent the printing of args,
# since trusted jobs might have access to secrets that they may

View File

@ -365,33 +365,33 @@ class PipelineManager(object):
build_set.setJobNodeRequest(job.name, req)
return True
def _launchJobs(self, item, jobs):
self.log.debug("Launching jobs for change %s" % item.change)
def _executeJobs(self, item, jobs):
self.log.debug("Executing jobs for change %s" % item.change)
dependent_items = self.getDependentItems(item)
for job in jobs:
self.log.debug("Found job %s for change %s" % (job, item.change))
try:
nodeset = item.current_build_set.getJobNodeSet(job.name)
self.sched.nodepool.useNodeSet(nodeset)
build = self.sched.launcher.launch(job, item,
self.pipeline,
dependent_items)
build = self.sched.executor.execute(job, item,
self.pipeline,
dependent_items)
self.log.debug("Adding build %s of job %s to item %s" %
(build, job, item))
item.addBuild(build)
except:
self.log.exception("Exception while launching job %s "
self.log.exception("Exception while executing job %s "
"for change %s:" % (job, item.change))
def launchJobs(self, item):
def executeJobs(self, item):
# TODO(jeblair): This should return a value indicating a job
# was launched. Appears to be a longstanding bug.
# was executed. Appears to be a longstanding bug.
if not item.current_build_set.layout:
return False
jobs = item.findJobsToRun(self.sched.mutex)
if jobs:
self._launchJobs(item, jobs)
self._executeJobs(item, jobs)
def cancelJobs(self, item, prime=True):
self.log.debug("Cancel jobs for change %s" % item.change)
@ -409,7 +409,7 @@ class PipelineManager(object):
continue
was_running = False
try:
was_running = self.sched.launcher.cancel(build)
was_running = self.sched.executor.cancel(build)
except:
self.log.exception("Exception while canceling build %s "
"for change %s" % (build, item.change))
@ -594,7 +594,7 @@ class PipelineManager(object):
failing_reasons.append("it has an invalid configuration")
if ready and self.provisionNodes(item):
changed = True
if actionable and ready and self.launchJobs(item):
if actionable and ready and self.executeJobs(item):
changed = True
if item.didAnyJobFail():
failing_reasons.append("at least one job failed")

View File

@ -576,7 +576,7 @@ class PlaybookContext(object):
self.path == other.path)
def toDict(self):
# Render to a dict to use in passing json to the launcher
# Render to a dict to use in passing json to the executor
return dict(
connection=self.source_context.project.connection_name,
project=self.source_context.project.name,
@ -607,7 +607,7 @@ class Role(object):
@abc.abstractmethod
def toDict(self):
# Render to a dict to use in passing json to the launcher
# Render to a dict to use in passing json to the executor
return dict(target_name=self.target_name)
@ -632,7 +632,7 @@ class ZuulRole(Role):
self.trusted == other.trusted)
def toDict(self):
# Render to a dict to use in passing json to the launcher
# Render to a dict to use in passing json to the executor
d = super(ZuulRole, self).toDict()
d['type'] = 'zuul'
d['connection'] = self.connection_name
@ -918,7 +918,7 @@ class Build(object):
self.url = None
self.result = None
self.build_set = None
self.launch_time = time.time()
self.execute_time = time.time()
self.start_time = None
self.end_time = None
self.estimated_time = None
@ -994,9 +994,9 @@ class BuildSet(object):
"""A collection of Builds for one specific potential future repository
state.
When Zuul launches Builds for a change, it creates a Build to
When Zuul executes Builds for a change, it creates a Build to
represent each execution of each job and a BuildSet to keep track
of all the Builds running for that Change. When Zuul re-launches
of all the Builds running for that Change. When Zuul re-executes
Builds for a Change with a different configuration, all of the
running Builds in the BuildSet for that change are aborted, and a
new BuildSet is created to hold the Builds for the Jobs being
@ -1459,7 +1459,7 @@ class QueueItem(object):
'result': result,
'voting': job.voting,
'uuid': build.uuid if build else None,
'launch_time': build.launch_time if build else None,
'execute_time': build.execute_time if build else None,
'start_time': build.start_time if build else None,
'end_time': build.end_time if build else None,
'estimated_time': build.estimated_time if build else None,

View File

@ -236,7 +236,7 @@ class Scheduler(threading.Thread):
The Scheduler is reponsible for recieving events and dispatching
them to appropriate components (including pipeline managers,
mergers and launchers).
mergers and executors).
It runs a single threaded main loop which processes events
received one at a time and takes action as appropriate. Other
@ -264,7 +264,7 @@ class Scheduler(threading.Thread):
self._pause = False
self._exit = False
self._stopped = False
self.launcher = None
self.executor = None
self.merger = None
self.connections = None
self.statsd = extras.try_import('statsd.statsd')
@ -304,8 +304,8 @@ class Scheduler(threading.Thread):
def stopConnections(self):
self.connections.stop()
def setLauncher(self, launcher):
self.launcher = launcher
def setExecutor(self, executor):
self.executor = executor
def setMerger(self, merger):
self.merger = merger
@ -355,7 +355,7 @@ class Scheduler(threading.Thread):
# interesting.
if label == build.node_name:
continue
dt = int((build.start_time - build.launch_time) * 1000)
dt = int((build.start_time - build.execute_time) * 1000)
key = 'zuul.pipeline.%s.label.%s.wait_time' % (
build.pipeline.name, label)
self.statsd.timing(key, dt)
@ -368,7 +368,7 @@ class Scheduler(threading.Thread):
key = 'zuul.pipeline.%s.job.%s.wait_time' % (
build.pipeline.name, jobname)
dt = int((build.start_time - build.launch_time) * 1000)
dt = int((build.start_time - build.execute_time) * 1000)
self.statsd.timing(key, dt)
except:
self.log.exception("Exception reporting runtime stats")
@ -572,7 +572,7 @@ class Scheduler(threading.Thread):
self.log.warning(
"Canceling build %s during reconfiguration" % (build,))
try:
self.launcher.cancel(build)
self.executor.cancel(build)
except Exception:
self.log.exception(
"Exception while canceling build %s "
@ -695,7 +695,7 @@ class Scheduler(threading.Thread):
self.process_management_queue()
# Give result events priority -- they let us stop builds,
# whereas trigger events cause us to launch builds.
# whereas trigger events cause us to execute builds.
while not self.result_event_queue.empty():
self.process_result_queue()