Improve resource usage with semaphores

Currently when jobs use semaphores they first get and lock the build
nodes and then acquire the semaphore. If there are many jobs waiting
for the semaphore this can block a substantial part of the available
resources. To make this safe, the default is now to acquire the
semaphore before requesting the nodes.

However, in some cases where jobs with a semaphore shall run as fast as
possible in a consecutive manner, it might be preferable to
accept some waste of resources. To support this use case, the
job using a semaphore can override this behavior and still acquire the
semaphore after getting the nodes.

Change-Id: Id6f582ec29219d280d05319d1b822c7934437b7a
This commit is contained in:
Tobias Henkel 2018-11-20 15:14:16 +01:00
parent 33699fa316
commit ae887dab58
No known key found for this signature in database
GPG Key ID: 03750DEC158E5FA2
9 changed files with 187 additions and 12 deletions

View File

@ -629,7 +629,25 @@ Here is an example of two job definitions:
The name of a :ref:`semaphore` which should be acquired and
released when the job begins and ends. If the semaphore is at
maximum capacity, then Zuul will wait until it can be acquired
before starting the job.
before starting the job. The format is either a string or a
dictionary. If it's a string it references a semaphore using the
default value for :attr:`job.semaphore.resources-first`.
.. attr:: name
:required:
The name of the referenced semaphore
.. attr:: resources-first
:default: False
By default a semaphore is acquired before the resources are
requested. However in some cases the user wants to run cheap
jobs as quickly as possible in a consecutive manner. In this
case :attr:`job.semaphore.resources-first` can be enabled to
request the resources before locking the semaphore. This can
lead to some amount of blocked resources while waiting for the
semaphore so this should be used with caution.
.. attr:: tags

View File

@ -0,0 +1,14 @@
---
features:
- |
A job using a semaphore can now configure whether it should acquire it
before requesting resources or just before running.
upgrade:
- |
The acquiring behavior of jobs with semaphores has been changed. Up to now
a job requested resources and acquired the semaphore just before it started
to run. However this could lead to a high amount of resource waste. Instead
jobs now acquire the semaphore before requesting the resources by default.
This behavior can be overridden by jobs using
:attr:`job.semaphore.resources-first` if some waste of resources is
acceptable.

View File

@ -22,6 +22,10 @@
- job:
name: base
parent: null
nodeset:
nodes:
- name: controller
label: label1
- job:
name: project-test1
@ -56,6 +60,20 @@
- name: controller
label: label1
- job:
name: semaphore-one-test1-resources-first
semaphore:
name: test-semaphore
resources-first: True
run: playbooks/semaphore-one-test1.yaml
- job:
name: semaphore-one-test2-resources-first
semaphore:
name: test-semaphore
resources-first: True
run: playbooks/semaphore-one-test1.yaml
- project:
name: org/project
check:
@ -77,3 +95,11 @@
check:
jobs:
- semaphore-one-test3
- project:
name: org/project3
check:
jobs:
- project-test1
- semaphore-one-test1-resources-first
- semaphore-one-test2-resources-first

View File

@ -0,0 +1 @@
test

View File

@ -8,3 +8,4 @@
- org/project
- org/project1
- org/project2
- org/project3

View File

@ -5849,6 +5849,10 @@ class TestSemaphore(ZuulTestCase):
self.executor_server.hold_jobs_in_build = True
# Pause nodepool so we can check the ordering of getting the nodes
# and acquiring the semaphore.
self.fake_nodepool.paused = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
self.assertFalse('test-semaphore' in
@ -5858,6 +5862,13 @@ class TestSemaphore(ZuulTestCase):
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
# By default we first lock the semaphore and then get the nodes
# so at this point the semaphore needs to be acquired.
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.fake_nodepool.paused = False
self.waitUntilSettled()
self.assertEqual(len(self.builds), 3)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'semaphore-one-test1')
@ -5993,6 +6004,53 @@ class TestSemaphore(ZuulTestCase):
self.assertEqual(A.reported, 1)
self.assertEqual(B.reported, 1)
def test_semaphore_resources_first(self):
"Test semaphores with max=1 (mutex) and get resources first"
tenant = self.sched.abide.tenants.get('tenant-one')
self.executor_server.hold_jobs_in_build = True
# Pause nodepool so we can check the ordering of getting the nodes
# and acquiring the semaphore.
self.fake_nodepool.paused = True
A = self.fake_gerrit.addFakeChange('org/project3', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project3', 'master', 'B')
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
# Here we first get the resources and then lock the semaphore
# so at this point the semaphore should not be acquired.
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.fake_nodepool.paused = False
self.waitUntilSettled()
self.assertEqual(len(self.builds), 3)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name,
'semaphore-one-test1-resources-first')
self.assertEqual(self.builds[2].name, 'project-test1')
self.executor_server.release('semaphore-one-test1')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 3)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test1')
self.assertEqual(self.builds[2].name,
'semaphore-one-test2-resources-first')
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
def test_semaphore_zk_error(self):
"Test semaphore release with zk error"
tenant = self.sched.abide.tenants.get('tenant-one')

