Update node during lockNode

After locking a node we usually re-read it to check whether its state
changed on us. This matters because we must not operate on stale data:
the node data is only guaranteed to be current if we refresh it after
acquiring the lock (regardless of whether we started from cached data
or not). However, doing this after every lockNode call is cumbersome,
so doing it centrally in lockNode reduces the risk of forgetting it.

This does not introduce any behavior change.

Change-Id: I06001e487041a3b67d969070b43f491c4ed3dce0
Tobias Henkel 2018-11-08 09:09:49 +01:00
parent 3cb6e7f0c9
commit b6a3e319e7
GPG Key ID: 03750DEC158E5FA2
4 changed files with 87 additions and 61 deletions
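
To illustrate the calling pattern this change centralizes, here is a rough
sketch in terms of the wrapper API that appears in the diffs below (lockNode,
getNode, unlockNode). The helper function names and the READY default are
illustrative only, not nodepool code:

def check_ready_before(zk_conn, node, READY="ready"):
    """Old pattern: every caller had to re-read the node after locking."""
    zk_conn.lockNode(node, blocking=False)
    fresh = zk_conn.getNode(node.id)   # easy to forget at some call sites
    if fresh.state != READY:
        zk_conn.unlockNode(node)
        return False
    return True

def check_ready_after(zk_conn, node, READY="ready"):
    """New pattern: lockNode() refreshes the node in place, so the locked
    object itself carries the latest state."""
    zk_conn.lockNode(node, blocking=False)
    if node.state != READY:
        zk_conn.unlockNode(node)
        return False
    return True

The diffs below drop the explicit getNode() re-reads accordingly.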

View File

@@ -173,15 +173,14 @@ class StaticNodeProvider(Provider):
if original_attrs == new_attrs:
continue
node.type = static_node["labels"]
node.username = static_node["username"]
node.connection_port = static_node["connection-port"]
node.connection_type = static_node["connection-type"]
nodeutils.set_node_ip(node)
node.host_keys = host_keys
try:
self.zk.lockNode(node, blocking=False)
node.type = static_node["labels"]
node.username = static_node["username"]
node.connection_port = static_node["connection-port"]
node.connection_type = static_node["connection-type"]
nodeutils.set_node_ip(node)
node.host_keys = host_keys
except exceptions.ZKLockException:
self.log.warning("Unable to lock node %s for update", node.id)
continue

View File

@@ -510,10 +510,8 @@ class CleanupWorker(BaseCleanupWorker):
continue
# Double check the state now that we have a lock since it
# may have changed on us. We keep using the original node
# since it's holding the lock.
_node = zk_conn.getNode(node.id)
if _node.state != zk.READY:
# may have changed on us.
if node.state != zk.READY:
zk_conn.unlockNode(node)
continue
@@ -564,10 +562,8 @@ class CleanupWorker(BaseCleanupWorker):
continue
# Double check the state now that we have a lock since it
# may have changed on us. We keep using the original node
# since it's holding the lock.
_node = zk_conn.getNode(node.id)
if _node.state != zk.HOLD:
# may have changed on us.
if node.state != zk.HOLD:
zk_conn.unlockNode(node)
continue
@@ -677,10 +673,8 @@ class DeletedNodeWorker(BaseCleanupWorker):
continue
# Double check the state now that we have a lock since it
# may have changed on us. We keep using the original node
# since it's holding the lock.
_node = zk_conn.getNode(node.id)
if _node.state not in cleanup_states:
# may have changed on us.
if node.state not in cleanup_states:
zk_conn.unlockNode(node)
continue

View File

@@ -860,9 +860,9 @@ class TestLauncher(tests.DBTestCase):
node = nodes[0]
self.log.debug("Holding node %s..." % node.id)
# hold the node
self.zk.lockNode(node, blocking=False)
node.state = zk.HOLD
node.comment = 'testing'
self.zk.lockNode(node, blocking=False)
self.zk.storeNode(node)
self.zk.unlockNode(node)
znode = self.zk.getNode(node.id)
@@ -885,10 +885,10 @@ class TestLauncher(tests.DBTestCase):
node = nodes[0]
self.log.debug("Holding node %s..." % node.id)
# hold the node
self.zk.lockNode(node, blocking=False)
node.state = zk.HOLD
node.comment = 'testing'
node.hold_expiration = 1
self.zk.lockNode(node, blocking=False)
self.zk.storeNode(node)
self.zk.unlockNode(node)
znode = self.zk.getNode(node.id)
@@ -911,10 +911,10 @@ class TestLauncher(tests.DBTestCase):
node = nodes[0]
self.log.debug("Holding node %s..." % node.id)
# hold the node
self.zk.lockNode(node, blocking=False)
node.state = zk.HOLD
node.comment = 'testing'
node.hold_expiration = '1'
self.zk.lockNode(node, blocking=False)
self.zk.storeNode(node)
self.zk.unlockNode(node)
znode = self.zk.getNode(node.id)
@@ -936,10 +936,10 @@ class TestLauncher(tests.DBTestCase):
node = nodes[0]
self.log.debug("Holding node %s..." % node.id)
# hold the node
self.zk.lockNode(node, blocking=False)
node.state = zk.HOLD
node.comment = 'testing'
node.hold_expiration = 'notanumber'
self.zk.lockNode(node, blocking=False)
self.zk.storeNode(node)
self.zk.unlockNode(node)
znode = self.zk.getNode(node.id)
@@ -965,15 +965,15 @@ class TestLauncher(tests.DBTestCase):
self.log.debug("Holding node %s...(%s seconds)" % (node_custom.id,
hold_expiration))
# hold the nodes
self.zk.lockNode(node, blocking=False)
node.state = zk.HOLD
node.comment = 'testing'
node_custom.state = zk.HOLD
node_custom.comment = 'testing hold_expiration'
node_custom.hold_expiration = hold_expiration
self.zk.lockNode(node, blocking=False)
self.zk.storeNode(node)
self.zk.unlockNode(node)
self.zk.lockNode(node_custom, blocking=False)
node_custom.state = zk.HOLD
node_custom.comment = 'testing hold_expiration'
node_custom.hold_expiration = hold_expiration
self.zk.storeNode(node_custom)
self.zk.unlockNode(node_custom)
znode = self.zk.getNode(node.id)
@@ -1010,15 +1010,15 @@ class TestLauncher(tests.DBTestCase):
self.log.debug("Holding node %s...(%s seconds)" % (node_custom.id,
hold_expiration))
# hold the nodes
self.zk.lockNode(node, blocking=False)
node.state = zk.HOLD
node.comment = 'testing'
node_custom.state = zk.HOLD
node_custom.comment = 'testing hold_expiration'
node_custom.hold_expiration = hold_expiration
self.zk.lockNode(node, blocking=False)
self.zk.storeNode(node)
self.zk.unlockNode(node)
self.zk.lockNode(node_custom, blocking=False)
node_custom.state = zk.HOLD
node_custom.comment = 'testing hold_expiration'
node_custom.hold_expiration = hold_expiration
self.zk.storeNode(node_custom)
self.zk.unlockNode(node_custom)
znode = self.zk.getNode(node.id)
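
The reordering in these tests follows from the same in-place refresh: any
attribute set before lockNode() would be overwritten when the node is re-read
from ZooKeeper. A minimal sketch of the required ordering, assuming the zk
wrapper API used above (the hold_node helper itself is made up for this
example):

def hold_node(zk, node, HOLD="hold"):
    # Illustrative helper, not nodepool code: lock first, then mutate.
    zk.lockNode(node, blocking=False)  # refreshes node in place from ZooKeeper
    node.state = HOLD                  # applied on top of the fresh data,
    node.comment = 'testing'           # so the refresh cannot clobber it
    zk.storeNode(node)
    zk.unlockNode(node)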

View File

@@ -613,43 +613,53 @@ class Node(BaseModel):
'''
o = Node(o_id)
super(Node, o).fromDict(d)
o.cloud = d.get('cloud')
o.provider = d.get('provider')
o.pool = d.get('pool')
o.type = d.get('type')
o.allocated_to = d.get('allocated_to')
o.az = d.get('az')
o.region = d.get('region')
o.public_ipv4 = d.get('public_ipv4')
o.private_ipv4 = d.get('private_ipv4')
o.public_ipv6 = d.get('public_ipv6')
o.interface_ip = d.get('interface_ip')
o.connection_port = d.get('connection_port', d.get('ssh_port', 22))
o.image_id = d.get('image_id')
o.launcher = d.get('launcher')
o.created_time = d.get('created_time')
o.external_id = d.get('external_id')
o.hostname = d.get('hostname')
o.comment = d.get('comment')
o.hold_job = d.get('hold_job')
o.username = d.get('username', 'zuul')
o.connection_type = d.get('connection_type')
o.host_keys = d.get('host_keys', [])
o.updateFromDict(d)
return o
def updateFromDict(self, d):
'''
Updates the Node object from a dictionary
:param dict d: The dictionary
'''
super().fromDict(d)
self.cloud = d.get('cloud')
self.provider = d.get('provider')
self.pool = d.get('pool')
self.type = d.get('type')
self.allocated_to = d.get('allocated_to')
self.az = d.get('az')
self.region = d.get('region')
self.public_ipv4 = d.get('public_ipv4')
self.private_ipv4 = d.get('private_ipv4')
self.public_ipv6 = d.get('public_ipv6')
self.interface_ip = d.get('interface_ip')
self.connection_port = d.get('connection_port', d.get('ssh_port', 22))
self.image_id = d.get('image_id')
self.launcher = d.get('launcher')
self.created_time = d.get('created_time')
self.external_id = d.get('external_id')
self.hostname = d.get('hostname')
self.comment = d.get('comment')
self.hold_job = d.get('hold_job')
self.username = d.get('username', 'zuul')
self.connection_type = d.get('connection_type')
self.host_keys = d.get('host_keys', [])
hold_expiration = d.get('hold_expiration')
if hold_expiration is not None:
try:
# We try to force this to an integer value because we do
# relative second based age comparisons using this value
# and those need to be a number type.
o.hold_expiration = int(hold_expiration)
self.hold_expiration = int(hold_expiration)
except ValueError:
# Coercion to int failed, just use default of 0,
# which means no expiration
o.hold_expiration = 0
self.hold_expiration = 0
else:
o.hold_expiration = hold_expiration
o.resources = d.get('resources')
return o
self.hold_expiration = hold_expiration
self.resources = d.get('resources')
class ZooKeeper(object):
@@ -1650,7 +1660,9 @@ class ZooKeeper(object):
Lock a node.
This will set the `lock` attribute of the Node object when the
lock is successfully acquired.
lock is successfully acquired. Also this will update the node with the
latest data after acquiring the lock in order to guarantee that it has
the latest state if locking was successful.
:param Node node: The node to lock.
:param bool blocking: Whether or not to block on trying to
@@ -1680,6 +1692,9 @@ class ZooKeeper(object):
node.lock = lock
# Do an in-place update of the node so we have the latest data.
self.updateNode(node)
def unlockNode(self, node):
'''
Unlock a node.
@@ -1743,6 +1758,25 @@ class ZooKeeper(object):
d.stat = stat
return d
def updateNode(self, node):
'''
Update the data of a node object in-place
:param node: The node object
'''
path = self._nodePath(node.id)
data, stat = self.client.get(path)
if data:
d = self._bytesToDict(data)
else:
# The node exists but has no data so use empty dict.
d = {}
node.updateFromDict(d)
node.stat = stat
def storeNode(self, node):
'''
Store a new or existing node.
@@ -1855,8 +1889,7 @@ class ZooKeeper(object):
continue
# Make sure the state didn't change on us
n = self.getNode(node.id)
if n.state != READY:
if node.state != READY:
self.unlockNode(node)
continue
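
As a closing illustration, here is a self-contained model of the
refresh-on-lock behaviour that lockNode() gains above. The FakeStore and
FakeNode classes are made up for this sketch and merely stand in for the
ZooKeeper client and the Node object; they are not nodepool code:

import threading

class FakeNode:
    """Stands in for a nodepool Node: only the state field matters here."""
    def __init__(self):
        self.state = None

    def update_from_dict(self, d):
        # In-place update, analogous to Node.updateFromDict() above.
        self.state = d.get("state")

class FakeStore:
    """Stands in for ZooKeeper: authoritative node data plus a lock."""
    def __init__(self):
        self._data = {"state": "ready"}
        self._lock = threading.Lock()

    def write(self, data):
        self._data = dict(data)

    def lock_node(self, node):
        self._lock.acquire()
        # The step this commit centralizes: refresh right after locking,
        # so the caller never acts on a stale snapshot.
        node.update_from_dict(self._data)

    def unlock_node(self, node):
        self._lock.release()

store = FakeStore()
node = FakeNode()
node.update_from_dict({"state": "ready"})  # possibly stale local view
store.write({"state": "hold"})             # another actor changed the state
store.lock_node(node)
assert node.state == "hold"                # the lock call refreshed the object
store.unlock_node(node)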