Merge "Add second level cache of nodes"
This commit is contained in:
commit
1aac1cc8d2
|
@ -39,18 +39,25 @@ class TestNodepoolCMD(tests.DBTestCase):
|
|||
def assert_listed(self, configfile, cmd, col, val, count, col_count=0):
|
||||
log = logging.getLogger("tests.PrettyTableMock")
|
||||
self.patch_argv("-c", configfile, *cmd)
|
||||
with mock.patch('prettytable.PrettyTable.add_row') as m_add_row:
|
||||
nodepoolcmd.main()
|
||||
rows_with_val = 0
|
||||
# Find add_rows with the status were looking for
|
||||
for args, kwargs in m_add_row.call_args_list:
|
||||
row = args[0]
|
||||
if col_count:
|
||||
self.assertEquals(len(row), col_count)
|
||||
log.debug(row)
|
||||
if row[col] == val:
|
||||
rows_with_val += 1
|
||||
self.assertEquals(rows_with_val, count)
|
||||
for _ in iterate_timeout(10, AssertionError, 'assert listed'):
|
||||
try:
|
||||
with mock.patch('prettytable.PrettyTable.add_row') as \
|
||||
m_add_row:
|
||||
nodepoolcmd.main()
|
||||
rows_with_val = 0
|
||||
# Find add_rows with the status were looking for
|
||||
for args, kwargs in m_add_row.call_args_list:
|
||||
row = args[0]
|
||||
if col_count:
|
||||
self.assertEquals(len(row), col_count)
|
||||
log.debug(row)
|
||||
if row[col] == val:
|
||||
rows_with_val += 1
|
||||
self.assertEquals(rows_with_val, count)
|
||||
break
|
||||
except AssertionError:
|
||||
# retry
|
||||
pass
|
||||
|
||||
def assert_alien_images_listed(self, configfile, image_cnt, image_id):
|
||||
self.assert_listed(configfile, ['alien-image-list'], 2, image_id,
|
||||
|
|
|
@ -22,7 +22,7 @@ from kazoo.client import KazooClient, KazooState
|
|||
from kazoo import exceptions as kze
|
||||
from kazoo.handlers.threading import KazooTimeoutError
|
||||
from kazoo.recipe.lock import Lock
|
||||
from kazoo.recipe.cache import TreeCache
|
||||
from kazoo.recipe.cache import TreeCache, TreeEvent
|
||||
|
||||
from nodepool import exceptions as npe
|
||||
|
||||
|
@ -702,6 +702,7 @@ class ZooKeeper(object):
|
|||
self._last_retry_log = 0
|
||||
self._node_cache = None
|
||||
self._request_cache = None
|
||||
self._cached_nodes = {}
|
||||
|
||||
# =======================================================================
|
||||
# Private Methods
|
||||
|
@ -893,6 +894,8 @@ class ZooKeeper(object):
|
|||
self.logConnectionRetryEvent()
|
||||
|
||||
self._node_cache = TreeCache(self.client, self.NODE_ROOT)
|
||||
self._node_cache.listen_fault(self.cacheFaultListener)
|
||||
self._node_cache.listen(self.nodeCacheListener)
|
||||
self._node_cache.start()
|
||||
|
||||
self._request_cache = TreeCache(self.client, self.REQUEST_ROOT)
|
||||
|
@ -1780,25 +1783,21 @@ class ZooKeeper(object):
|
|||
|
||||
:returns: The node data, or None if the node was not found.
|
||||
'''
|
||||
path = self._nodePath(node)
|
||||
data = None
|
||||
stat = None
|
||||
if cached:
|
||||
cached_data = self._node_cache.get_data(path)
|
||||
if cached_data:
|
||||
data = cached_data.data
|
||||
stat = cached_data.stat
|
||||
d = self._cached_nodes.get(node)
|
||||
if d:
|
||||
return d
|
||||
|
||||
# If data is empty we either didn't use the cache or the cache didn't
|
||||
# We got here we either didn't use the cache or the cache didn't
|
||||
# have the node (yet). Note that even if we use caching we need to
|
||||
# do a real query if the cached data is empty because the node data
|
||||
# might not be in the cache yet when it's listed by the get_children
|
||||
# call.
|
||||
if not data:
|
||||
try:
|
||||
data, stat = self.client.get(path)
|
||||
except kze.NoNodeError:
|
||||
return None
|
||||
try:
|
||||
path = self._nodePath(node)
|
||||
data, stat = self.client.get(path)
|
||||
except kze.NoNodeError:
|
||||
return None
|
||||
|
||||
if not data:
|
||||
return None
|
||||
|
@ -2060,3 +2059,51 @@ class ZooKeeper(object):
|
|||
'''
|
||||
for node in provider_nodes:
|
||||
self.deleteNode(node)
|
||||
|
||||
def cacheFaultListener(self, e):
|
||||
self.log.exception(e)
|
||||
|
||||
def nodeCacheListener(self, event):
|
||||
|
||||
if hasattr(event.event_data, 'path'):
|
||||
# Ignore root node
|
||||
path = event.event_data.path
|
||||
if path == self.NODE_ROOT:
|
||||
return
|
||||
|
||||
# Ignore lock nodes
|
||||
if '/lock' in path:
|
||||
return
|
||||
|
||||
# Ignore any non-node related events such as connection events here
|
||||
if event.event_type not in (TreeEvent.NODE_ADDED,
|
||||
TreeEvent.NODE_UPDATED,
|
||||
TreeEvent.NODE_REMOVED):
|
||||
return
|
||||
|
||||
path = event.event_data.path
|
||||
node_id = path.rsplit('/', 1)[1]
|
||||
|
||||
if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED):
|
||||
# Perform an in-place update of the already cached node if possible
|
||||
d = self._bytesToDict(event.event_data.data)
|
||||
old_node = self._cached_nodes.get(node_id)
|
||||
if old_node:
|
||||
if event.event_data.stat.version <= old_node.stat.version:
|
||||
# Don't update to older data
|
||||
return
|
||||
if old_node.lock:
|
||||
# Don't update a locked node
|
||||
return
|
||||
old_node.updateFromDict(d)
|
||||
old_node.stat = event.event_data.stat
|
||||
else:
|
||||
node = Node.fromDict(d, node_id)
|
||||
node.stat = event.event_data.stat
|
||||
self._cached_nodes[node_id] = node
|
||||
elif event.event_type == TreeEvent.NODE_REMOVED:
|
||||
try:
|
||||
del self._cached_nodes[node_id]
|
||||
except KeyError:
|
||||
# If it's already gone, don't care
|
||||
pass
|
||||
|
|
Loading…
Reference in New Issue