View File

@ -509,6 +509,9 @@ class JobParser(object):
secret = {vs.Required('name'): str,
vs.Required('secret'): str}
semaphore = {vs.Required('name'): str,
'resources-first': bool}
# Attributes of a job that can also be used in Project and ProjectTemplate
job_attributes = {'parent': vs.Any(str, None),
'final': bool,
@ -520,7 +523,7 @@ class JobParser(object):
'success-url': str,
'hold-following-changes': bool,
'voting': bool,
'semaphore': str,
'semaphore': vs.Any(semaphore, str),
'tags': to_list(str),
'branches': to_list(str),
'files': to_list(str),
@ -565,7 +568,6 @@ class JobParser(object):
'workspace',
'voting',
'hold-following-changes',
'semaphore',
'attempts',
'failure-message',
'success-message',
@ -720,6 +722,15 @@ class JobParser(object):
new_projects[project.canonical_name] = job_project
job.required_projects = new_projects
if 'semaphore' in conf:
semaphore = conf.get('semaphore')
if isinstance(semaphore, str):
job.semaphore = model.JobSemaphore(semaphore)
else:
job.semaphore = model.JobSemaphore(
semaphore.get('name'),
semaphore.get('resources-first', False))
tags = conf.get('tags')
if tags:
job.tags = set(tags)

View File

@ -322,7 +322,7 @@ class PipelineManager(object):
change.commit_needs_changes = dependencies
def provisionNodes(self, item):
jobs = item.findJobsToRequest()
jobs = item.findJobsToRequest(item.pipeline.tenant.semaphore_handler)
if not jobs:
return False
build_set = item.current_build_set

View File

@ -1112,7 +1112,12 @@ class Job(ConfigObject):
d['required_projects'] = []
for project in self.required_projects.values():
d['required_projects'].append(project.toDict())
d['semaphore'] = self.semaphore
if self.semaphore:
# For now just leave the semaphore name here until we really need
# more information in zuul-web about this
d['semaphore'] = self.semaphore.name
else:
d['semaphore'] = None
d['variables'] = self.variables
d['final'] = self.final
d['abstract'] = self.abstract
@ -1511,6 +1516,21 @@ class JobProject(ConfigObject):
return d
class JobSemaphore(ConfigObject):
""" A reference to a semaphore from a job. """
def __init__(self, semaphore_name, resources_first=False):
super().__init__()
self.name = semaphore_name
self.resources_first = resources_first
def toDict(self):
d = dict()
d['name'] = self.name
d['resources_first'] = self.resources_first
return d
class JobList(ConfigObject):
""" A list of jobs in a project's pipeline. """
@ -2135,13 +2155,13 @@ class QueueItem(object):
# The nodes for this job are not ready, skip
# it for now.
continue
if semaphore_handler.acquire(self, job):
if semaphore_handler.acquire(self, job, False):
# If this job needs a semaphore, either acquire it or
# make sure that we have it before running the job.
torun.append(job)
return torun
def findJobsToRequest(self):
def findJobsToRequest(self, semaphore_handler):
build_set = self.current_build_set
toreq = []
if not self.live:
@ -2177,7 +2197,10 @@ class QueueItem(object):
all_parent_jobs_successful = False
break
if all_parent_jobs_successful:
toreq.append(job)
if semaphore_handler.acquire(self, job, True):
# If this job needs a semaphore, either acquire it or
# make sure that we have it before requesting the nodes.
toreq.append(job)
return toreq
def setResult(self, build):
@ -3596,11 +3619,34 @@ class SemaphoreHandler(object):
def __init__(self):
self.semaphores = {}
def acquire(self, item, job):
def acquire(self, item, job, request_resources):
"""
Acquires a semaphore for an item job combination. This gets called twice
during the lifecycle of a job. The first call is before requesting
build resources. The second call is before running the job. In which
call we really acquire the semaphore is defined by the job.
:param item: The item
:param job: The job
:param request_resources: True if we want to acquire for the request
resources phase, False if we want to acquire
for the run phase.
"""
if not job.semaphore:
return True
semaphore_key = job.semaphore
if job.semaphore.resources_first and request_resources:
# We're currently in the resource request phase and want to get the
# resources before locking. So we don't need to do anything here.
return True
else:
# As a safety net we want to acquire the semaphore at least in the
# run phase so don't filter this here as re-acquiring the semaphore
# is not a problem here if it has been already acquired before in
# the resources phase.
pass
semaphore_key = job.semaphore.name
m = self.semaphores.get(semaphore_key)
if not m:
@ -3612,7 +3658,7 @@ class SemaphoreHandler(object):
return True
# semaphore is there, check max
if len(m) < self._max_count(item, job.semaphore):
if len(m) < self._max_count(item, job.semaphore.name):
self._acquire(semaphore_key, item, job.name)
return True
@ -3622,7 +3668,7 @@ class SemaphoreHandler(object):
if not job.semaphore:
return
semaphore_key = job.semaphore
semaphore_key = job.semaphore.name
m = self.semaphores.get(semaphore_key)
if not m: