Set relative priority of node requests

Add a relative_priority field to node requests and continuously
adjust it for each queue item based on the contents of queues.

This allows for a more fair distribution of build resources between
different projects.  The first item in a pipeline from a given
project (or, in the case of a dependent pipeline, group of projects)
has equal priority to all other first items of other projects in
the same pipeline.  Second items have a lower priority, etc.

Depends-On: https://review.openstack.org/620954
Change-Id: Id3799aeb2cec6d96a662bfa394a538050f7ea947
James E. Blair 2018-11-02 16:21:34 -07:00 committed by Tobias Henkel
parent 9c2b0a9bbe
commit 0b00c4685b
14 changed files with 427 additions and 46 deletions


@ -276,6 +276,32 @@ The following sections of ``zuul.conf`` are used by the scheduler:
Path to directory in which Zuul should save its state.
.. attr:: relative_priority
:default: False
A boolean which indicates whether the scheduler should supply
relative priority information for node requests.
In all cases, each pipeline may specify a precedence value which
is used by Nodepool to satisfy requests from higher-precedence
pipelines first. If ``relative_priority`` is set to ``True``,
then Zuul will additionally group items in the same pipeline by
project and weight each request by its position in that
project's group. A request for the first change of a given
project will have the highest relative priority, and the second
change a lower relative priority. The first change of each
project in a pipeline has the same relative priority, regardless
of the order of submission or how many other changes are in the
pipeline. This can be used to make node allocations complete
faster for projects with fewer changes in a system dominated by
projects with more changes.
If this value is ``False`` (the default), then node requests are
sorted by pipeline precedence followed by the order in which
they were submitted. If this is ``True``, they are sorted by
pipeline precedence, followed by relative priority, and finally
the order in which they were submitted.
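For illustration, a minimal sketch of that ordering, assuming the numeric precedence values that appear in the tests below (100 = high, 200 = normal, 300 = low) and a hypothetical set of (precedence, relative_priority, submission serial) tuples:

# Hypothetical illustration of the documented sort order: pipeline
# precedence first (lower number = higher precedence), then relative
# priority within the pipeline, then submission order.
requests = [
    (200, 1, 3),  # check, second change of a busy project
    (100, 0, 2),  # gate, first change of its queue
    (200, 0, 1),  # check, first change of another project
]
requests.sort()  # tuple comparison yields the documented ordering
# -> [(100, 0, 2), (200, 0, 1), (200, 1, 3)]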
Operation
~~~~~~~~~


@ -0,0 +1,6 @@
---
features:
- |
A new scheduler option, :attr:`scheduler.relative_priority`, can
be used to instruct Nodepool to fulfill requests from less-busy
projects more quickly.


@ -1734,6 +1734,7 @@ class FakeNodepool(object):
log = logging.getLogger("zuul.test.FakeNodepool")
def __init__(self, host, port, chroot):
self.complete_event = threading.Event()
self.host_keys = None
self.client = kazoo.client.KazooClient(
hosts='%s:%s%s' % (host, port, chroot))
@ -1752,12 +1753,21 @@ class FakeNodepool(object):
self.client.stop()
self.client.close()
def pause(self):
self.complete_event.wait()
self.paused = True
def unpause(self):
self.paused = False
def run(self):
while self._running:
self.complete_event.clear()
try:
self._run()
except Exception:
self.log.exception("Error in fake nodepool:")
self.complete_event.set()
time.sleep(0.1)
def _run(self):
@ -1772,7 +1782,7 @@ class FakeNodepool(object):
except kazoo.exceptions.NoNodeError:
return []
reqs = []
for oid in sorted(reqids):
for oid in reqids:
path = self.REQUEST_ROOT + '/' + oid
try:
data, stat = self.client.get(path)
@ -1781,6 +1791,9 @@ class FakeNodepool(object):
reqs.append(data)
except kazoo.exceptions.NoNodeError:
pass
reqs.sort(key=lambda r: (r['_oid'].split('-')[0],
r['relative_priority'],
r['_oid'].split('-')[1]))
return reqs
def getNodes(self):
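The sort key above orders the fake requests the same way the scheduler and tests expect: by the precedence prefix of the oid, then by relative priority, then by submission serial. A small sketch with hypothetical sample data (the '100-0000000002' style oids are the pipeline precedence followed by a submission serial, as seen in the tests below):

# Hypothetical sample requests sorted the way FakeNodepool returns them.
reqs = [
    {'_oid': '200-0000000001', 'relative_priority': 1},
    {'_oid': '200-0000000000', 'relative_priority': 0},
    {'_oid': '100-0000000002', 'relative_priority': 0},
]
reqs.sort(key=lambda r: (r['_oid'].split('-')[0],
                         r['relative_priority'],
                         r['_oid'].split('-')[1]))
# -> oids '100-0000000002', '200-0000000000', '200-0000000001'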


@ -0,0 +1,79 @@
- pipeline:
name: check
manager: independent
trigger:
gerrit:
- event: patchset-created
success:
gerrit:
Verified: 1
failure:
gerrit:
Verified: -1
- pipeline:
name: gate
manager: dependent
success-message: Build succeeded (gate).
trigger:
gerrit:
- event: comment-added
approval:
- Approved: 1
success:
gerrit:
Verified: 2
submit: true
failure:
gerrit:
Verified: -2
start:
gerrit:
Verified: 0
precedence: high
- job:
name: base
parent: null
run: playbooks/base.yaml
nodeset:
nodes:
- name: controller
label: ubuntu-xenial
- job:
name: test
run: playbooks/test.yaml
- job:
name: integration
run: playbooks/integration.yaml
- project:
name: org/project
check:
jobs:
- test
gate:
jobs:
- test
- project:
name: org/project1
check:
jobs:
- integration
gate:
queue: integrated
jobs:
- integration
- project:
name: org/project2
check:
jobs:
- integration
gate:
queue: integrated
jobs:
- integration


@ -8,6 +8,7 @@ server=127.0.0.1
[scheduler]
tenant_config=main.yaml
relative_priority=true
[merger]
git_dir=/tmp/zuul-test/merger-git


@ -58,7 +58,7 @@ class TestNodepoolIntegration(BaseTestCase):
nodeset.addNode(model.Node(['controller'], 'fake-label'))
job = model.Job('testjob')
job.nodeset = nodeset
request = self.nodepool.requestNodes(None, job)
request = self.nodepool.requestNodes(None, job, 0)
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
self.assertEqual(request.state, model.STATE_FULFILLED)
@ -88,7 +88,7 @@ class TestNodepoolIntegration(BaseTestCase):
nodeset.addNode(model.Node(['controller'], 'invalid-label'))
job = model.Job('testjob')
job.nodeset = nodeset
request = self.nodepool.requestNodes(None, job)
request = self.nodepool.requestNodes(None, job, 0)
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
self.assertEqual(request.state, model.STATE_FAILED)
@ -103,7 +103,7 @@ class TestNodepoolIntegration(BaseTestCase):
job = model.Job('testjob')
job.nodeset = nodeset
self.fake_nodepool.paused = True
request = self.nodepool.requestNodes(None, job)
request = self.nodepool.requestNodes(None, job, 0)
self.zk.client.stop()
self.zk.client.start()
self.fake_nodepool.paused = False
@ -121,7 +121,7 @@ class TestNodepoolIntegration(BaseTestCase):
job = model.Job('testjob')
job.nodeset = nodeset
self.fake_nodepool.paused = True
request = self.nodepool.requestNodes(None, job)
request = self.nodepool.requestNodes(None, job, 0)
self.nodepool.cancelRequest(request)
self.waitForRequests()


@ -71,7 +71,7 @@ class TestNodepool(BaseTestCase):
nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
job = model.Job('testjob')
job.nodeset = nodeset
request = self.nodepool.requestNodes(None, job)
request = self.nodepool.requestNodes(None, job, 0)
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
self.assertEqual(request.state, 'fulfilled')
@ -103,11 +103,11 @@ class TestNodepool(BaseTestCase):
nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
job = model.Job('testjob')
job.nodeset = nodeset
self.fake_nodepool.paused = True
request = self.nodepool.requestNodes(None, job)
self.fake_nodepool.pause()
request = self.nodepool.requestNodes(None, job, 0)
self.zk.client.stop()
self.zk.client.start()
self.fake_nodepool.paused = False
self.fake_nodepool.unpause()
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
self.assertEqual(request.state, 'fulfilled')
@ -120,8 +120,8 @@ class TestNodepool(BaseTestCase):
nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
job = model.Job('testjob')
job.nodeset = nodeset
self.fake_nodepool.paused = True
request = self.nodepool.requestNodes(None, job)
self.fake_nodepool.pause()
request = self.nodepool.requestNodes(None, job, 0)
self.nodepool.cancelRequest(request)
self.waitForRequests()
@ -135,7 +135,7 @@ class TestNodepool(BaseTestCase):
nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
job = model.Job('testjob')
job.nodeset = nodeset
request = self.nodepool.requestNodes(None, job)
request = self.nodepool.requestNodes(None, job, 0)
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
self.assertEqual(request.state, 'fulfilled')
@ -156,7 +156,7 @@ class TestNodepool(BaseTestCase):
nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
job = model.Job('testjob')
job.nodeset = nodeset
request = self.nodepool.requestNodes(None, job)
request = self.nodepool.requestNodes(None, job, 0)
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 1)
self.assertEqual(request.state, 'fulfilled')
@ -170,3 +170,21 @@ class TestNodepool(BaseTestCase):
for node in nodeset.getNodes():
self.assertIsNone(node.lock)
self.assertEqual(node.state, 'ready')
def test_node_request_priority(self):
# Test that requests are satisfied in priority order
nodeset = model.NodeSet()
nodeset.addNode(model.Node(['controller', 'foo'], 'ubuntu-xenial'))
nodeset.addNode(model.Node(['compute'], 'ubuntu-xenial'))
job = model.Job('testjob')
job.nodeset = nodeset
self.fake_nodepool.pause()
request1 = self.nodepool.requestNodes(None, job, 1)
request2 = self.nodepool.requestNodes(None, job, 0)
self.fake_nodepool.unpause()
self.waitForRequests()
self.assertEqual(len(self.provisioned_requests), 2)
self.assertEqual(request1.state, 'fulfilled')
self.assertEqual(request2.state, 'fulfilled')
self.assertTrue(request2.state_time < request1.state_time)


@ -4972,7 +4972,7 @@ For CI problems and help debugging, contact ci@example.org"""
def test_zookeeper_disconnect(self):
"Test that jobs are executed after a zookeeper disconnect"
self.fake_nodepool.paused = True
self.fake_nodepool.pause()
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('Code-Review', 2)
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@ -4980,7 +4980,7 @@ For CI problems and help debugging, contact ci@example.org"""
self.zk.client.stop()
self.zk.client.start()
self.fake_nodepool.paused = False
self.fake_nodepool.unpause()
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'MERGED')
@ -4991,7 +4991,7 @@ For CI problems and help debugging, contact ci@example.org"""
# This tests receiving a ZK disconnect between the arrival of
# a fulfilled request and when we accept its nodes.
self.fake_nodepool.paused = True
self.fake_nodepool.pause()
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('Code-Review', 2)
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@ -5003,7 +5003,7 @@ For CI problems and help debugging, contact ci@example.org"""
self.sched.run_handler_lock.acquire()
# Fulfill the nodepool request.
self.fake_nodepool.paused = False
self.fake_nodepool.unpause()
requests = list(self.sched.nodepool.requests.values())
self.assertEqual(1, len(requests))
request = requests[0]
@ -5037,7 +5037,7 @@ For CI problems and help debugging, contact ci@example.org"""
def test_nodepool_failure(self):
"Test that jobs are reported after a nodepool failure"
self.fake_nodepool.paused = True
self.fake_nodepool.pause()
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('Code-Review', 2)
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@ -5046,7 +5046,7 @@ For CI problems and help debugging, contact ci@example.org"""
req = self.fake_nodepool.getNodeRequests()[0]
self.fake_nodepool.addFailRequest(req)
self.fake_nodepool.paused = False
self.fake_nodepool.unpause()
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'NEW')
@ -5055,10 +5055,10 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertIn('project-test1 : SKIPPED', A.messages[1])
self.assertIn('project-test2 : SKIPPED', A.messages[1])
def test_nodepool_priority(self):
"Test that nodes are requested at the correct priority"
def test_nodepool_pipeline_priority(self):
"Test that nodes are requested at the correct pipeline priority"
self.fake_nodepool.paused = True
self.fake_nodepool.pause()
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getRefUpdatedEvent())
@ -5075,10 +5075,11 @@ For CI problems and help debugging, contact ci@example.org"""
reqs = self.fake_nodepool.getNodeRequests()
# The requests come back sorted by oid. Since we have three requests
# for the three changes each with a different priority.
# Also they get a serial number based on order they were received
# so the number on the end of the oid should map to order submitted.
# The requests come back sorted by priority. Since we have
# three requests for the three changes each with a different
# priority. Also they get a serial number based on order they
# were received so the number on the end of the oid should map
# to order submitted.
# * gate first - high priority - change C
self.assertEqual(reqs[0]['_oid'], '100-0000000002')
@ -5092,13 +5093,93 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual(reqs[2]['_oid'], '300-0000000000')
self.assertEqual(reqs[2]['node_types'], ['ubuntu-xenial'])
self.fake_nodepool.paused = False
self.fake_nodepool.unpause()
self.waitUntilSettled()
def test_nodepool_relative_priority_check(self):
"Test that nodes are requested at the relative priority"
self.fake_nodepool.pause()
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
C = self.fake_gerrit.addFakeChange('org/project1', 'master', 'C')
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
reqs = self.fake_nodepool.getNodeRequests()
# The requests come back sorted by priority.
# Change A, first change for project, high relative priority.
self.assertEqual(reqs[0]['_oid'], '200-0000000000')
self.assertEqual(reqs[0]['relative_priority'], 0)
# Change C, first change for project1, high relative priority.
self.assertEqual(reqs[1]['_oid'], '200-0000000002')
self.assertEqual(reqs[1]['relative_priority'], 0)
# Change B, second change for project, lower relative priority.
self.assertEqual(reqs[2]['_oid'], '200-0000000001')
self.assertEqual(reqs[2]['relative_priority'], 1)
self.fake_nodepool.unpause()
self.waitUntilSettled()
@simple_layout('layouts/two-projects-integrated.yaml')
def test_nodepool_relative_priority_gate(self):
"Test that nodes are requested at the relative priority"
self.fake_nodepool.pause()
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
A.addApproval('Code-Review', 2)
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
B.addApproval('Code-Review', 2)
self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
self.waitUntilSettled()
# project does not share a queue with project1 and project2.
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
C.addApproval('Code-Review', 2)
self.fake_gerrit.addEvent(C.addApproval('Approved', 1))
self.waitUntilSettled()
reqs = self.fake_nodepool.getNodeRequests()
# The requests come back sorted by priority.
# Change A, first change for shared queue, high relative
# priority.
self.assertEqual(reqs[0]['_oid'], '100-0000000000')
self.assertEqual(reqs[0]['relative_priority'], 0)
# Change C, first change for independent project, high
# relative priority.
self.assertEqual(reqs[1]['_oid'], '100-0000000002')
self.assertEqual(reqs[1]['relative_priority'], 0)
# Change B, second change for shared queue, lower relative
# priority.
self.assertEqual(reqs[2]['_oid'], '100-0000000001')
self.assertEqual(reqs[2]['relative_priority'], 1)
self.fake_nodepool.unpause()
self.waitUntilSettled()
def test_nodepool_job_removal(self):
"Test that nodes are returned unused after job removal"
self.fake_nodepool.paused = True
self.fake_nodepool.pause()
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('Code-Review', 2)
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@ -5108,7 +5189,7 @@ For CI problems and help debugging, contact ci@example.org"""
self.sched.reconfigure(self.config)
self.waitUntilSettled()
self.fake_nodepool.paused = False
self.fake_nodepool.unpause()
self.waitUntilSettled()
self.assertEqual(A.data['status'], 'MERGED')


@ -85,6 +85,11 @@ class PipelineManager(object):
return True
return False
def getNodePriority(self, item):
items = self.pipeline.getAllItems()
items = [i for i in items if i.change.project == item.change.project]
return items.index(item)
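A hypothetical illustration of the per-project grouping this method performs (item and project names are invented); only items of the same project count toward an item's priority:

# Minimal standalone sketch of the same indexing logic (names hypothetical).
pipeline_items = [('item_a', 'org/project'),
                  ('item_b', 'org/project1'),
                  ('item_c', 'org/project')]

def node_priority(item):
    # mirror getNodePriority(): index within the items of the same project
    same_project = [i for i in pipeline_items if i[1] == item[1]]
    return same_project.index(item)

assert node_priority(('item_a', 'org/project')) == 0   # first for its project
assert node_priority(('item_b', 'org/project1')) == 0  # first for its project
assert node_priority(('item_c', 'org/project')) == 1   # second for its project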
def isChangeAlreadyInPipeline(self, change):
# Checks live items in the pipeline
for item in self.pipeline.getAllItems():
@ -327,8 +332,12 @@ class PipelineManager(object):
return False
build_set = item.current_build_set
self.log.debug("Requesting nodes for change %s" % item.change)
if self.sched.use_relative_priority:
priority = item.getNodePriority()
else:
priority = 0
for job in jobs:
req = self.sched.nodepool.requestNodes(build_set, job)
req = self.sched.nodepool.requestNodes(build_set, job, priority)
self.log.debug("Adding node request %s for job %s to item %s" %
(req, job, item))
build_set.setJobNodeRequest(job.name, req)
@ -687,6 +696,12 @@ class PipelineManager(object):
if failing_reasons:
self.log.debug("%s is a failing item because %s" %
(item, failing_reasons))
if not dequeued and self.sched.use_relative_priority:
priority = item.getNodePriority()
for node_request in item.current_build_set.node_requests.values():
if node_request.relative_priority != priority:
self.sched.nodepool.reviseRequest(
node_request, priority)
return (changed, nnfi)
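This block is what the commit message means by continuously adjusting the priority: each pass over the queue recomputes the item's position, and only requests whose stored value is stale are rewritten. A hypothetical sketch of one such pass, with invented request data:

# Hypothetical sketch of the revise step after the change ahead merged.
pending = [{'id': '200-0000000003', 'relative_priority': 1}]
new_priority = 0  # the item is now first for its project

for request in pending:
    if request['relative_priority'] != new_priority:
        # in Zuul this is nodepool.reviseRequest(request, new_priority),
        # which locks the request in ZooKeeper before rewriting it
        request['relative_priority'] = new_priority

assert pending[0]['relative_priority'] == 0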
def processQueue(self):


@ -93,6 +93,11 @@ class DependentPipelineManager(PipelineManager):
self.log.debug("Dynamically created queue %s", change_queue)
return DynamicChangeQueueContextManager(change_queue)
def getNodePriority(self, item):
with self.getChangeQueue(item.change) as change_queue:
items = change_queue.queue
return items.index(item)
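For a dependent pipeline the grouping is by shared change queue rather than by project, so a hypothetical "integrated" queue holding one change each from org/project1 and org/project2 yields priorities 0 and 1, which is what test_nodepool_relative_priority_gate below expects:

# Hypothetical sketch of queue-position priority in a dependent pipeline.
integrated_queue = ['A@org/project1', 'B@org/project2']  # enqueue order
assert integrated_queue.index('A@org/project1') == 0  # relative_priority 0
assert integrated_queue.index('B@org/project2') == 1  # relative_priority 1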
def isChangeReadyToBeEnqueued(self, change):
source = change.project.source
if not source.canMerge(change, self.getSubmitAllowNeeds()):


@ -688,7 +688,7 @@ class NodeSet(ConfigObject):
class NodeRequest(object):
"""A request for a set of nodes."""
def __init__(self, requestor, build_set, job, nodeset):
def __init__(self, requestor, build_set, job, nodeset, relative_priority):
self.requestor = requestor
self.build_set = build_set
self.job = job
@ -696,9 +696,12 @@ class NodeRequest(object):
self._state = STATE_REQUESTED
self.requested_time = time.time()
self.state_time = time.time()
self.created_time = None
self.stat = None
self.uid = uuid4().hex
self.relative_priority = relative_priority
self.id = None
self._zk_data = {} # Data that we read back from ZK
# Zuul internal flags (not stored in ZK so they are not
# overwritten).
self.failed = False
@ -731,17 +734,24 @@ class NodeRequest(object):
return '<NodeRequest %s %s>' % (self.id, self.nodeset)
def toDict(self):
d = {}
# Start with any previously read data
d = self._zk_data.copy()
nodes = [n.label for n in self.nodeset.getNodes()]
d['node_types'] = nodes
d['requestor'] = self.requestor
# These are immutable once set
d.setdefault('node_types', nodes)
d.setdefault('requestor', self.requestor)
d.setdefault('created_time', self.created_time)
# We might change these
d['state'] = self.state
d['state_time'] = self.state_time
d['relative_priority'] = self.relative_priority
return d
def updateFromDict(self, data):
self._zk_data = data
self._state = data['state']
self.state_time = data['state_time']
self.relative_priority = data['relative_priority']
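A hypothetical example of the serialized request data toDict() produces (all values invented for illustration); keys already written back by Nodepool are preserved because the dict starts from self._zk_data:

# Hypothetical ZooKeeper payload for a request with one ubuntu-xenial node.
request_data = {
    'node_types': ['ubuntu-xenial'],
    'requestor': 'zuul-scheduler.example.org',
    'created_time': 1541203294.1,
    'state': 'requested',
    'state_time': 1541203294.1,
    'relative_priority': 1,
    # plus any keys Nodepool itself has written back, e.g. 'nodes'
}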
class Secret(ConfigObject):
@ -2245,6 +2255,9 @@ class QueueItem(object):
fakebuild.result = 'SKIPPED'
self.addBuild(fakebuild)
def getNodePriority(self):
return self.pipeline.manager.getNodePriority(self)
def formatUrlPattern(self, url_pattern, job=None, build=None):
url = None
# Produce safe versions of objects which may be useful in


@ -13,6 +13,7 @@
import logging
from zuul import model
from zuul.zk import LockException
class Nodepool(object):
@ -51,11 +52,12 @@ class Nodepool(object):
statsd.timing(key + '.size.%s' % len(request.nodeset.nodes), dt)
statsd.gauge('zuul.nodepool.current_requests', len(self.requests))
def requestNodes(self, build_set, job):
def requestNodes(self, build_set, job, relative_priority):
# Create a copy of the nodeset to represent the actual nodes
# returned by nodepool.
nodeset = job.nodeset.copy()
req = model.NodeRequest(self.sched.hostname, build_set, job, nodeset)
req = model.NodeRequest(self.sched.hostname, build_set, job,
nodeset, relative_priority)
self.requests[req.uid] = req
if nodeset.nodes:
@ -79,6 +81,38 @@ class Nodepool(object):
except Exception:
self.log.exception("Error deleting node request:")
def reviseRequest(self, request, relative_priority=None):
'''Attempt to update the node request, if it is not currently being
processed.
:param NodeRequest request: The request to update.
:param int relative_priority: If supplied, the new relative
priority to set on the request.
'''
if relative_priority is None:
return
try:
self.sched.zk.lockNodeRequest(request, blocking=False)
except LockException:
# It may be locked by nodepool, which is fine.
self.log.debug("Unable to revise locked node request %s", request)
return False
try:
old_priority = request.relative_priority
request.relative_priority = relative_priority
self.sched.zk.storeNodeRequest(request)
self.log.debug("Revised relative priority of "
"node request %s from %s to %s",
request, old_priority, relative_priority)
except Exception:
self.log.exception("Unable to update node request %s", request)
finally:
try:
self.sched.zk.unlockNodeRequest(request)
except Exception:
self.log.exception("Unable to unlock node request %s", request)
def holdNodeSet(self, nodeset, autohold_key):
'''
Perform a hold on the given set of nodes.


@ -305,6 +305,10 @@ class Scheduler(threading.Thread):
self.last_reconfigured = None
self.tenant_last_reconfigured = {}
self.autohold_requests = {}
self.use_relative_priority = False
if self.config.has_option('scheduler', 'relative_priority'):
if self.config.getboolean('scheduler', 'relative_priority'):
self.use_relative_priority = True
def start(self):
super(Scheduler, self).start()


@ -41,6 +41,7 @@ class ZooKeeper(object):
log = logging.getLogger("zuul.zk.ZooKeeper")
REQUEST_ROOT = '/nodepool/requests'
REQUEST_LOCK_ROOT = "/nodepool/requests-lock"
NODE_ROOT = '/nodepool/nodes'
# Log zookeeper retry every 10 seconds
@ -162,8 +163,8 @@ class ZooKeeper(object):
from ZooKeeper). The watcher should return False when
further updates are no longer necessary.
'''
node_request.created_time = time.time()
data = node_request.toDict()
data['created_time'] = time.time()
path = '%s/%s-' % (self.REQUEST_ROOT, node_request.priority)
path = self.client.create(path, self._dictToStr(data),
@ -174,15 +175,7 @@ class ZooKeeper(object):
def callback(data, stat):
if data:
data = self._strToDict(data)
request_nodes = list(node_request.nodeset.getNodes())
for i, nodeid in enumerate(data.get('nodes', [])):
node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
node_data, node_stat = self.client.get(node_path)
node_data = self._strToDict(node_data)
request_nodes[i].id = nodeid
request_nodes[i].updateFromDict(node_data)
node_request.updateFromDict(data)
self.updateNodeRequest(node_request, data)
deleted = (data is None) # data *are* none
return watcher(node_request, deleted)
@ -215,6 +208,34 @@ class ZooKeeper(object):
return True
return False
def storeNodeRequest(self, node_request):
'''Store the node request.
The request is expected to already exist and is updated in its
entirety.
:param NodeRequest node_request: The request to update.
'''
path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
self.client.set(path, self._dictToStr(node_request.toDict()))
def updateNodeRequest(self, node_request, data=None):
'''Refresh an existing node request.
:param NodeRequest node_request: The request to update.
:param dict data: The data to use; query ZK if absent.
'''
if data is None:
path = '%s/%s' % (self.REQUEST_ROOT, node_request.id)
data, stat = self.client.get(path)
data = self._strToDict(data)
request_nodes = list(node_request.nodeset.getNodes())
for i, nodeid in enumerate(data.get('nodes', [])):
request_nodes[i].id = nodeid
self.updateNode(request_nodes[i], nodeid)
node_request.updateFromDict(data)
def storeNode(self, node):
'''Store the node.
@ -227,6 +248,18 @@ class ZooKeeper(object):
path = '%s/%s' % (self.NODE_ROOT, node.id)
self.client.set(path, self._dictToStr(node.toDict()))
def updateNode(self, node, nodeid):
'''Refresh an existing node.
:param Node node: The node to update.
:param Node nodeid: The zookeeper node ID.
'''
node_path = '%s/%s' % (self.NODE_ROOT, nodeid)
node_data, node_stat = self.client.get(node_path)
node_data = self._strToDict(node_data)
node.updateFromDict(node_data)
def lockNode(self, node, blocking=True, timeout=None):
'''
Lock a node.
@ -268,6 +301,59 @@ class ZooKeeper(object):
node.lock.release()
node.lock = None
def lockNodeRequest(self, request, blocking=True, timeout=None):
'''
Lock a node request.
This will set the `lock` attribute of the request object when the
lock is successfully acquired.
:param NodeRequest request: The request to lock.
:param bool blocking: Whether or not to block on trying to
acquire the lock
:param int timeout: When blocking, how long to wait for the lock
to get acquired. None, the default, waits forever.
:raises: TimeoutException if we failed to acquire the lock when
blocking with a timeout. ZKLockException if we are not blocking
and could not get the lock, or a lock is already held.
'''
path = "%s/%s" % (self.REQUEST_LOCK_ROOT, request.id)
try:
lock = Lock(self.client, path)
have_lock = lock.acquire(blocking, timeout)
except kze.LockTimeout:
raise LockException(
"Timeout trying to acquire lock %s" % path)
except kze.NoNodeError:
have_lock = False
self.log.error("Request not found for locking: %s", request)
# If we aren't blocking, it's possible we didn't get the lock
# because someone else has it.
if not have_lock:
raise LockException("Did not get lock on %s" % path)
request.lock = lock
self.updateNodeRequest(request)
def unlockNodeRequest(self, request):
'''
Unlock a node request.
The request must already have been locked.
:param NodeRequest request: The request to unlock.
:raises: ZKLockException if the request is not currently locked.
'''
if request.lock is None:
raise LockException(
"Request %s does not hold a lock" % request)
request.lock.release()
request.lock = None
def heldNodeCount(self, autohold_key):
'''
Count the number of nodes being held for the given tenant/project/job.