Add a dequeue command to zuul client
Add the ability for an operator to dequeue a change from a pipeline. Change-Id: I4524291807c8b97b62cfaa31fb5d46dc48adbac9 Signed-off-by: Paul Belanger <pabelanger@redhat.com>
This commit is contained in:
parent
03219737de
commit
c2c5ce26bf
|
@ -30,6 +30,15 @@ Example::
|
|||
|
||||
zuul autohold --tenant openstack --project example_project --job example_job --reason "reason text" --count 1
|
||||
|
||||
Dequeue
|
||||
^^^^^^^
|
||||
.. program-output:: zuul dequeue --help
|
||||
|
||||
Examples::
|
||||
|
||||
zuul dequeue --tenant openstack --pipeline check --project example_project --change 5,1
|
||||
zuul dequeue --tenant openstack --pipeline periodic --project example_project --ref refs/heads/master
|
||||
|
||||
Enqueue
|
||||
^^^^^^^
|
||||
.. program-output:: zuul enqueue --help
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
features:
|
||||
- |
|
||||
The `dequeue` command has been added to the Zuul CLI.
|
||||
It allows operators to stop a given buildset at will.
|
|
@ -3709,6 +3709,119 @@ class TestScheduler(ZuulTestCase):
|
|||
self.assertIn('project-post', job_names)
|
||||
self.assertEqual(r, True)
|
||||
|
||||
def test_client_dequeue_dependent_change(self):
|
||||
"Test that the RPC client can dequeue a change"
|
||||
client = zuul.rpcclient.RPCClient('127.0.0.1',
|
||||
self.gearman_server.port)
|
||||
self.addCleanup(client.shutdown)
|
||||
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
|
||||
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
|
||||
|
||||
C.setDependsOn(B, 1)
|
||||
B.setDependsOn(A, 1)
|
||||
|
||||
A.addApproval('Code-Review', 2)
|
||||
B.addApproval('Code-Review', 2)
|
||||
C.addApproval('Code-Review', 2)
|
||||
|
||||
# Promote to 'gate' pipeline
|
||||
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
|
||||
self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
|
||||
self.fake_gerrit.addEvent(C.addApproval('Approved', 1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
client.dequeue(
|
||||
tenant='tenant-one',
|
||||
pipeline='gate',
|
||||
project='org/project',
|
||||
change='1,1',
|
||||
ref=None)
|
||||
|
||||
self.waitUntilSettled()
|
||||
|
||||
tenant = self.sched.abide.tenants.get('tenant-one')
|
||||
gate_pipeline = tenant.layout.pipelines['gate']
|
||||
self.assertEqual(gate_pipeline.getAllItems(), [])
|
||||
self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 1)
|
||||
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
|
||||
def test_client_dequeue_independent_change(self):
|
||||
"Test that the RPC client can dequeue a change"
|
||||
|
||||
client = zuul.rpcclient.RPCClient('127.0.0.1',
|
||||
self.gearman_server.port)
|
||||
self.addCleanup(client.shutdown)
|
||||
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
|
||||
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
|
||||
|
||||
A.addApproval('Code-Review', 2)
|
||||
B.addApproval('Code-Review', 2)
|
||||
C.addApproval('Code-Review', 2)
|
||||
|
||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
|
||||
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
client.dequeue(
|
||||
tenant='tenant-one',
|
||||
pipeline='check',
|
||||
project='org/project',
|
||||
change='1,1',
|
||||
ref=None)
|
||||
self.waitUntilSettled()
|
||||
|
||||
tenant = self.sched.abide.tenants.get('tenant-one')
|
||||
check_pipeline = tenant.layout.pipelines['check']
|
||||
self.assertEqual(len(check_pipeline.getAllItems()), 2)
|
||||
self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 1)
|
||||
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
|
||||
def test_client_dequeue_change_by_ref(self):
|
||||
"Test that the RPC client can dequeue a change by ref"
|
||||
# Test this on the periodic pipeline, where it makes most sense to
|
||||
# use ref
|
||||
client = zuul.rpcclient.RPCClient('127.0.0.1',
|
||||
self.gearman_server.port)
|
||||
self.addCleanup(client.shutdown)
|
||||
|
||||
self.create_branch('org/project', 'stable')
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
self.commitConfigUpdate('common-config', 'layouts/timer.yaml')
|
||||
self.sched.reconfigure(self.config)
|
||||
self.waitUntilSettled()
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
client.dequeue(
|
||||
tenant='tenant-one',
|
||||
pipeline='periodic',
|
||||
project='org/project',
|
||||
change=None,
|
||||
ref='refs/heads/stable')
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.commitConfigUpdate('common-config',
|
||||
'layouts/no-timer.yaml')
|
||||
self.sched.reconfigure(self.config)
|
||||
self.waitUntilSettled()
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 1)
|
||||
|
||||
def test_client_enqueue_negative(self):
|
||||
"Test that the RPC client returns errors"
|
||||
client = zuul.rpcclient.RPCClient('127.0.0.1',
|
||||
|
|
|
@ -111,6 +111,21 @@ class Client(zuul.cmd.ZuulApp):
|
|||
'--newrev', help='new revision', default=None)
|
||||
cmd_enqueue.set_defaults(func=self.enqueue_ref)
|
||||
|
||||
cmd_dequeue = subparsers.add_parser('dequeue',
|
||||
help='dequeue a buildset by its '
|
||||
'change or ref')
|
||||
cmd_dequeue.add_argument('--tenant', help='tenant name',
|
||||
required=True)
|
||||
cmd_dequeue.add_argument('--pipeline', help='pipeline name',
|
||||
required=True)
|
||||
cmd_dequeue.add_argument('--project', help='project name',
|
||||
required=True)
|
||||
cmd_dequeue.add_argument('--change', help='change id',
|
||||
default=None)
|
||||
cmd_dequeue.add_argument('--ref', help='ref name',
|
||||
default=None)
|
||||
cmd_dequeue.set_defaults(func=self.dequeue)
|
||||
|
||||
cmd_promote = subparsers.add_parser('promote',
|
||||
help='promote one or more changes')
|
||||
cmd_promote.add_argument('--tenant', help='tenant name',
|
||||
|
@ -164,6 +179,12 @@ class Client(zuul.cmd.ZuulApp):
|
|||
self.args.oldrev = '0000000000000000000000000000000000000000'
|
||||
if self.args.newrev is None:
|
||||
self.args.newrev = '0000000000000000000000000000000000000000'
|
||||
if self.args.func == self.dequeue:
|
||||
if self.args.change is None and self.args.ref is None:
|
||||
parser.error("Change or ref needed.")
|
||||
if self.args.change is not None and self.args.ref is not None:
|
||||
parser.error(
|
||||
"The 'change' and 'ref' arguments are mutually exclusive.")
|
||||
|
||||
def setup_logging(self):
|
||||
"""Client logging does not rely on conf file"""
|
||||
|
@ -255,6 +276,16 @@ class Client(zuul.cmd.ZuulApp):
|
|||
newrev=self.args.newrev)
|
||||
return r
|
||||
|
||||
def dequeue(self):
|
||||
client = zuul.rpcclient.RPCClient(
|
||||
self.server, self.port, self.ssl_key, self.ssl_cert, self.ssl_ca)
|
||||
r = client.dequeue(tenant=self.args.tenant,
|
||||
pipeline=self.args.pipeline,
|
||||
project=self.args.project,
|
||||
change=self.args.change,
|
||||
ref=self.args.ref)
|
||||
return r
|
||||
|
||||
def promote(self):
|
||||
client = zuul.rpcclient.RPCClient(
|
||||
self.server, self.port, self.ssl_key, self.ssl_cert, self.ssl_ca)
|
||||
|
|
|
@ -89,6 +89,15 @@ class RPCClient(object):
|
|||
}
|
||||
return not self.submitJob('zuul:enqueue_ref', data).failure
|
||||
|
||||
def dequeue(self, tenant, pipeline, project, change, ref):
|
||||
data = {'tenant': tenant,
|
||||
'pipeline': pipeline,
|
||||
'project': project,
|
||||
'change': change,
|
||||
'ref': ref,
|
||||
}
|
||||
return not self.submitJob('zuul:dequeue', data).failure
|
||||
|
||||
def promote(self, tenant, pipeline, change_ids):
|
||||
data = {'tenant': tenant,
|
||||
'pipeline': pipeline,
|
||||
|
|
|
@ -52,6 +52,7 @@ class RPCListener(object):
|
|||
def register(self):
|
||||
self.worker.registerFunction("zuul:autohold")
|
||||
self.worker.registerFunction("zuul:autohold_list")
|
||||
self.worker.registerFunction("zuul:dequeue")
|
||||
self.worker.registerFunction("zuul:enqueue")
|
||||
self.worker.registerFunction("zuul:enqueue_ref")
|
||||
self.worker.registerFunction("zuul:promote")
|
||||
|
@ -121,6 +122,21 @@ class RPCListener(object):
|
|||
except Exception:
|
||||
self.log.exception("Exception while getting job")
|
||||
|
||||
def handle_dequeue(self, job):
|
||||
args = json.loads(job.arguments)
|
||||
tenant_name = args['tenant']
|
||||
pipeline_name = args['pipeline']
|
||||
project_name = args['project']
|
||||
change = args['change']
|
||||
ref = args['ref']
|
||||
try:
|
||||
self.sched.dequeue(
|
||||
tenant_name, pipeline_name, project_name, change, ref)
|
||||
except Exception as e:
|
||||
job.sendWorkException(str(e).encode('utf8'))
|
||||
return
|
||||
job.sendWorkComplete()
|
||||
|
||||
def handle_autohold_list(self, job):
|
||||
req = {}
|
||||
|
||||
|
|
|
@ -119,6 +119,32 @@ class PromoteEvent(ManagementEvent):
|
|||
self.change_ids = change_ids
|
||||
|
||||
|
||||
class DequeueEvent(ManagementEvent):
|
||||
"""Dequeue a change from a pipeline
|
||||
|
||||
:arg str tenant_name: the name of the tenant
|
||||
:arg str pipeline_name: the name of the pipeline
|
||||
:arg str project_name: the name of the project
|
||||
:arg str change: optional, the change to dequeue
|
||||
:arg str ref: optional, the ref to look for
|
||||
"""
|
||||
|
||||
def __init__(self, tenant_name, pipeline_name, project_name, change, ref):
|
||||
super(DequeueEvent, self).__init__()
|
||||
self.tenant_name = tenant_name
|
||||
self.pipeline_name = pipeline_name
|
||||
self.project_name = project_name
|
||||
self.change = change
|
||||
if change is not None:
|
||||
self.change_number, self.patch_number = change.split(',')
|
||||
else:
|
||||
self.change_number, self.patch_number = (None, None)
|
||||
self.ref = ref
|
||||
# set to mock values
|
||||
self.oldrev = '0000000000000000000000000000000000000000'
|
||||
self.newrev = '0000000000000000000000000000000000000000'
|
||||
|
||||
|
||||
class EnqueueEvent(ManagementEvent):
|
||||
"""Enqueue a change into a pipeline
|
||||
|
||||
|
@ -465,6 +491,15 @@ class Scheduler(threading.Thread):
|
|||
event.wait()
|
||||
self.log.debug("Promotion complete")
|
||||
|
||||
def dequeue(self, tenant_name, pipeline_name, project_name, change, ref):
|
||||
event = DequeueEvent(
|
||||
tenant_name, pipeline_name, project_name, change, ref)
|
||||
self.management_event_queue.put(event)
|
||||
self.wake_event.set()
|
||||
self.log.debug("Waiting for dequeue")
|
||||
event.wait()
|
||||
self.log.debug("Dequeue complete")
|
||||
|
||||
def enqueue(self, trigger_event):
|
||||
event = EnqueueEvent(trigger_event)
|
||||
self.management_event_queue.put(event)
|
||||
|
@ -830,6 +865,23 @@ class Scheduler(threading.Thread):
|
|||
quiet=True,
|
||||
ignore_requirements=True)
|
||||
|
||||
def _doDequeueEvent(self, event):
|
||||
tenant = self.abide.tenants.get(event.tenant_name)
|
||||
pipeline = tenant.layout.pipelines[event.pipeline_name]
|
||||
(trusted, project) = tenant.getProject(event.project_name)
|
||||
change = project.source.getChange(event, project)
|
||||
for shared_queue in pipeline.queues:
|
||||
for item in shared_queue.queue:
|
||||
if (isinstance(item.change, model.Change) and
|
||||
item.change.number == change.number and
|
||||
item.change.patchset == change.patchset) or\
|
||||
(item.change.ref == change.ref):
|
||||
pipeline.manager.removeItem(item)
|
||||
return
|
||||
raise Exception("Unable to find shared change queue for %s:%s" %
|
||||
(event.project_name,
|
||||
event.change or event.ref))
|
||||
|
||||
def _doEnqueueEvent(self, event):
|
||||
tenant = self.abide.tenants.get(event.tenant_name)
|
||||
full_project_name = ('/'.join([event.project_hostname,
|
||||
|
@ -987,6 +1039,8 @@ class Scheduler(threading.Thread):
|
|||
self._doTenantReconfigureEvent(event)
|
||||
elif isinstance(event, PromoteEvent):
|
||||
self._doPromoteEvent(event)
|
||||
elif isinstance(event, DequeueEvent):
|
||||
self._doDequeueEvent(event)
|
||||
elif isinstance(event, EnqueueEvent):
|
||||
self._doEnqueueEvent(event.trigger_event)
|
||||
else:
|
||||
|
|
Loading…
Reference in New Issue