Merge "Add second level cache of nodes"

This commit is contained in:
Zuul 2018-11-29 09:59:39 +00:00 committed by Gerrit Code Review
commit 1aac1cc8d2
2 changed files with 80 additions and 26 deletions

View File

@ -39,18 +39,25 @@ class TestNodepoolCMD(tests.DBTestCase):
def assert_listed(self, configfile, cmd, col, val, count, col_count=0):
log = logging.getLogger("tests.PrettyTableMock")
self.patch_argv("-c", configfile, *cmd)
with mock.patch('prettytable.PrettyTable.add_row') as m_add_row:
nodepoolcmd.main()
rows_with_val = 0
# Find add_rows with the status were looking for
for args, kwargs in m_add_row.call_args_list:
row = args[0]
if col_count:
self.assertEquals(len(row), col_count)
log.debug(row)
if row[col] == val:
rows_with_val += 1
self.assertEquals(rows_with_val, count)
for _ in iterate_timeout(10, AssertionError, 'assert listed'):
try:
with mock.patch('prettytable.PrettyTable.add_row') as \
m_add_row:
nodepoolcmd.main()
rows_with_val = 0
# Find add_rows with the status were looking for
for args, kwargs in m_add_row.call_args_list:
row = args[0]
if col_count:
self.assertEquals(len(row), col_count)
log.debug(row)
if row[col] == val:
rows_with_val += 1
self.assertEquals(rows_with_val, count)
break
except AssertionError:
# retry
pass
def assert_alien_images_listed(self, configfile, image_cnt, image_id):
self.assert_listed(configfile, ['alien-image-list'], 2, image_id,

View File

@ -22,7 +22,7 @@ from kazoo.client import KazooClient, KazooState
from kazoo import exceptions as kze
from kazoo.handlers.threading import KazooTimeoutError
from kazoo.recipe.lock import Lock
from kazoo.recipe.cache import TreeCache
from kazoo.recipe.cache import TreeCache, TreeEvent
from nodepool import exceptions as npe
@ -702,6 +702,7 @@ class ZooKeeper(object):
self._last_retry_log = 0
self._node_cache = None
self._request_cache = None
self._cached_nodes = {}
# =======================================================================
# Private Methods
@ -893,6 +894,8 @@ class ZooKeeper(object):
self.logConnectionRetryEvent()
self._node_cache = TreeCache(self.client, self.NODE_ROOT)
self._node_cache.listen_fault(self.cacheFaultListener)
self._node_cache.listen(self.nodeCacheListener)
self._node_cache.start()
self._request_cache = TreeCache(self.client, self.REQUEST_ROOT)
@ -1780,25 +1783,21 @@ class ZooKeeper(object):
:returns: The node data, or None if the node was not found.
'''
path = self._nodePath(node)
data = None
stat = None
if cached:
cached_data = self._node_cache.get_data(path)
if cached_data:
data = cached_data.data
stat = cached_data.stat
d = self._cached_nodes.get(node)
if d:
return d
# If data is empty we either didn't use the cache or the cache didn't
# We got here we either didn't use the cache or the cache didn't
# have the node (yet). Note that even if we use caching we need to
# do a real query if the cached data is empty because the node data
# might not be in the cache yet when it's listed by the get_children
# call.
if not data:
try:
data, stat = self.client.get(path)
except kze.NoNodeError:
return None
try:
path = self._nodePath(node)
data, stat = self.client.get(path)
except kze.NoNodeError:
return None
if not data:
return None
@ -2060,3 +2059,51 @@ class ZooKeeper(object):
'''
for node in provider_nodes:
self.deleteNode(node)
def cacheFaultListener(self, e):
self.log.exception(e)
def nodeCacheListener(self, event):
if hasattr(event.event_data, 'path'):
# Ignore root node
path = event.event_data.path
if path == self.NODE_ROOT:
return
# Ignore lock nodes
if '/lock' in path:
return
# Ignore any non-node related events such as connection events here
if event.event_type not in (TreeEvent.NODE_ADDED,
TreeEvent.NODE_UPDATED,
TreeEvent.NODE_REMOVED):
return
path = event.event_data.path
node_id = path.rsplit('/', 1)[1]
if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED):
# Perform an in-place update of the already cached node if possible
d = self._bytesToDict(event.event_data.data)
old_node = self._cached_nodes.get(node_id)
if old_node:
if event.event_data.stat.version <= old_node.stat.version:
# Don't update to older data
return
if old_node.lock:
# Don't update a locked node
return
old_node.updateFromDict(d)
old_node.stat = event.event_data.stat
else:
node = Node.fromDict(d, node_id)
node.stat = event.event_data.stat
self._cached_nodes[node_id] = node
elif event.event_type == TreeEvent.NODE_REMOVED:
try:
del self._cached_nodes[node_id]
except KeyError:
# If it's already gone, don't care
pass