Asynchronously update node statistics

We currently update the node statistics on every node launch or
delete. This cannot use caching at the moment because a statistics
update might push slightly outdated data; if no further update
follows for a long time, we end up with broken gauges. Since we
already receive update events from the node cache, we can use those
to centrally trigger node statistics updates.
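
In outline (a simplified sketch with illustrative names, not the
exact code from the diff below): the ZooKeeper cache listener only
signals a shared threading.Event, and a single worker loop consumes
it and performs the full refresh:

    import threading

    node_stats_event = threading.Event()

    def cache_listener(event):
        # Called for every node add/update/remove seen by the cache;
        # only signal here, the expensive refresh happens elsewhere.
        node_stats_event.set()

    def stats_loop(is_running, update_node_stats):
        while is_running():
            node_stats_event.wait()   # block until the cache changes
            if not is_running():
                break
            node_stats_event.clear()  # coalesce bursts of events
            update_node_stats()       # one full refresh per wakeup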

This is combined with leader election so that only a single launcher
keeps the statistics up to date. This ensures that the statistics are
not cluttered by several launchers pushing their own slightly
different views into the stats.
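
For reference, this uses kazoo's Election recipe: every launcher
calls run(), but only the elected leader executes the callback while
the others block until the leader goes away (connection string and
identifier below are placeholders):

    from kazoo.client import KazooClient
    from kazoo.recipe.election import Election

    zk = KazooClient(hosts='localhost:2181')  # placeholder hosts
    zk.start()
    election = Election(zk, '/nodepool/elections/stats',
                        identifier='myhost-1234')
    # Blocks; the callback runs only while this process is the leader.
    election.run(lambda: print('won stats reporter election'))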

As a side effect this reduces the runtime of a test that creates 200
nodes from 100s to 70s on my local machine.

Change-Id: I77c6edc1db45b5b45be1812cf19eea66fdfab014
Tobias Henkel 2018-11-22 14:37:58 +01:00
commit 64487baef0 (parent 9d77f05d8e)
5 changed files with 133 additions and 22 deletions


@@ -94,7 +94,6 @@ class NodeLauncher(threading.Thread,
         try:
             dt = int((time.monotonic() - start_time) * 1000)
             self.recordLaunchStats(statsd_key, dt)
-            self.updateNodeStats(self.zk, self.provider_config)
         except Exception:
             self.log.exception("Exception while reporting stats:")


@@ -46,13 +46,12 @@ LOCK_CLEANUP = 8 * HOURS
 SUSPEND_WAIT_TIME = 30


-class NodeDeleter(threading.Thread, stats.StatsReporter):
+class NodeDeleter(threading.Thread):
     log = logging.getLogger("nodepool.NodeDeleter")

     def __init__(self, zk, provider_manager, node):
         threading.Thread.__init__(self, name='NodeDeleter for %s %s' %
                                   (node.provider, node.external_id))
-        stats.StatsReporter.__init__(self)
         self._zk = zk
         self._provider_manager = provider_manager
         self._node = node
@@ -109,13 +108,8 @@ class NodeDeleter(threading.Thread, stats.StatsReporter):
         self.delete(self._zk, self._provider_manager, self._node, node_exists)

-        try:
-            self.updateNodeStats(self._zk, self._provider_manager.provider)
-        except Exception:
-            self.log.exception("Exception while reporting stats:")
-

-class PoolWorker(threading.Thread):
+class PoolWorker(threading.Thread, stats.StatsReporter):
     '''
     Class that manages node requests for a single provider pool.
@@ -143,6 +137,7 @@ class PoolWorker(threading.Thread):
         self.launcher_id = "%s-%s-%s" % (socket.gethostname(),
                                          os.getpid(),
                                          self.name)
+        stats.StatsReporter.__init__(self)

     # ---------------------------------------------------------------
     # Private methods
@@ -294,8 +289,12 @@
         launcher.id = self.launcher_id
         for prov_cfg in self.nodepool.config.providers.values():
             launcher.supported_labels.update(prov_cfg.getSupportedLabels())
+        launcher.provider_name = self.provider_name
         self.zk.registerLauncher(launcher)

+        self.updateProviderLimits(
+            self.nodepool.config.providers.get(self.provider_name))
+
         try:
             if not self.paused_handler:
                 self._assignHandlers()
@@ -699,6 +698,70 @@ class DeletedNodeWorker(BaseCleanupWorker):
             self.log.exception("Exception in DeletedNodeWorker:")


+class StatsWorker(BaseCleanupWorker, stats.StatsReporter):
+
+    def __init__(self, nodepool, interval):
+        super().__init__(nodepool, interval, name='StatsWorker')
+        self.log = logging.getLogger('nodepool.StatsWorker')
+        self.stats_event = threading.Event()
+        self.election = None
+
+    def stop(self):
+        self._running = False
+        if self.election is not None:
+            self.log.debug('Cancel leader election')
+            self.election.cancel()
+        self.stats_event.set()
+        super().stop()
+
+    def _run(self):
+        try:
+            stats.StatsReporter.__init__(self)
+
+            if not self._statsd:
+                return
+
+            if self.election is None:
+                zk = self._nodepool.getZK()
+                identifier = "%s-%s" % (socket.gethostname(), os.getpid())
+                self.election = zk.getStatsElection(identifier)
+
+            if not self._running:
+                return
+
+            self.election.run(self._run_stats)
+        except Exception:
+            self.log.exception('Exception in StatsWorker:')
+
+    def _run_stats(self):
+        self.log.info('Won stats reporter election')
+
+        # enable us getting events
+        zk = self._nodepool.getZK()
+        zk.setNodeStatsEvent(self.stats_event)
+
+        while self._running:
+            signaled = self.stats_event.wait()
+
+            if not self._running:
+                break
+
+            if not signaled:
+                continue
+
+            self.log.debug('Updating stats')
+            self.stats_event.clear()
+            try:
+                self.updateNodeStats(zk)
+            except Exception:
+                self.log.exception("Exception while reporting stats:")
+            time.sleep(1)
+
+        # Unregister from node stats events
+        zk.setNodeStatsEvent(None)
+
+
 class NodePool(threading.Thread):
     log = logging.getLogger("nodepool.NodePool")
@@ -710,6 +773,7 @@ class NodePool(threading.Thread):
         self.watermark_sleep = watermark_sleep
         self.cleanup_interval = 60
         self.delete_interval = 5
+        self.stats_interval = 5
         self._stopped = False
         self._stop_event = threading.Event()
         self.config = None
@@ -718,6 +782,7 @@
         self._pool_threads = {}
         self._cleanup_thread = None
         self._delete_thread = None
+        self._stats_thread = None
         self._submittedRequests = {}

     def stop(self):
@@ -738,6 +803,10 @@
             self._delete_thread.stop()
             self._delete_thread.join()

+        if self._stats_thread:
+            self._stats_thread.stop()
+            self._stats_thread.join()
+
         # Don't let stop() return until all pool threads have been
         # terminated.
         self.log.debug("Stopping pool threads")
@@ -950,6 +1019,10 @@
                 self, self.delete_interval)
             self._delete_thread.start()

+        if not self._stats_thread:
+            self._stats_thread = StatsWorker(self, self.stats_interval)
+            self._stats_thread.start()
+
         # Stop any PoolWorker threads if the pool was removed
         # from the config.
         pool_keys = set()


@@ -85,39 +85,40 @@ class StatsReporter(object):
             pipeline.incr(key)
         pipeline.send()

-    def updateNodeStats(self, zk_conn, provider):
+    def updateNodeStats(self, zk_conn):
         '''
         Refresh statistics for all known nodes.

         :param ZooKeeper zk_conn: A ZooKeeper connection object.
-        :param Provider provider: A config Provider object.
         '''
         if not self._statsd:
             return

         states = {}

+        launchers = zk_conn.getRegisteredLaunchers()
+
+        labels = set()
+        for launcher in launchers:
+            labels.update(launcher.supported_labels)
+
+        providers = set()
+        for launcher in launchers:
+            providers.add(launcher.provider_name)
+
         # Initialize things we know about to zero
         for state in zk.Node.VALID_STATES:
             key = 'nodepool.nodes.%s' % state
             states[key] = 0
-            key = 'nodepool.provider.%s.nodes.%s' % (provider.name, state)
-            states[key] = 0
+            for provider in providers:
+                key = 'nodepool.provider.%s.nodes.%s' % (provider, state)
+                states[key] = 0

         # Initialize label stats to 0
-        for label in provider.getSupportedLabels():
+        for label in labels:
             for state in zk.Node.VALID_STATES:
                 key = 'nodepool.label.%s.nodes.%s' % (label, state)
                 states[key] = 0

-        # Note that we intentionally don't use caching here because we don't
-        # know when the next update will happen and thus need to report the
-        # correct most recent state. Otherwise we can end up in reporting
-        # a gauge with a node in state deleting = 1 and never update this for
-        # a long time.
-        # TODO(tobiash): Changing updateNodeStats to just run periodically
-        # will resolve this and we can operate on cached data.
-        for node in zk_conn.nodeIterator(cached=False):
+        for node in zk_conn.nodeIterator():
             # nodepool.nodes.STATE
             key = 'nodepool.nodes.%s' % node.state
             states[key] += 1
@@ -145,9 +146,18 @@
         for key, count in states.items():
             pipeline.gauge(key, count)
         pipeline.send()

+    def updateProviderLimits(self, provider):
+        if not self._statsd:
+            return
+
+        pipeline = self._statsd.pipeline()
+
+        # nodepool.provider.PROVIDER.max_servers
+        key = 'nodepool.provider.%s.max_servers' % provider.name
+        max_servers = sum([p.max_servers for p in provider.pools.values()
+                           if p.max_servers])
+        pipeline.gauge(key, max_servers)
+        pipeline.send()


@@ -207,6 +207,7 @@ class BaseTestCase(testtools.TestCase):
                      'fake-provider3',
                      'CleanupWorker',
                      'DeletedNodeWorker',
+                     'StatsWorker',
                      'pydevd.CommandThread',
                      'pydevd.Reader',
                      'pydevd.Writer',


@@ -23,6 +23,7 @@ from kazoo import exceptions as kze
 from kazoo.handlers.threading import KazooTimeoutError
 from kazoo.recipe.lock import Lock
 from kazoo.recipe.cache import TreeCache, TreeEvent
+from kazoo.recipe.election import Election

 from nodepool import exceptions as npe
@@ -164,6 +165,7 @@ class Launcher(Serializable):
     def __init__(self):
         self.id = None
+        self.provider_name = None
         self._supported_labels = set()

     def __eq__(self, other):
@@ -186,6 +188,7 @@ class Launcher(Serializable):
     def toDict(self):
         d = {}
         d['id'] = self.id
+        d['provider_name'] = self.provider_name
         # sets are not JSON serializable, so use a sorted list
         d['supported_labels'] = sorted(self.supported_labels)
         return d
@@ -194,6 +197,10 @@
     def fromDict(d):
         obj = Launcher()
         obj.id = d.get('id')
+        # TODO(tobiash): The fallback to 'unknown' is only needed to avoid
+        # having a full nodepool shutdown on upgrade. It can be
+        # removed later.
+        obj.provider_name = d.get('provider_name', 'unknown')
         obj.supported_labels = set(d.get('supported_labels', []))
         return obj
@@ -689,6 +696,7 @@ class ZooKeeper(object):
     NODE_ROOT = "/nodepool/nodes"
     REQUEST_ROOT = "/nodepool/requests"
     REQUEST_LOCK_ROOT = "/nodepool/requests-lock"
+    ELECTION_ROOT = "/nodepool/elections"

     # Log zookeeper retry every 10 seconds
     retry_log_rate = 10
@@ -706,10 +714,15 @@
         self._cached_node_requests = {}
         self.enable_cache = enable_cache

+        self.node_stats_event = None
+
     # =======================================================================
     # Private Methods
     # =======================================================================

+    def _electionPath(self, election):
+        return "%s/%s" % (self.ELECTION_ROOT, election)
+
     def _imagePath(self, image):
         return "%s/%s" % (self.IMAGE_ROOT, image)
@@ -2102,6 +2115,10 @@ class ZooKeeper(object):
                 node = Node.fromDict(d, node_id)
                 node.stat = event.event_data.stat
                 self._cached_nodes[node_id] = node
+
+                # set the stats event so the stats reporting thread can act upon it
+                if self.node_stats_event is not None:
+                    self.node_stats_event.set()
         elif event.event_type == TreeEvent.NODE_REMOVED:
             try:
                 del self._cached_nodes[node_id]
@@ -2109,6 +2126,13 @@
             except KeyError:
                 # If it's already gone, don't care
                 pass
+
+            # set the stats event so the stats reporting thread can act upon it
+            if self.node_stats_event is not None:
+                self.node_stats_event.set()
+
+    def setNodeStatsEvent(self, event):
+        self.node_stats_event = event

     def requestCacheListener(self, event):
         if hasattr(event.event_data, 'path'):
@@ -2154,3 +2178,7 @@ class ZooKeeper(object):
             except KeyError:
                 # If it's already gone, don't care
                 pass
+
+    def getStatsElection(self, identifier):
+        path = self._electionPath('stats')
+        return Election(self.client, path, identifier)