Consider shared changes queues for relative_priority

When calculating relative_priority for independent pipelines,
use shared change queues just as is done for dependent pipelines.

To implement this, we now calculate shared change queues for all
pipelines, not just dependent ones, though we don't use those
queues for any purpose other than this.

Change-Id: I59b1090ca1f4fcc72276445e6ff4c5cf4f2f5030
This commit is contained in:
James E. Blair 2018-12-07 14:26:18 -08:00
parent 5f3a435c14
commit 407643a4e6
8 changed files with 73 additions and 13 deletions

View File

@ -286,11 +286,11 @@ The following sections of ``zuul.conf`` are used by the scheduler:
is used by Nodepool to satisfy requests from higher-precedence
pipelines first. If ``relative_priority`` is set to ``True``,
then Zuul will additionally group items in the same pipeline by
project and weight each request by its position in that
project's group. A request for the first change of a given
project will have the highest relative priority, and the second
pipeline queue and weight each request by its position in that
project's group. A request for the first change in a given
queue will have the highest relative priority, and the second
change a lower relative priority. The first change of each
project in a pipeline has the same relative priority, regardless
queue in a pipeline has the same relative priority, regardless
of the order of submission or how many other changes are in the
pipeline. This can be used to make node allocations complete
faster for projects with fewer changes in a system dominated by

View File

@ -1262,6 +1262,10 @@ pipeline.
stanza; it may appear in secondary instances or even in a
:ref:`project-template` definition.
Pipeline managers other than `dependent` do not use this
attribute, however, it may still be used if
:attr:`scheduler.relative_priority` is enabled.
.. attr:: debug
If this is set to `true`, Zuul will include debugging

View File

@ -61,6 +61,7 @@
- project:
name: org/project1
check:
queue: integrated
jobs:
- integration
gate:
@ -71,6 +72,7 @@
- project:
name: org/project2
check:
queue: integrated
jobs:
- integration
gate:

View File

@ -5167,6 +5167,10 @@ For CI problems and help debugging, contact ci@example.org"""
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
D = self.fake_gerrit.addFakeChange('org/project2', 'master', 'D')
self.fake_gerrit.addEvent(D.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
reqs = self.fake_nodepool.getNodeRequests()
# The requests come back sorted by priority.
@ -5183,11 +5187,15 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual(reqs[2]['_oid'], '200-0000000001')
self.assertEqual(reqs[2]['relative_priority'], 1)
# Change D, first change for project2 shared with project1,
# lower relative priority than project1.
self.assertEqual(reqs[3]['_oid'], '200-0000000003')
self.assertEqual(reqs[3]['relative_priority'], 1)
# Fulfill only the first request
self.fake_nodepool.fulfillRequest(reqs[0])
for x in iterate_timeout(30, 'fulfill request'):
self.log.debug(len(self.sched.nodepool.requests))
if len(self.sched.nodepool.requests) < 3:
if len(self.sched.nodepool.requests) < 4:
break
self.waitUntilSettled()
@ -5201,6 +5209,11 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual(reqs[1]['_oid'], '200-0000000002')
self.assertEqual(reqs[1]['relative_priority'], 0)
# Change D, first change for project2 shared with project1,
# still lower relative priority than project1.
self.assertEqual(reqs[2]['_oid'], '200-0000000003')
self.assertEqual(reqs[2]['relative_priority'], 1)
self.fake_nodepool.unpause()
self.waitUntilSettled()

View File

@ -58,7 +58,43 @@ class PipelineManager(object):
return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)
def _postConfig(self, layout):
pass
# All pipelines support shared queues for setting
# relative_priority; only the dependent pipeline uses them for
# pipeline queing.
self.buildChangeQueues(layout)
def buildChangeQueues(self, layout):
self.log.debug("Building relative_priority queues")
change_queues = self.pipeline.relative_priority_queues
tenant = self.pipeline.tenant
layout_project_configs = layout.project_configs
for project_name, project_configs in layout_project_configs.items():
(trusted, project) = tenant.getProject(project_name)
queue_name = None
project_in_pipeline = False
for project_config in layout.getAllProjectConfigs(project_name):
project_pipeline_config = project_config.pipelines.get(
self.pipeline.name)
if project_pipeline_config is None:
continue
project_in_pipeline = True
queue_name = project_pipeline_config.queue_name
if queue_name:
break
if not project_in_pipeline:
continue
if not queue_name:
continue
if queue_name in change_queues:
change_queue = change_queues[queue_name]
else:
change_queue = []
change_queues[queue_name] = change_queue
self.log.debug("Created queue: %s" % queue_name)
change_queue.append(project)
self.log.debug("Added project %s to queue: %s" %
(project, queue_name))
def getSubmitAllowNeeds(self):
# Get a list of code review labels that are allowed to be
@ -86,9 +122,10 @@ class PipelineManager(object):
return False
def getNodePriority(self, item):
queue = self.pipeline.getRelativePriorityQueue(item.change.project)
items = self.pipeline.getAllItems()
items = [i for i in items
if i.change.project == item.change.project and
if i.change.project in queue and
i.live]
return items.index(item)

View File

@ -28,10 +28,6 @@ class DependentPipelineManager(PipelineManager):
def __init__(self, *args, **kwargs):
super(DependentPipelineManager, self).__init__(*args, **kwargs)
def _postConfig(self, layout):
super(DependentPipelineManager, self)._postConfig(layout)
self.buildChangeQueues(layout)
def buildChangeQueues(self, layout):
self.log.debug("Building shared change queues")
change_queues = {}

View File

@ -23,7 +23,8 @@ class IndependentPipelineManager(PipelineManager):
super(IndependentPipelineManager, self)._postConfig(layout)
def getChangeQueue(self, change, existing=None):
# creates a new change queue for every change
# We ignore any shared change queues on the pipeline and
# instead create a new change queue for every change.
if existing:
return DynamicChangeQueueContextManager(existing)
change_queue = model.ChangeQueue(self.pipeline)

View File

@ -250,6 +250,7 @@ class Pipeline(object):
self.ignore_dependencies = False
self.manager = None
self.queues = []
self.relative_priority_queues = {}
self.precedence = PRECEDENCE_NORMAL
self.triggers = []
self.start_actions = []
@ -295,6 +296,12 @@ class Pipeline(object):
return queue
return None
def getRelativePriorityQueue(self, project):
for queue in self.relative_priority_queues.values():
if project in queue:
return queue
return [project]
def removeQueue(self, queue):
if queue in self.queues:
self.queues.remove(queue)