Improve resource usage with semaphores
Currently, jobs that use semaphores first get and lock the build nodes and then acquire the semaphore. If many jobs are waiting for the semaphore, this can block a substantial part of the available resources. To make this safe, default to acquiring the semaphore before requesting the nodes.

However, in some cases jobs with a semaphore should run back to back as fast as possible, and it may then be preferable to accept some waste of resources. To support this use case, a job using a semaphore can override this behavior and still acquire the semaphore after getting the nodes.

Change-Id: Id6f582ec29219d280d05319d1b822c7934437b7a
parent 33699fa316
commit ae887dab58
@@ -629,7 +629,25 @@ Here is an example of two job definitions:
       The name of a :ref:`semaphore` which should be acquired and
       released when the job begins and ends. If the semaphore is at
       maximum capacity, then Zuul will wait until it can be acquired
-      before starting the job.
+      before starting the job. The format is either a string or a
+      dictionary. If it's a string, it references a semaphore using the
+      default value for :attr:`job.semaphore.resources-first`.
+
+      .. attr:: name
+         :required:
+
+         The name of the referenced semaphore.
+
+      .. attr:: resources-first
+         :default: False
+
+         By default a semaphore is acquired before the resources are
+         requested. However, in some cases the user wants to run cheap
+         jobs as quickly as possible in a consecutive manner. In this
+         case :attr:`job.semaphore.resources-first` can be enabled to
+         request the resources before locking the semaphore. This can
+         lead to some amount of blocked resources while waiting for the
+         semaphore, so this should be used with caution.
 
    .. attr:: tags
 
@@ -0,0 +1,14 @@
+---
+features:
+  - |
+    A job using a semaphore can now configure whether it acquires the
+    semaphore before requesting resources or just before running.
+upgrade:
+  - |
+    The acquiring behavior of jobs with semaphores has been changed. Up to now
+    a job requested resources and acquired the semaphore just before it started
+    to run. However, this could lead to substantial resource waste. Instead,
+    jobs now acquire the semaphore before requesting the resources by default.
+    This behavior can be overridden by jobs using
+    :attr:`job.semaphore.resources-first` if some waste of resources is
+    acceptable.
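
The resource waste described in the release note can be illustrated with a small back-of-the-envelope model. This is only a sketch under stated assumptions: the function `idle_nodes` and its parameters are hypothetical and not part of Zuul, and the model assumes the node pool can satisfy every request immediately.

```python
def idle_nodes(queued_jobs, nodes_per_job, semaphore_max, resources_first):
    """Count nodes held by jobs that are still waiting for the semaphore.

    Toy model (hypothetical, not Zuul code): every node request is
    assumed to be fulfilled immediately.
    """
    # Jobs beyond the semaphore capacity have to wait.
    waiting = max(queued_jobs - semaphore_max, 0)
    if resources_first:
        # Nodes are requested before the semaphore is taken, so every
        # waiting job already holds its nodes without using them.
        return waiting * nodes_per_job
    # Semaphore first: a waiting job has not requested any nodes yet.
    return 0


# Ten queued jobs, two nodes each, semaphore with max=1.
print(idle_nodes(10, 2, 1, resources_first=True))   # 18
print(idle_nodes(10, 2, 1, resources_first=False))  # 0
```

Under these assumptions, the new default keeps no nodes idle, while `resources-first` trades idle nodes for a shorter gap between consecutive runs.
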
@@ -22,6 +22,10 @@
 - job:
     name: base
     parent: null
+    nodeset:
+      nodes:
+        - name: controller
+          label: label1
 
 - job:
     name: project-test1
@@ -56,6 +60,20 @@
         - name: controller
           label: label1
 
+- job:
+    name: semaphore-one-test1-resources-first
+    semaphore:
+      name: test-semaphore
+      resources-first: True
+    run: playbooks/semaphore-one-test1.yaml
+
+- job:
+    name: semaphore-one-test2-resources-first
+    semaphore:
+      name: test-semaphore
+      resources-first: True
+    run: playbooks/semaphore-one-test1.yaml
+
 - project:
     name: org/project
     check:
@@ -77,3 +95,11 @@
     check:
       jobs:
         - semaphore-one-test3
+
+- project:
+    name: org/project3
+    check:
+      jobs:
+        - project-test1
+        - semaphore-one-test1-resources-first
+        - semaphore-one-test2-resources-first
@@ -0,0 +1 @@
+test
@@ -8,3 +8,4 @@
           - org/project
           - org/project1
           - org/project2
+          - org/project3
@@ -5849,6 +5849,10 @@ class TestSemaphore(ZuulTestCase):
 
         self.executor_server.hold_jobs_in_build = True
 
+        # Pause nodepool so we can check the ordering of getting the nodes
+        # and acquiring the semaphore.
+        self.fake_nodepool.paused = True
+
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
         self.assertFalse('test-semaphore' in
@@ -5858,6 +5862,13 @@ class TestSemaphore(ZuulTestCase):
         self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
         self.waitUntilSettled()
 
+        # By default we first lock the semaphore and then get the nodes
+        # so at this point the semaphore needs to be acquired.
+        self.assertTrue('test-semaphore' in
+                        tenant.semaphore_handler.semaphores)
+        self.fake_nodepool.paused = False
+        self.waitUntilSettled()
+
         self.assertEqual(len(self.builds), 3)
         self.assertEqual(self.builds[0].name, 'project-test1')
         self.assertEqual(self.builds[1].name, 'semaphore-one-test1')
@@ -5993,6 +6004,53 @@ class TestSemaphore(ZuulTestCase):
         self.assertEqual(A.reported, 1)
         self.assertEqual(B.reported, 1)
 
+    def test_semaphore_resources_first(self):
+        "Test semaphores with max=1 (mutex) and get resources first"
+        tenant = self.sched.abide.tenants.get('tenant-one')
+
+        self.executor_server.hold_jobs_in_build = True
+
+        # Pause nodepool so we can check the ordering of getting the nodes
+        # and acquiring the semaphore.
+        self.fake_nodepool.paused = True
+
+        A = self.fake_gerrit.addFakeChange('org/project3', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project3', 'master', 'B')
+        self.assertFalse('test-semaphore' in
+                         tenant.semaphore_handler.semaphores)
+
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+
+        # Here we first get the resources and then lock the semaphore
+        # so at this point the semaphore should not be acquired.
+        self.assertFalse('test-semaphore' in
+                         tenant.semaphore_handler.semaphores)
+        self.fake_nodepool.paused = False
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.builds), 3)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name,
+                         'semaphore-one-test1-resources-first')
+        self.assertEqual(self.builds[2].name, 'project-test1')
+
+        self.executor_server.release('semaphore-one-test1')
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.builds), 3)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test1')
+        self.assertEqual(self.builds[2].name,
+                         'semaphore-one-test2-resources-first')
+        self.assertTrue('test-semaphore' in
+                        tenant.semaphore_handler.semaphores)
+
+        self.executor_server.hold_jobs_in_build = False
+        self.executor_server.release()
+        self.waitUntilSettled()
+
     def test_semaphore_zk_error(self):
         "Test semaphore release with zk error"
         tenant = self.sched.abide.tenants.get('tenant-one')
@@ -509,6 +509,9 @@ class JobParser(object):
     secret = {vs.Required('name'): str,
               vs.Required('secret'): str}
 
+    semaphore = {vs.Required('name'): str,
+                 'resources-first': bool}
+
     # Attributes of a job that can also be used in Project and ProjectTemplate
     job_attributes = {'parent': vs.Any(str, None),
                       'final': bool,
@@ -520,7 +523,7 @@ class JobParser(object):
                       'success-url': str,
                       'hold-following-changes': bool,
                       'voting': bool,
-                      'semaphore': str,
+                      'semaphore': vs.Any(semaphore, str),
                       'tags': to_list(str),
                       'branches': to_list(str),
                       'files': to_list(str),
@@ -565,7 +568,6 @@ class JobParser(object):
         'workspace',
         'voting',
         'hold-following-changes',
-        'semaphore',
         'attempts',
         'failure-message',
         'success-message',
@@ -720,6 +722,15 @@ class JobParser(object):
                 new_projects[project.canonical_name] = job_project
             job.required_projects = new_projects
 
+        if 'semaphore' in conf:
+            semaphore = conf.get('semaphore')
+            if isinstance(semaphore, str):
+                job.semaphore = model.JobSemaphore(semaphore)
+            else:
+                job.semaphore = model.JobSemaphore(
+                    semaphore.get('name'),
+                    semaphore.get('resources-first', False))
+
         tags = conf.get('tags')
         if tags:
             job.tags = set(tags)
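
The parser change above accepts the `semaphore` attribute either as a plain string or as a dictionary. As a minimal standalone sketch of that normalization (the names `SemaphoreRef` and `parse_semaphore` are hypothetical stand-ins, not Zuul APIs, and no schema validation is performed here):

```python
from dataclasses import dataclass


@dataclass
class SemaphoreRef:
    # Hypothetical stand-in for model.JobSemaphore from the diff.
    name: str
    resources_first: bool = False


def parse_semaphore(value):
    """Normalize 'name' or {'name': ..., 'resources-first': ...}."""
    if isinstance(value, str):
        # String form: the default for resources-first (False) applies.
        return SemaphoreRef(value)
    # Dictionary form: 'name' is required, 'resources-first' is optional.
    return SemaphoreRef(value['name'], value.get('resources-first', False))


print(parse_semaphore('test-semaphore'))
print(parse_semaphore({'name': 'test-semaphore', 'resources-first': True}))
```

Both forms end up as the same kind of object, so the rest of the code only has to deal with one representation.
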
@@ -322,7 +322,7 @@ class PipelineManager(object):
             change.commit_needs_changes = dependencies
 
     def provisionNodes(self, item):
-        jobs = item.findJobsToRequest()
+        jobs = item.findJobsToRequest(item.pipeline.tenant.semaphore_handler)
         if not jobs:
             return False
         build_set = item.current_build_set
@@ -1112,7 +1112,12 @@ class Job(ConfigObject):
         d['required_projects'] = []
         for project in self.required_projects.values():
             d['required_projects'].append(project.toDict())
-        d['semaphore'] = self.semaphore
+        if self.semaphore:
+            # For now just leave the semaphore name here until we really need
+            # more information in zuul-web about this
+            d['semaphore'] = self.semaphore.name
+        else:
+            d['semaphore'] = None
         d['variables'] = self.variables
         d['final'] = self.final
         d['abstract'] = self.abstract
@@ -1511,6 +1516,21 @@ class JobProject(ConfigObject):
         return d
 
 
+class JobSemaphore(ConfigObject):
+    """ A reference to a semaphore from a job. """
+
+    def __init__(self, semaphore_name, resources_first=False):
+        super().__init__()
+        self.name = semaphore_name
+        self.resources_first = resources_first
+
+    def toDict(self):
+        d = dict()
+        d['name'] = self.name
+        d['resources_first'] = self.resources_first
+        return d
+
+
 class JobList(ConfigObject):
     """ A list of jobs in a project's pipeline. """
 
@@ -2135,13 +2155,13 @@ class QueueItem(object):
                     # The nodes for this job are not ready, skip
                     # it for now.
                     continue
-                if semaphore_handler.acquire(self, job):
+                if semaphore_handler.acquire(self, job, False):
                     # If this job needs a semaphore, either acquire it or
                     # make sure that we have it before running the job.
                     torun.append(job)
         return torun
 
-    def findJobsToRequest(self):
+    def findJobsToRequest(self, semaphore_handler):
         build_set = self.current_build_set
         toreq = []
         if not self.live:
@@ -2177,7 +2197,10 @@ class QueueItem(object):
                     all_parent_jobs_successful = False
                     break
             if all_parent_jobs_successful:
-                toreq.append(job)
+                if semaphore_handler.acquire(self, job, True):
+                    # If this job needs a semaphore, either acquire it or
+                    # make sure that we have it before requesting the nodes.
+                    toreq.append(job)
         return toreq
 
     def setResult(self, build):
@@ -3596,11 +3619,34 @@ class SemaphoreHandler(object):
     def __init__(self):
         self.semaphores = {}
 
-    def acquire(self, item, job):
+    def acquire(self, item, job, request_resources):
+        """
+        Acquires a semaphore for an item/job combination. This is called twice
+        during the lifecycle of a job. The first call is before requesting
+        build resources. The second call is before running the job. In which
+        call we really acquire the semaphore is defined by the job.
+
+        :param item: The item
+        :param job: The job
+        :param request_resources: True if we want to acquire for the request
+                                  resources phase, False if we want to acquire
+                                  for the run phase.
+        """
         if not job.semaphore:
             return True
 
-        semaphore_key = job.semaphore
+        if job.semaphore.resources_first and request_resources:
+            # We're currently in the resource request phase and want to get the
+            # resources before locking. So we don't need to do anything here.
+            return True
+        else:
+            # As a safety net we want to acquire the semaphore at least in the
+            # run phase so don't filter this here as re-acquiring the semaphore
+            # is not a problem here if it has been already acquired before in
+            # the resources phase.
+            pass
+
+        semaphore_key = job.semaphore.name
 
         m = self.semaphores.get(semaphore_key)
         if not m:
@@ -3612,7 +3658,7 @@ class SemaphoreHandler(object):
             return True
 
         # semaphore is there, check max
-        if len(m) < self._max_count(item, job.semaphore):
+        if len(m) < self._max_count(item, job.semaphore.name):
             self._acquire(semaphore_key, item, job.name)
             return True
 
@@ -3622,7 +3668,7 @@ class SemaphoreHandler(object):
         if not job.semaphore:
             return
 
-        semaphore_key = job.semaphore
+        semaphore_key = job.semaphore.name
 
         m = self.semaphores.get(semaphore_key)
         if not m:
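
The two-phase behavior of `acquire` described in the docstring above can be tried out in isolation with a simplified model. This is a sketch, not the Zuul implementation: `ToyJob` and `ToySemaphoreHandler` are hypothetical, holders are tracked by job name only (the real handler also involves the queue item), and the maximum count is a constructor argument rather than looked up from the tenant configuration.

```python
class ToyJob:
    """Hypothetical job with an optional semaphore reference."""

    def __init__(self, name, semaphore=None, resources_first=False):
        self.name = name
        self.semaphore = semaphore
        self.resources_first = resources_first


class ToySemaphoreHandler:
    """Simplified two-phase semaphore handler (not Zuul code)."""

    def __init__(self, max_count=1):
        self.max_count = max_count
        self.semaphores = {}  # semaphore name -> list of holder job names

    def acquire(self, job, request_resources):
        if job.semaphore is None:
            return True
        if job.resources_first and request_resources:
            # Resource request phase of a resources-first job:
            # let the node request proceed without taking the semaphore.
            return True
        holders = self.semaphores.setdefault(job.semaphore, [])
        if job.name in holders:
            # Already acquired in the earlier phase; re-acquiring is a no-op.
            return True
        if len(holders) < self.max_count:
            holders.append(job.name)
            return True
        return False


handler = ToySemaphoreHandler(max_count=1)

# Default: the semaphore is taken in the resource request phase.
a, b = ToyJob('a', 'sem'), ToyJob('b', 'sem')
print(handler.acquire(a, True), handler.acquire(b, True))

# resources-first: both jobs may request nodes, only one may run.
c = ToyJob('c', 'sem2', resources_first=True)
d = ToyJob('d', 'sem2', resources_first=True)
print(handler.acquire(c, True), handler.acquire(d, True))
print(handler.acquire(c, False), handler.acquire(d, False))
```

In this model the second default job is held back before it requests nodes, whereas both resources-first jobs get their nodes and only the run phase serializes them.
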