Merge "Support node caching in the nodeIterator"

This change is contained in commit 3cb6e7f0c9.
@ -900,7 +900,9 @@ class NodePool(threading.Thread):
|
|||
requested_labels = list(self._submittedRequests.keys())
|
||||
needed_labels = list(set(label_names) - set(requested_labels))
|
||||
|
||||
ready_nodes = self.zk.getReadyNodesOfTypes(needed_labels)
|
||||
# Note we explicitly don't use the cache here because otherwise we can
|
||||
# end up creating more min-ready nodes than we want.
|
||||
ready_nodes = self.zk.getReadyNodesOfTypes(needed_labels, cached=False)
|
||||
|
||||
for label in self.config.labels.values():
|
||||
if label.name not in needed_labels:
|
||||
|
|
|
@ -110,7 +110,14 @@ class StatsReporter(object):
|
|||
key = 'nodepool.label.%s.nodes.%s' % (label, state)
|
||||
states[key] = 0
|
||||
|
||||
for node in zk_conn.nodeIterator():
|
||||
# Note that we intentionally don't use caching here because we don't
|
||||
# know when the next update will happen and thus need to report the
|
||||
# correct most recent state. Otherwise we can end up in reporting
|
||||
# a gauge with a node in state deleting = 1 and never update this for
|
||||
# a long time.
|
||||
# TODO(tobiash): Changing updateNodeStats to just run periodically will
|
||||
# resolve this and we can operate on cached data.
|
||||
for node in zk_conn.nodeIterator(cached=False):
|
||||
# nodepool.nodes.STATE
|
||||
key = 'nodepool.nodes.%s' % node.state
|
||||
states[key] += 1
|
||||
|
|
|
@ -24,6 +24,7 @@ import testtools
|
|||
from nodepool.cmd import nodepoolcmd
|
||||
from nodepool import tests
|
||||
from nodepool import zk
|
||||
from nodepool.nodeutils import iterate_timeout
|
||||
|
||||
|
||||
class TestNodepoolCMD(tests.DBTestCase):
|
||||
|
@ -124,8 +125,15 @@ class TestNodepoolCMD(tests.DBTestCase):
|
|||
pool.start()
|
||||
self.waitForImage('fake-provider', 'fake-image')
|
||||
self.waitForNodes('fake-label')
|
||||
self.assert_nodes_listed(configfile, 1, detail=False,
|
||||
validate_col_count=True)
|
||||
|
||||
for _ in iterate_timeout(10, Exception, "assert nodes are listed"):
|
||||
try:
|
||||
self.assert_nodes_listed(configfile, 1, detail=False,
|
||||
validate_col_count=True)
|
||||
break
|
||||
except AssertionError:
|
||||
# node is not listed yet, retry later
|
||||
pass
|
||||
|
||||
def test_list_nodes_detail(self):
|
||||
configfile = self.setup_config('node.yaml')
|
||||
|
@ -134,8 +142,14 @@ class TestNodepoolCMD(tests.DBTestCase):
|
|||
pool.start()
|
||||
self.waitForImage('fake-provider', 'fake-image')
|
||||
self.waitForNodes('fake-label')
|
||||
self.assert_nodes_listed(configfile, 1, detail=True,
|
||||
validate_col_count=True)
|
||||
for _ in iterate_timeout(10, Exception, "assert nodes are listed"):
|
||||
try:
|
||||
self.assert_nodes_listed(configfile, 1, detail=True,
|
||||
validate_col_count=True)
|
||||
break
|
||||
except AssertionError:
|
||||
# node is not listed yet, retry later
|
||||
pass
|
||||
|
||||
def test_config_validate(self):
|
||||
config = os.path.join(os.path.dirname(tests.__file__),
|
||||
|
|
|
@ -17,6 +17,7 @@ import time
|
|||
from nodepool import exceptions as npe
|
||||
from nodepool import tests
|
||||
from nodepool import zk
|
||||
from nodepool.nodeutils import iterate_timeout
|
||||
|
||||
|
||||
class TestZooKeeper(tests.DBTestCase):
|
||||
|
@ -583,7 +584,7 @@ class TestZooKeeper(tests.DBTestCase):
|
|||
n3.type = 'label2'
|
||||
self.zk.storeNode(n3)
|
||||
|
||||
r = self.zk.getReadyNodesOfTypes(['label1'])
|
||||
r = self.zk.getReadyNodesOfTypes(['label1'], cached=False)
|
||||
self.assertIn('label1', r)
|
||||
self.assertEqual(2, len(r['label1']))
|
||||
self.assertIn(n1, r['label1'])
|
||||
|
@ -603,7 +604,7 @@ class TestZooKeeper(tests.DBTestCase):
|
|||
n3.type = 'label2'
|
||||
self.zk.storeNode(n3)
|
||||
|
||||
r = self.zk.getReadyNodesOfTypes(['label1', 'label3'])
|
||||
r = self.zk.getReadyNodesOfTypes(['label1', 'label3'], cached=False)
|
||||
self.assertIn('label1', r)
|
||||
self.assertIn('label3', r)
|
||||
self.assertEqual(2, len(r['label1']))
|
||||
|
@ -614,7 +615,7 @@ class TestZooKeeper(tests.DBTestCase):
|
|||
|
||||
def test_nodeIterator(self):
|
||||
n1 = self._create_node()
|
||||
i = self.zk.nodeIterator()
|
||||
i = self.zk.nodeIterator(cached=False)
|
||||
self.assertEqual(n1, next(i))
|
||||
with testtools.ExpectedException(StopIteration):
|
||||
next(i)
|
||||
|
@ -670,6 +671,40 @@ class TestZooKeeper(tests.DBTestCase):
|
|||
self.zk.deleteNodeRequestLock(lock_ids[0])
|
||||
self.assertEqual([], self.zk.getNodeRequestLockIDs())
|
||||
|
||||
def test_node_caching(self):
|
||||
'''
|
||||
Test that node iteration using both cached and uncached calls
|
||||
produces identical results.
|
||||
'''
|
||||
# Test new node in node set
|
||||
n1 = self._create_node()
|
||||
|
||||
# uncached
|
||||
a1 = self.zk.nodeIterator(cached=False)
|
||||
self.assertEqual(n1, next(a1))
|
||||
|
||||
# cached
|
||||
a2 = self.zk.nodeIterator(cached=True)
|
||||
self.assertEqual(n1, next(a2))
|
||||
with testtools.ExpectedException(StopIteration):
|
||||
next(a2)
|
||||
|
||||
# Test modification of existing node set
|
||||
n1.state = zk.HOLD
|
||||
n1.label = "oompaloompa"
|
||||
self.zk.storeNode(n1)
|
||||
|
||||
# uncached
|
||||
b1 = self.zk.nodeIterator(cached=False)
|
||||
self.assertEqual(n1, next(b1))
|
||||
|
||||
# cached
|
||||
for _ in iterate_timeout(10, Exception,
|
||||
"cached node equals original node"):
|
||||
b2 = self.zk.nodeIterator(cached=True)
|
||||
if n1 == next(b2):
|
||||
break
|
||||
|
||||
|
||||
class TestZKModel(tests.BaseTestCase):
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ from kazoo.client import KazooClient, KazooState
|
|||
from kazoo import exceptions as kze
|
||||
from kazoo.handlers.threading import KazooTimeoutError
|
||||
from kazoo.recipe.lock import Lock
|
||||
from kazoo.recipe.cache import TreeCache
|
||||
|
||||
from nodepool import exceptions as npe
|
||||
|
||||
|
@ -685,6 +686,7 @@ class ZooKeeper(object):
|
|||
self.client = None
|
||||
self._became_lost = False
|
||||
self._last_retry_log = 0
|
||||
self._node_cache = None
|
||||
|
||||
# =======================================================================
|
||||
# Private Methods
|
||||
|
@ -875,6 +877,9 @@ class ZooKeeper(object):
|
|||
except KazooTimeoutError:
|
||||
self.logConnectionRetryEvent()
|
||||
|
||||
self._node_cache = TreeCache(self.client, self.NODE_ROOT)
|
||||
self._node_cache.start()
|
||||
|
||||
def disconnect(self):
|
||||
'''
|
||||
Close the ZooKeeper cluster connection.
|
||||
|
@ -882,6 +887,11 @@ class ZooKeeper(object):
|
|||
You should call this method if you used connect() to establish a
|
||||
cluster connection.
|
||||
'''
|
||||
|
||||
if self._node_cache is not None:
|
||||
self._node_cache.close()
|
||||
self._node_cache = None
|
||||
|
||||
if self.client is not None and self.client.connected:
|
||||
self.client.stop()
|
||||
self.client.close()
|
||||
|
@ -1696,19 +1706,35 @@ class ZooKeeper(object):
|
|||
except kze.NoNodeError:
|
||||
return []
|
||||
|
||||
def getNode(self, node):
|
||||
def getNode(self, node, cached=False):
|
||||
'''
|
||||
Get the data for a specific node.
|
||||
|
||||
:param str node: The node ID.
|
||||
:param bool cached: True if the data should be taken from the cache.
|
||||
|
||||
:returns: The node data, or None if the node was not found.
|
||||
'''
|
||||
path = self._nodePath(node)
|
||||
try:
|
||||
data, stat = self.client.get(path)
|
||||
except kze.NoNodeError:
|
||||
return None
|
||||
data = None
|
||||
stat = None
|
||||
if cached:
|
||||
cached_data = self._node_cache.get_data(path)
|
||||
if cached_data:
|
||||
data = cached_data.data
|
||||
stat = cached_data.stat
|
||||
|
||||
# If data is empty we either didn't use the cache or the cache didn't
|
||||
# have the node (yet). Note that even if we use caching we need to
|
||||
# do a real query if the cached data is empty because the node data
|
||||
# might not be in the cache yet when it's listed by the get_children
|
||||
# call.
|
||||
if not data:
|
||||
try:
|
||||
data, stat = self.client.get(path)
|
||||
except kze.NoNodeError:
|
||||
return None
|
||||
|
||||
if not data:
|
||||
return None
|
||||
|
||||
|
@ -1763,7 +1789,7 @@ class ZooKeeper(object):
|
|||
except kze.NoNodeError:
|
||||
pass
|
||||
|
||||
def getReadyNodesOfTypes(self, labels):
|
||||
def getReadyNodesOfTypes(self, labels, cached=True):
|
||||
'''
|
||||
Query ZooKeeper for unused/ready nodes.
|
||||
|
||||
|
@ -1775,7 +1801,7 @@ class ZooKeeper(object):
|
|||
those labels.
|
||||
'''
|
||||
ret = {}
|
||||
for node in self.nodeIterator():
|
||||
for node in self.nodeIterator(cached=cached):
|
||||
if node.state != READY or node.allocated_to:
|
||||
continue
|
||||
for label in labels:
|
||||
|
@ -1852,12 +1878,14 @@ class ZooKeeper(object):
|
|||
|
||||
return False
|
||||
|
||||
def nodeIterator(self, cached=True):
    '''
    Utility generator method for iterating through all nodes.

    :param bool cached: True if the node data should be taken from the
        TreeCache instead of querying ZooKeeper directly.
    '''
    # getNodes() only lists the child IDs; getNode() may return None if
    # the node disappeared between the listing and the read (or the data
    # is empty), so such entries are skipped rather than yielded.
    for node_id in self.getNodes():
        node = self.getNode(node_id, cached=cached)
        if node:
            yield node
|
||||
|
||||
|
|
Loading…
Reference in New Issue