Merge "Support node caching in the nodeIterator"

Zuul 2018-11-26 18:19:40 +00:00 committed by Gerrit Code Review
commit 3cb6e7f0c9
5 changed files with 104 additions and 18 deletions
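
In short, this change adds an optional cached flag to nodeIterator(), getNode() and getReadyNodesOfTypes() in the ZooKeeper wrapper, backed by a kazoo TreeCache that is started in connect() and closed in disconnect(). A minimal sketch of the resulting call pattern, assuming zk is a connected nodepool ZooKeeper object as modified in the hunks below:

    # Bulk reads that can tolerate slightly stale data use the cache;
    # cached=True is the default for nodeIterator after this change.
    for node in zk.nodeIterator(cached=True):
        print(node.state)

    # Callers that need the most recent data bypass the cache explicitly,
    # as the launcher and the stats reporter do in the hunks below.
    ready = zk.getReadyNodesOfTypes(['fake-label'], cached=False)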


@ -900,7 +900,9 @@ class NodePool(threading.Thread):
requested_labels = list(self._submittedRequests.keys())
needed_labels = list(set(label_names) - set(requested_labels))
ready_nodes = self.zk.getReadyNodesOfTypes(needed_labels)
# Note that we explicitly don't use the cache here because otherwise we
# could end up creating more min-ready nodes than we want.
ready_nodes = self.zk.getReadyNodesOfTypes(needed_labels, cached=False)
for label in self.config.labels.values():
if label.name not in needed_labels:
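
The comment above guards against a race: the min-ready logic counts READY nodes per label, and a cache that lags behind ZooKeeper can undercount nodes launched moments earlier, so extras get created. A toy, self-contained illustration of the overshoot; all numbers are made up and this is not nodepool code:

    # Toy illustration only: a lagging cache reports one ready node
    # although two already exist, so one extra node would be launched.
    min_ready = 2
    ready_in_zookeeper = 2   # what an uncached read would return
    ready_in_cache = 1       # what a stale cache still returns

    launched_with_cache = max(min_ready - ready_in_cache, 0)         # 1
    launched_without_cache = max(min_ready - ready_in_zookeeper, 0)  # 0
    print("extra nodes launched:", launched_with_cache - launched_without_cache)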


@ -110,7 +110,14 @@ class StatsReporter(object):
key = 'nodepool.label.%s.nodes.%s' % (label, state)
states[key] = 0
for node in zk_conn.nodeIterator():
# Note that we intentionally don't use caching here because we don't
# know when the next update will happen, and we need to report the
# correct, most recent state. Otherwise we could end up reporting a
# gauge with a node in state deleting = 1 and not updating it for a
# long time.
# TODO(tobiash): Changing updateNodeStats to just run periodically will
# resolve this, and then we can operate on cached data.
for node in zk_conn.nodeIterator(cached=False):
# nodepool.nodes.STATE
key = 'nodepool.nodes.%s' % node.state
states[key] += 1


@ -24,6 +24,7 @@ import testtools
from nodepool.cmd import nodepoolcmd
from nodepool import tests
from nodepool import zk
from nodepool.nodeutils import iterate_timeout
class TestNodepoolCMD(tests.DBTestCase):
@ -124,8 +125,15 @@ class TestNodepoolCMD(tests.DBTestCase):
pool.start()
self.waitForImage('fake-provider', 'fake-image')
self.waitForNodes('fake-label')
self.assert_nodes_listed(configfile, 1, detail=False,
validate_col_count=True)
for _ in iterate_timeout(10, Exception, "assert nodes are listed"):
try:
self.assert_nodes_listed(configfile, 1, detail=False,
validate_col_count=True)
break
except AssertionError:
# node is not listed yet, retry later
pass
def test_list_nodes_detail(self):
configfile = self.setup_config('node.yaml')
@ -134,8 +142,14 @@ class TestNodepoolCMD(tests.DBTestCase):
pool.start()
self.waitForImage('fake-provider', 'fake-image')
self.waitForNodes('fake-label')
self.assert_nodes_listed(configfile, 1, detail=True,
validate_col_count=True)
for _ in iterate_timeout(10, Exception, "assert nodes are listed"):
try:
self.assert_nodes_listed(configfile, 1, detail=True,
validate_col_count=True)
break
except AssertionError:
# node is not listed yet, retry later
pass
def test_config_validate(self):
config = os.path.join(os.path.dirname(tests.__file__),
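
The retry loops added above exist because node listing can now be served from the cache and may lag briefly, so the tests poll assert_nodes_listed() until it passes or a timeout expires. A minimal sketch of what iterate_timeout from nodepool.nodeutils presumably looks like, based only on how it is called here; the sleep interval and the exact error wording are assumptions:

    import time

    def iterate_timeout(max_seconds, exc, purpose, interval=2):
        # Yield an increasing attempt counter until the deadline passes,
        # sleeping between attempts, then raise the given exception.
        start = time.time()
        count = 0
        while time.time() < start + max_seconds:
            count += 1
            yield count
            time.sleep(interval)
        raise exc("Timeout waiting for %s" % purpose)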


@ -17,6 +17,7 @@ import time
from nodepool import exceptions as npe
from nodepool import tests
from nodepool import zk
from nodepool.nodeutils import iterate_timeout
class TestZooKeeper(tests.DBTestCase):
@ -583,7 +584,7 @@ class TestZooKeeper(tests.DBTestCase):
n3.type = 'label2'
self.zk.storeNode(n3)
r = self.zk.getReadyNodesOfTypes(['label1'])
r = self.zk.getReadyNodesOfTypes(['label1'], cached=False)
self.assertIn('label1', r)
self.assertEqual(2, len(r['label1']))
self.assertIn(n1, r['label1'])
@ -603,7 +604,7 @@ class TestZooKeeper(tests.DBTestCase):
n3.type = 'label2'
self.zk.storeNode(n3)
r = self.zk.getReadyNodesOfTypes(['label1', 'label3'])
r = self.zk.getReadyNodesOfTypes(['label1', 'label3'], cached=False)
self.assertIn('label1', r)
self.assertIn('label3', r)
self.assertEqual(2, len(r['label1']))
@ -614,7 +615,7 @@ class TestZooKeeper(tests.DBTestCase):
def test_nodeIterator(self):
n1 = self._create_node()
i = self.zk.nodeIterator()
i = self.zk.nodeIterator(cached=False)
self.assertEqual(n1, next(i))
with testtools.ExpectedException(StopIteration):
next(i)
@ -670,6 +671,40 @@ class TestZooKeeper(tests.DBTestCase):
self.zk.deleteNodeRequestLock(lock_ids[0])
self.assertEqual([], self.zk.getNodeRequestLockIDs())
def test_node_caching(self):
'''
Test that node iteration using both cached and uncached calls
produces identical results.
'''
# Test new node in node set
n1 = self._create_node()
# uncached
a1 = self.zk.nodeIterator(cached=False)
self.assertEqual(n1, next(a1))
# cached
a2 = self.zk.nodeIterator(cached=True)
self.assertEqual(n1, next(a2))
with testtools.ExpectedException(StopIteration):
next(a2)
# Test modification of existing node set
n1.state = zk.HOLD
n1.label = "oompaloompa"
self.zk.storeNode(n1)
# uncached
b1 = self.zk.nodeIterator(cached=False)
self.assertEqual(n1, next(b1))
# cached
for _ in iterate_timeout(10, Exception,
"cached node equals original node"):
b2 = self.zk.nodeIterator(cached=True)
if n1 == next(b2):
break
class TestZKModel(tests.BaseTestCase):


@ -22,6 +22,7 @@ from kazoo.client import KazooClient, KazooState
from kazoo import exceptions as kze
from kazoo.handlers.threading import KazooTimeoutError
from kazoo.recipe.lock import Lock
from kazoo.recipe.cache import TreeCache
from nodepool import exceptions as npe
@ -685,6 +686,7 @@ class ZooKeeper(object):
self.client = None
self._became_lost = False
self._last_retry_log = 0
self._node_cache = None
# =======================================================================
# Private Methods
@ -875,6 +877,9 @@ class ZooKeeper(object):
except KazooTimeoutError:
self.logConnectionRetryEvent()
self._node_cache = TreeCache(self.client, self.NODE_ROOT)
self._node_cache.start()
def disconnect(self):
'''
Close the ZooKeeper cluster connection.
@ -882,6 +887,11 @@ class ZooKeeper(object):
You should call this method if you used connect() to establish a
cluster connection.
'''
if self._node_cache is not None:
self._node_cache.close()
self._node_cache = None
if self.client is not None and self.client.connected:
self.client.stop()
self.client.close()
@ -1696,19 +1706,35 @@ class ZooKeeper(object):
except kze.NoNodeError:
return []
def getNode(self, node):
def getNode(self, node, cached=False):
'''
Get the data for a specific node.
:param str node: The node ID.
:param bool cached: True if the data should be taken from the cache.
:returns: The node data, or None if the node was not found.
'''
path = self._nodePath(node)
try:
data, stat = self.client.get(path)
except kze.NoNodeError:
return None
data = None
stat = None
if cached:
cached_data = self._node_cache.get_data(path)
if cached_data:
data = cached_data.data
stat = cached_data.stat
# If data is empty, we either didn't use the cache or the cache didn't
# have the node (yet). Even with caching enabled we need to fall back to
# a real query in that case, because a node can already show up in the
# get_children listing before its data has made it into the cache.
if not data:
try:
data, stat = self.client.get(path)
except kze.NoNodeError:
return None
if not data:
return None
@ -1763,7 +1789,7 @@ class ZooKeeper(object):
except kze.NoNodeError:
pass
def getReadyNodesOfTypes(self, labels):
def getReadyNodesOfTypes(self, labels, cached=True):
'''
Query ZooKeeper for unused/ready nodes.
@ -1775,7 +1801,7 @@ class ZooKeeper(object):
those labels.
'''
ret = {}
for node in self.nodeIterator():
for node in self.nodeIterator(cached=cached):
if node.state != READY or node.allocated_to:
continue
for label in labels:
@ -1852,12 +1878,14 @@ class ZooKeeper(object):
return False
def nodeIterator(self):
def nodeIterator(self, cached=True):
'''
Utility generator method for iterating through all nodes.
:param bool cached: True if the data should be taken from the cache.
'''
for node_id in self.getNodes():
node = self.getNode(node_id)
node = self.getNode(node_id, cached=cached)
if node:
yield node
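
For reference, the cached read in getNode() follows the kazoo TreeCache recipe: keep a cache rooted at the node tree, try get_data() first, and fall back to a direct client.get() when the cache has not seen the znode yet. A standalone sketch of that pattern outside the nodepool classes; the ZooKeeper address, the /nodepool/nodes root and the node id are assumptions made for illustration:

    from kazoo.client import KazooClient
    from kazoo import exceptions as kze
    from kazoo.recipe.cache import TreeCache

    NODE_ROOT = '/nodepool/nodes'  # assumed root path for illustration

    client = KazooClient(hosts='127.0.0.1:2181')  # assumed address
    client.start()
    cache = TreeCache(client, NODE_ROOT)
    cache.start()

    def get_node_data(node_id, cached=True):
        # Prefer the cache, but fall back to a real read when the cache
        # has not caught up with a freshly created node yet.
        path = '%s/%s' % (NODE_ROOT, node_id)
        data = None
        if cached:
            cached_data = cache.get_data(path)
            if cached_data:
                data = cached_data.data
        if not data:
            try:
                data, _stat = client.get(path)
            except kze.NoNodeError:
                return None
        return data

    print(get_node_data('0000000001'))  # hypothetical node id

    # Shut the cache down before closing the connection, mirroring
    # disconnect() above.
    cache.close()
    client.stop()
    client.close()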