Merge "Add second level cache to node requests"

This commit is contained in:
Zuul 2018-11-29 09:59:40 +00:00 committed by Gerrit Code Review
commit 103e64ce24
1 changed files with 58 additions and 13 deletions

View File

@ -703,6 +703,7 @@ class ZooKeeper(object):
self._node_cache = None
self._request_cache = None
self._cached_nodes = {}
self._cached_node_requests = {}
# =======================================================================
# Private Methods
@ -899,6 +900,8 @@ class ZooKeeper(object):
self._node_cache.start()
self._request_cache = TreeCache(self.client, self.REQUEST_ROOT)
self._request_cache.listen_fault(self.cacheFaultListener)
self._request_cache.listen(self.requestCacheListener)
self._request_cache.start()
def disconnect(self):
@ -1569,25 +1572,21 @@ class ZooKeeper(object):
:returns: The request data, or None if the request was not found.
'''
path = self._requestPath(request)
data = None
stat = None
if cached:
cached_data = self._request_cache.get_data(path)
if cached_data:
data = cached_data.data
stat = cached_data.stat
d = self._cached_node_requests.get(request)
if d:
return d
# If data is empty we either didn't use the cache or the cache didn't
# If we got here we either didn't use the cache or the cache didn't
# have the request (yet). Note that even if we use caching we need to
# do a real query if the cached data is empty because the request data
# might not be in the cache yet when it's listed by the get_children
# call.
if not data:
try:
data, stat = self.client.get(path)
except kze.NoNodeError:
return None
try:
path = self._requestPath(request)
data, stat = self.client.get(path)
except kze.NoNodeError:
return None
d = NodeRequest.fromDict(self._bytesToDict(data), request)
d.stat = stat
@ -2107,3 +2106,49 @@ class ZooKeeper(object):
except KeyError:
# If it's already gone, don't care
pass
def requestCacheListener(self, event):
if hasattr(event.event_data, 'path'):
# Ignore root node
path = event.event_data.path
if path == self.REQUEST_ROOT:
return
# Ignore lock nodes
if '/lock' in path:
return
# Ignore any non-node related events such as connection events here
if event.event_type not in (TreeEvent.NODE_ADDED,
TreeEvent.NODE_UPDATED,
TreeEvent.NODE_REMOVED):
return
path = event.event_data.path
request_id = path.rsplit('/', 1)[1]
if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED):
# Perform an in-place update of the cached request if possible
d = self._bytesToDict(event.event_data.data)
old_request = self._cached_node_requests.get(request_id)
if old_request:
if event.event_data.stat.version <= old_request.stat.version:
# Don't update to older data
return
if old_request.lock:
# Don't update a locked node request
return
old_request.updateFromDict(d)
old_request.stat = event.event_data.stat
else:
request = NodeRequest.fromDict(d, request_id)
request.stat = event.event_data.stat
self._cached_node_requests[request_id] = request
elif event.event_type == TreeEvent.NODE_REMOVED:
try:
del self._cached_node_requests[request_id]
except KeyError:
# If it's already gone, don't care
pass