Consider shared changes queues for relative_priority
When calculating relative_priority for independent pipelines, use shared change queues just as is done for dependent pipelines. To implement this, we now calculate shared change queues for all pipelines, not just dependent ones, though we don't use those queues for any purpose other than this. Change-Id: I59b1090ca1f4fcc72276445e6ff4c5cf4f2f5030
This commit is contained in:
parent
5f3a435c14
commit
407643a4e6
|
@ -286,11 +286,11 @@ The following sections of ``zuul.conf`` are used by the scheduler:
|
|||
is used by Nodepool to satisfy requests from higher-precedence
|
||||
pipelines first. If ``relative_priority`` is set to ``True``,
|
||||
then Zuul will additionally group items in the same pipeline by
|
||||
project and weight each request by its position in that
|
||||
project's group. A request for the first change of a given
|
||||
project will have the highest relative priority, and the second
|
||||
pipeline queue and weight each request by its position in that
|
||||
project's group. A request for the first change in a given
|
||||
queue will have the highest relative priority, and the second
|
||||
change a lower relative priority. The first change of each
|
||||
project in a pipeline has the same relative priority, regardless
|
||||
queue in a pipeline has the same relative priority, regardless
|
||||
of the order of submission or how many other changes are in the
|
||||
pipeline. This can be used to make node allocations complete
|
||||
faster for projects with fewer changes in a system dominated by
|
||||
|
|
|
@ -1262,6 +1262,10 @@ pipeline.
|
|||
stanza; it may appear in secondary instances or even in a
|
||||
:ref:`project-template` definition.
|
||||
|
||||
Pipeline managers other than `dependent` do not use this
|
||||
attribute, however, it may still be used if
|
||||
:attr:`scheduler.relative_priority` is enabled.
|
||||
|
||||
.. attr:: debug
|
||||
|
||||
If this is set to `true`, Zuul will include debugging
|
||||
|
|
|
@ -61,6 +61,7 @@
|
|||
- project:
|
||||
name: org/project1
|
||||
check:
|
||||
queue: integrated
|
||||
jobs:
|
||||
- integration
|
||||
gate:
|
||||
|
@ -71,6 +72,7 @@
|
|||
- project:
|
||||
name: org/project2
|
||||
check:
|
||||
queue: integrated
|
||||
jobs:
|
||||
- integration
|
||||
gate:
|
||||
|
|
|
@ -5167,6 +5167,10 @@ For CI problems and help debugging, contact ci@example.org"""
|
|||
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
D = self.fake_gerrit.addFakeChange('org/project2', 'master', 'D')
|
||||
self.fake_gerrit.addEvent(D.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
reqs = self.fake_nodepool.getNodeRequests()
|
||||
|
||||
# The requests come back sorted by priority.
|
||||
|
@ -5183,11 +5187,15 @@ For CI problems and help debugging, contact ci@example.org"""
|
|||
self.assertEqual(reqs[2]['_oid'], '200-0000000001')
|
||||
self.assertEqual(reqs[2]['relative_priority'], 1)
|
||||
|
||||
# Change D, first change for project2 shared with project1,
|
||||
# lower relative priority than project1.
|
||||
self.assertEqual(reqs[3]['_oid'], '200-0000000003')
|
||||
self.assertEqual(reqs[3]['relative_priority'], 1)
|
||||
|
||||
# Fulfill only the first request
|
||||
self.fake_nodepool.fulfillRequest(reqs[0])
|
||||
for x in iterate_timeout(30, 'fulfill request'):
|
||||
self.log.debug(len(self.sched.nodepool.requests))
|
||||
if len(self.sched.nodepool.requests) < 3:
|
||||
if len(self.sched.nodepool.requests) < 4:
|
||||
break
|
||||
self.waitUntilSettled()
|
||||
|
||||
|
@ -5201,6 +5209,11 @@ For CI problems and help debugging, contact ci@example.org"""
|
|||
self.assertEqual(reqs[1]['_oid'], '200-0000000002')
|
||||
self.assertEqual(reqs[1]['relative_priority'], 0)
|
||||
|
||||
# Change D, first change for project2 shared with project1,
|
||||
# still lower relative priority than project1.
|
||||
self.assertEqual(reqs[2]['_oid'], '200-0000000003')
|
||||
self.assertEqual(reqs[2]['relative_priority'], 1)
|
||||
|
||||
self.fake_nodepool.unpause()
|
||||
self.waitUntilSettled()
|
||||
|
||||
|
|
|
@ -58,7 +58,43 @@ class PipelineManager(object):
|
|||
return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)
|
||||
|
||||
def _postConfig(self, layout):
|
||||
pass
|
||||
# All pipelines support shared queues for setting
|
||||
# relative_priority; only the dependent pipeline uses them for
|
||||
# pipeline queing.
|
||||
self.buildChangeQueues(layout)
|
||||
|
||||
def buildChangeQueues(self, layout):
|
||||
self.log.debug("Building relative_priority queues")
|
||||
change_queues = self.pipeline.relative_priority_queues
|
||||
tenant = self.pipeline.tenant
|
||||
layout_project_configs = layout.project_configs
|
||||
|
||||
for project_name, project_configs in layout_project_configs.items():
|
||||
(trusted, project) = tenant.getProject(project_name)
|
||||
queue_name = None
|
||||
project_in_pipeline = False
|
||||
for project_config in layout.getAllProjectConfigs(project_name):
|
||||
project_pipeline_config = project_config.pipelines.get(
|
||||
self.pipeline.name)
|
||||
if project_pipeline_config is None:
|
||||
continue
|
||||
project_in_pipeline = True
|
||||
queue_name = project_pipeline_config.queue_name
|
||||
if queue_name:
|
||||
break
|
||||
if not project_in_pipeline:
|
||||
continue
|
||||
if not queue_name:
|
||||
continue
|
||||
if queue_name in change_queues:
|
||||
change_queue = change_queues[queue_name]
|
||||
else:
|
||||
change_queue = []
|
||||
change_queues[queue_name] = change_queue
|
||||
self.log.debug("Created queue: %s" % queue_name)
|
||||
change_queue.append(project)
|
||||
self.log.debug("Added project %s to queue: %s" %
|
||||
(project, queue_name))
|
||||
|
||||
def getSubmitAllowNeeds(self):
|
||||
# Get a list of code review labels that are allowed to be
|
||||
|
@ -86,9 +122,10 @@ class PipelineManager(object):
|
|||
return False
|
||||
|
||||
def getNodePriority(self, item):
|
||||
queue = self.pipeline.getRelativePriorityQueue(item.change.project)
|
||||
items = self.pipeline.getAllItems()
|
||||
items = [i for i in items
|
||||
if i.change.project == item.change.project and
|
||||
if i.change.project in queue and
|
||||
i.live]
|
||||
return items.index(item)
|
||||
|
||||
|
|
|
@ -28,10 +28,6 @@ class DependentPipelineManager(PipelineManager):
|
|||
def __init__(self, *args, **kwargs):
|
||||
super(DependentPipelineManager, self).__init__(*args, **kwargs)
|
||||
|
||||
def _postConfig(self, layout):
|
||||
super(DependentPipelineManager, self)._postConfig(layout)
|
||||
self.buildChangeQueues(layout)
|
||||
|
||||
def buildChangeQueues(self, layout):
|
||||
self.log.debug("Building shared change queues")
|
||||
change_queues = {}
|
||||
|
|
|
@ -23,7 +23,8 @@ class IndependentPipelineManager(PipelineManager):
|
|||
super(IndependentPipelineManager, self)._postConfig(layout)
|
||||
|
||||
def getChangeQueue(self, change, existing=None):
|
||||
# creates a new change queue for every change
|
||||
# We ignore any shared change queues on the pipeline and
|
||||
# instead create a new change queue for every change.
|
||||
if existing:
|
||||
return DynamicChangeQueueContextManager(existing)
|
||||
change_queue = model.ChangeQueue(self.pipeline)
|
||||
|
|
|
@ -250,6 +250,7 @@ class Pipeline(object):
|
|||
self.ignore_dependencies = False
|
||||
self.manager = None
|
||||
self.queues = []
|
||||
self.relative_priority_queues = {}
|
||||
self.precedence = PRECEDENCE_NORMAL
|
||||
self.triggers = []
|
||||
self.start_actions = []
|
||||
|
@ -295,6 +296,12 @@ class Pipeline(object):
|
|||
return queue
|
||||
return None
|
||||
|
||||
def getRelativePriorityQueue(self, project):
|
||||
for queue in self.relative_priority_queues.values():
|
||||
if project in queue:
|
||||
return queue
|
||||
return [project]
|
||||
|
||||
def removeQueue(self, queue):
|
||||
if queue in self.queues:
|
||||
self.queues.remove(queue)
|
||||
|
|
Loading…
Reference in New Issue