Update node during lockNode
After locking a node, we usually update it to double-check whether its state changed on us. This is important to avoid operating on stale data, because the node data is only guaranteed to be current if we refresh it after acquiring the lock (regardless of whether we operate on cached data or not). However, it is cumbersome to do this after every lockNode call, so doing it centrally reduces the risk of forgetting it. This does not introduce any behavior change. Change-Id: I06001e487041a3b67d969070b43f491c4ed3dce0
This commit is contained in:
parent
3cb6e7f0c9
commit
b6a3e319e7
|
@ -173,15 +173,14 @@ class StaticNodeProvider(Provider):
|
|||
if original_attrs == new_attrs:
|
||||
continue
|
||||
|
||||
node.type = static_node["labels"]
|
||||
node.username = static_node["username"]
|
||||
node.connection_port = static_node["connection-port"]
|
||||
node.connection_type = static_node["connection-type"]
|
||||
nodeutils.set_node_ip(node)
|
||||
node.host_keys = host_keys
|
||||
|
||||
try:
|
||||
self.zk.lockNode(node, blocking=False)
|
||||
node.type = static_node["labels"]
|
||||
node.username = static_node["username"]
|
||||
node.connection_port = static_node["connection-port"]
|
||||
node.connection_type = static_node["connection-type"]
|
||||
nodeutils.set_node_ip(node)
|
||||
node.host_keys = host_keys
|
||||
except exceptions.ZKLockException:
|
||||
self.log.warning("Unable to lock node %s for update", node.id)
|
||||
continue
|
||||
|
|
|
@ -510,10 +510,8 @@ class CleanupWorker(BaseCleanupWorker):
|
|||
continue
|
||||
|
||||
# Double check the state now that we have a lock since it
|
||||
# may have changed on us. We keep using the original node
|
||||
# since it's holding the lock.
|
||||
_node = zk_conn.getNode(node.id)
|
||||
if _node.state != zk.READY:
|
||||
# may have changed on us.
|
||||
if node.state != zk.READY:
|
||||
zk_conn.unlockNode(node)
|
||||
continue
|
||||
|
||||
|
@ -564,10 +562,8 @@ class CleanupWorker(BaseCleanupWorker):
|
|||
continue
|
||||
|
||||
# Double check the state now that we have a lock since it
|
||||
# may have changed on us. We keep using the original node
|
||||
# since it's holding the lock.
|
||||
_node = zk_conn.getNode(node.id)
|
||||
if _node.state != zk.HOLD:
|
||||
# may have changed on us.
|
||||
if node.state != zk.HOLD:
|
||||
zk_conn.unlockNode(node)
|
||||
continue
|
||||
|
||||
|
@ -677,10 +673,8 @@ class DeletedNodeWorker(BaseCleanupWorker):
|
|||
continue
|
||||
|
||||
# Double check the state now that we have a lock since it
|
||||
# may have changed on us. We keep using the original node
|
||||
# since it's holding the lock.
|
||||
_node = zk_conn.getNode(node.id)
|
||||
if _node.state not in cleanup_states:
|
||||
# may have changed on us.
|
||||
if node.state not in cleanup_states:
|
||||
zk_conn.unlockNode(node)
|
||||
continue
|
||||
|
||||
|
|
|
@ -860,9 +860,9 @@ class TestLauncher(tests.DBTestCase):
|
|||
node = nodes[0]
|
||||
self.log.debug("Holding node %s..." % node.id)
|
||||
# hold the node
|
||||
self.zk.lockNode(node, blocking=False)
|
||||
node.state = zk.HOLD
|
||||
node.comment = 'testing'
|
||||
self.zk.lockNode(node, blocking=False)
|
||||
self.zk.storeNode(node)
|
||||
self.zk.unlockNode(node)
|
||||
znode = self.zk.getNode(node.id)
|
||||
|
@ -885,10 +885,10 @@ class TestLauncher(tests.DBTestCase):
|
|||
node = nodes[0]
|
||||
self.log.debug("Holding node %s..." % node.id)
|
||||
# hold the node
|
||||
self.zk.lockNode(node, blocking=False)
|
||||
node.state = zk.HOLD
|
||||
node.comment = 'testing'
|
||||
node.hold_expiration = 1
|
||||
self.zk.lockNode(node, blocking=False)
|
||||
self.zk.storeNode(node)
|
||||
self.zk.unlockNode(node)
|
||||
znode = self.zk.getNode(node.id)
|
||||
|
@ -911,10 +911,10 @@ class TestLauncher(tests.DBTestCase):
|
|||
node = nodes[0]
|
||||
self.log.debug("Holding node %s..." % node.id)
|
||||
# hold the node
|
||||
self.zk.lockNode(node, blocking=False)
|
||||
node.state = zk.HOLD
|
||||
node.comment = 'testing'
|
||||
node.hold_expiration = '1'
|
||||
self.zk.lockNode(node, blocking=False)
|
||||
self.zk.storeNode(node)
|
||||
self.zk.unlockNode(node)
|
||||
znode = self.zk.getNode(node.id)
|
||||
|
@ -936,10 +936,10 @@ class TestLauncher(tests.DBTestCase):
|
|||
node = nodes[0]
|
||||
self.log.debug("Holding node %s..." % node.id)
|
||||
# hold the node
|
||||
self.zk.lockNode(node, blocking=False)
|
||||
node.state = zk.HOLD
|
||||
node.comment = 'testing'
|
||||
node.hold_expiration = 'notanumber'
|
||||
self.zk.lockNode(node, blocking=False)
|
||||
self.zk.storeNode(node)
|
||||
self.zk.unlockNode(node)
|
||||
znode = self.zk.getNode(node.id)
|
||||
|
@ -965,15 +965,15 @@ class TestLauncher(tests.DBTestCase):
|
|||
self.log.debug("Holding node %s...(%s seconds)" % (node_custom.id,
|
||||
hold_expiration))
|
||||
# hold the nodes
|
||||
self.zk.lockNode(node, blocking=False)
|
||||
node.state = zk.HOLD
|
||||
node.comment = 'testing'
|
||||
node_custom.state = zk.HOLD
|
||||
node_custom.comment = 'testing hold_expiration'
|
||||
node_custom.hold_expiration = hold_expiration
|
||||
self.zk.lockNode(node, blocking=False)
|
||||
self.zk.storeNode(node)
|
||||
self.zk.unlockNode(node)
|
||||
self.zk.lockNode(node_custom, blocking=False)
|
||||
node_custom.state = zk.HOLD
|
||||
node_custom.comment = 'testing hold_expiration'
|
||||
node_custom.hold_expiration = hold_expiration
|
||||
self.zk.storeNode(node_custom)
|
||||
self.zk.unlockNode(node_custom)
|
||||
znode = self.zk.getNode(node.id)
|
||||
|
@ -1010,15 +1010,15 @@ class TestLauncher(tests.DBTestCase):
|
|||
self.log.debug("Holding node %s...(%s seconds)" % (node_custom.id,
|
||||
hold_expiration))
|
||||
# hold the nodes
|
||||
self.zk.lockNode(node, blocking=False)
|
||||
node.state = zk.HOLD
|
||||
node.comment = 'testing'
|
||||
node_custom.state = zk.HOLD
|
||||
node_custom.comment = 'testing hold_expiration'
|
||||
node_custom.hold_expiration = hold_expiration
|
||||
self.zk.lockNode(node, blocking=False)
|
||||
self.zk.storeNode(node)
|
||||
self.zk.unlockNode(node)
|
||||
self.zk.lockNode(node_custom, blocking=False)
|
||||
node_custom.state = zk.HOLD
|
||||
node_custom.comment = 'testing hold_expiration'
|
||||
node_custom.hold_expiration = hold_expiration
|
||||
self.zk.storeNode(node_custom)
|
||||
self.zk.unlockNode(node_custom)
|
||||
znode = self.zk.getNode(node.id)
|
||||
|
|
|
@ -613,43 +613,53 @@ class Node(BaseModel):
|
|||
'''
|
||||
o = Node(o_id)
|
||||
super(Node, o).fromDict(d)
|
||||
o.cloud = d.get('cloud')
|
||||
o.provider = d.get('provider')
|
||||
o.pool = d.get('pool')
|
||||
o.type = d.get('type')
|
||||
o.allocated_to = d.get('allocated_to')
|
||||
o.az = d.get('az')
|
||||
o.region = d.get('region')
|
||||
o.public_ipv4 = d.get('public_ipv4')
|
||||
o.private_ipv4 = d.get('private_ipv4')
|
||||
o.public_ipv6 = d.get('public_ipv6')
|
||||
o.interface_ip = d.get('interface_ip')
|
||||
o.connection_port = d.get('connection_port', d.get('ssh_port', 22))
|
||||
o.image_id = d.get('image_id')
|
||||
o.launcher = d.get('launcher')
|
||||
o.created_time = d.get('created_time')
|
||||
o.external_id = d.get('external_id')
|
||||
o.hostname = d.get('hostname')
|
||||
o.comment = d.get('comment')
|
||||
o.hold_job = d.get('hold_job')
|
||||
o.username = d.get('username', 'zuul')
|
||||
o.connection_type = d.get('connection_type')
|
||||
o.host_keys = d.get('host_keys', [])
|
||||
|
||||
o.updateFromDict(d)
|
||||
return o
|
||||
|
||||
def updateFromDict(self, d):
|
||||
'''
|
||||
Updates the Node object from a dictionary
|
||||
|
||||
:param dict d: The dictionary
|
||||
'''
|
||||
super().fromDict(d)
|
||||
self.cloud = d.get('cloud')
|
||||
self.provider = d.get('provider')
|
||||
self.pool = d.get('pool')
|
||||
self.type = d.get('type')
|
||||
self.allocated_to = d.get('allocated_to')
|
||||
self.az = d.get('az')
|
||||
self.region = d.get('region')
|
||||
self.public_ipv4 = d.get('public_ipv4')
|
||||
self.private_ipv4 = d.get('private_ipv4')
|
||||
self.public_ipv6 = d.get('public_ipv6')
|
||||
self.interface_ip = d.get('interface_ip')
|
||||
self.connection_port = d.get('connection_port', d.get('ssh_port', 22))
|
||||
self.image_id = d.get('image_id')
|
||||
self.launcher = d.get('launcher')
|
||||
self.created_time = d.get('created_time')
|
||||
self.external_id = d.get('external_id')
|
||||
self.hostname = d.get('hostname')
|
||||
self.comment = d.get('comment')
|
||||
self.hold_job = d.get('hold_job')
|
||||
self.username = d.get('username', 'zuul')
|
||||
self.connection_type = d.get('connection_type')
|
||||
self.host_keys = d.get('host_keys', [])
|
||||
hold_expiration = d.get('hold_expiration')
|
||||
if hold_expiration is not None:
|
||||
try:
|
||||
# We try to force this to an integer value because we do
|
||||
# relative second based age comparisons using this value
|
||||
# and those need to be a number type.
|
||||
o.hold_expiration = int(hold_expiration)
|
||||
self.hold_expiration = int(hold_expiration)
|
||||
except ValueError:
|
||||
# Coercion to int failed, just use default of 0,
|
||||
# which means no expiration
|
||||
o.hold_expiration = 0
|
||||
self.hold_expiration = 0
|
||||
else:
|
||||
o.hold_expiration = hold_expiration
|
||||
o.resources = d.get('resources')
|
||||
return o
|
||||
self.hold_expiration = hold_expiration
|
||||
self.resources = d.get('resources')
|
||||
|
||||
|
||||
class ZooKeeper(object):
|
||||
|
@ -1650,7 +1660,9 @@ class ZooKeeper(object):
|
|||
Lock a node.
|
||||
|
||||
This will set the `lock` attribute of the Node object when the
|
||||
lock is successfully acquired.
|
||||
lock is successfully acquired. Also this will update the node with the
|
||||
latest data after acquiring the lock in order to guarantee that it has
|
||||
the latest state if locking was successful.
|
||||
|
||||
:param Node node: The node to lock.
|
||||
:param bool blocking: Whether or not to block on trying to
|
||||
|
@ -1680,6 +1692,9 @@ class ZooKeeper(object):
|
|||
|
||||
node.lock = lock
|
||||
|
||||
# Do an in-place update of the node so we have the latest data.
|
||||
self.updateNode(node)
|
||||
|
||||
def unlockNode(self, node):
|
||||
'''
|
||||
Unlock a node.
|
||||
|
@ -1743,6 +1758,25 @@ class ZooKeeper(object):
|
|||
d.stat = stat
|
||||
return d
|
||||
|
||||
def updateNode(self, node):
|
||||
'''
|
||||
Update the data of a node object in-place
|
||||
|
||||
:param node: The node object
|
||||
'''
|
||||
|
||||
path = self._nodePath(node.id)
|
||||
data, stat = self.client.get(path)
|
||||
|
||||
if data:
|
||||
d = self._bytesToDict(data)
|
||||
else:
|
||||
# The node exists but has no data so use empty dict.
|
||||
d = {}
|
||||
|
||||
node.updateFromDict(d)
|
||||
node.stat = stat
|
||||
|
||||
def storeNode(self, node):
|
||||
'''
|
||||
Store an new or existing node.
|
||||
|
@ -1855,8 +1889,7 @@ class ZooKeeper(object):
|
|||
continue
|
||||
|
||||
# Make sure the state didn't change on us
|
||||
n = self.getNode(node.id)
|
||||
if n.state != READY:
|
||||
if node.state != READY:
|
||||
self.unlockNode(node)
|
||||
continue
|
||||
|
||||
|
|
Loading…
Reference in New Issue