Asynchronously update node statistics
We currently update the node statistics on every node launch or delete. This cannot use caching at the moment because when the statistics are updated we might end up pushing slightly outdated data. If there is then no further update for a longer time, we end up with broken gauges. We already get update events from the node cache, so we can use them to centrally trigger node statistics updates. This is combined with leader election so there is only a single launcher that keeps the statistics up to date. This ensures that the statistics are not cluttered by several launchers pushing their own slightly different views into the stats. As a side effect this reduces the runtime of a test that creates 200 nodes from 100s to 70s on my local machine. Change-Id: I77c6edc1db45b5b45be1812cf19eea66fdfab014
This commit is contained in:
parent
9d77f05d8e
commit
64487baef0
|
@ -94,7 +94,6 @@ class NodeLauncher(threading.Thread,
|
|||
try:
|
||||
dt = int((time.monotonic() - start_time) * 1000)
|
||||
self.recordLaunchStats(statsd_key, dt)
|
||||
self.updateNodeStats(self.zk, self.provider_config)
|
||||
except Exception:
|
||||
self.log.exception("Exception while reporting stats:")
|
||||
|
||||
|
|
|
@ -46,13 +46,12 @@ LOCK_CLEANUP = 8 * HOURS
|
|||
SUSPEND_WAIT_TIME = 30
|
||||
|
||||
|
||||
class NodeDeleter(threading.Thread, stats.StatsReporter):
|
||||
class NodeDeleter(threading.Thread):
|
||||
log = logging.getLogger("nodepool.NodeDeleter")
|
||||
|
||||
def __init__(self, zk, provider_manager, node):
|
||||
threading.Thread.__init__(self, name='NodeDeleter for %s %s' %
|
||||
(node.provider, node.external_id))
|
||||
stats.StatsReporter.__init__(self)
|
||||
self._zk = zk
|
||||
self._provider_manager = provider_manager
|
||||
self._node = node
|
||||
|
@ -109,13 +108,8 @@ class NodeDeleter(threading.Thread, stats.StatsReporter):
|
|||
|
||||
self.delete(self._zk, self._provider_manager, self._node, node_exists)
|
||||
|
||||
try:
|
||||
self.updateNodeStats(self._zk, self._provider_manager.provider)
|
||||
except Exception:
|
||||
self.log.exception("Exception while reporting stats:")
|
||||
|
||||
|
||||
class PoolWorker(threading.Thread):
|
||||
class PoolWorker(threading.Thread, stats.StatsReporter):
|
||||
'''
|
||||
Class that manages node requests for a single provider pool.
|
||||
|
||||
|
@ -143,6 +137,7 @@ class PoolWorker(threading.Thread):
|
|||
self.launcher_id = "%s-%s-%s" % (socket.gethostname(),
|
||||
os.getpid(),
|
||||
self.name)
|
||||
stats.StatsReporter.__init__(self)
|
||||
|
||||
# ---------------------------------------------------------------
|
||||
# Private methods
|
||||
|
@ -294,8 +289,12 @@ class PoolWorker(threading.Thread):
|
|||
launcher.id = self.launcher_id
|
||||
for prov_cfg in self.nodepool.config.providers.values():
|
||||
launcher.supported_labels.update(prov_cfg.getSupportedLabels())
|
||||
launcher.provider_name = self.provider_name
|
||||
self.zk.registerLauncher(launcher)
|
||||
|
||||
self.updateProviderLimits(
|
||||
self.nodepool.config.providers.get(self.provider_name))
|
||||
|
||||
try:
|
||||
if not self.paused_handler:
|
||||
self._assignHandlers()
|
||||
|
@ -699,6 +698,70 @@ class DeletedNodeWorker(BaseCleanupWorker):
|
|||
self.log.exception("Exception in DeletedNodeWorker:")
|
||||
|
||||
|
||||
class StatsWorker(BaseCleanupWorker, stats.StatsReporter):
    """Background worker that keeps node statistics in statsd up to date.

    Only a single launcher should push node stats at a time, so this worker
    takes part in a ZooKeeper leader election.  The election winner registers
    for node cache update events and refreshes the gauges whenever a node is
    added, updated or removed.
    """

    def __init__(self, nodepool, interval):
        """
        :param NodePool nodepool: The NodePool instance that provides the
            ZooKeeper connection.
        :param int interval: Run interval, forwarded to BaseCleanupWorker.
        """
        super().__init__(nodepool, interval, name='StatsWorker')
        self.log = logging.getLogger('nodepool.StatsWorker')
        # Set by the ZK node cache listener on every node change; also set
        # by stop() to wake up the reporting loop.
        self.stats_event = threading.Event()
        # Leader election handle; created lazily on first _run().
        self.election = None

    def stop(self):
        # Clear the running flag first so the loops below observe it once
        # they are woken up.
        self._running = False
        if self.election is not None:
            self.log.debug('Cancel leader election')
            # Breaks out of a blocking election.run() call.
            self.election.cancel()
        # Wake up _run_stats() so it can notice _running is False.
        self.stats_event.set()
        super().stop()

    def _run(self):
        try:
            # Re-initialize the stats reporter; this presumably (re)creates
            # self._statsd from the environment — confirm in StatsReporter.
            stats.StatsReporter.__init__(self)

            # Nothing to do when statsd reporting is not configured.
            if not self._statsd:
                return

            if self.election is None:
                zk = self._nodepool.getZK()
                # Identify this contender by host and pid.
                identifier = "%s-%s" % (socket.gethostname(), os.getpid())
                self.election = zk.getStatsElection(identifier)

            if not self._running:
                return

            # Blocks until we win the election, then runs _run_stats().
            self.election.run(self._run_stats)

        except Exception:
            self.log.exception('Exception in StatsWorker:')

    def _run_stats(self):
        """Report node stats until stopped; runs on the election winner only."""
        self.log.info('Won stats reporter election')

        # enable us getting events
        zk = self._nodepool.getZK()
        zk.setNodeStatsEvent(self.stats_event)

        while self._running:
            signaled = self.stats_event.wait()

            # stop() sets the event as well; re-check before reporting.
            if not self._running:
                break

            if not signaled:
                continue

            self.log.debug('Updating stats')
            # Clear before reporting so changes arriving during the update
            # trigger another round.
            self.stats_event.clear()
            try:
                self.updateNodeStats(zk)
            except Exception:
                self.log.exception("Exception while reporting stats:")
            # Rate-limit stats updates to at most once per second.
            time.sleep(1)

        # Unregister from node stats events
        zk.setNodeStatsEvent(None)
|
||||
|
||||
|
||||
class NodePool(threading.Thread):
|
||||
log = logging.getLogger("nodepool.NodePool")
|
||||
|
||||
|
@ -710,6 +773,7 @@ class NodePool(threading.Thread):
|
|||
self.watermark_sleep = watermark_sleep
|
||||
self.cleanup_interval = 60
|
||||
self.delete_interval = 5
|
||||
self.stats_interval = 5
|
||||
self._stopped = False
|
||||
self._stop_event = threading.Event()
|
||||
self.config = None
|
||||
|
@ -718,6 +782,7 @@ class NodePool(threading.Thread):
|
|||
self._pool_threads = {}
|
||||
self._cleanup_thread = None
|
||||
self._delete_thread = None
|
||||
self._stats_thread = None
|
||||
self._submittedRequests = {}
|
||||
|
||||
def stop(self):
|
||||
|
@ -738,6 +803,10 @@ class NodePool(threading.Thread):
|
|||
self._delete_thread.stop()
|
||||
self._delete_thread.join()
|
||||
|
||||
if self._stats_thread:
|
||||
self._stats_thread.stop()
|
||||
self._stats_thread.join()
|
||||
|
||||
# Don't let stop() return until all pool threads have been
|
||||
# terminated.
|
||||
self.log.debug("Stopping pool threads")
|
||||
|
@ -950,6 +1019,10 @@ class NodePool(threading.Thread):
|
|||
self, self.delete_interval)
|
||||
self._delete_thread.start()
|
||||
|
||||
if not self._stats_thread:
|
||||
self._stats_thread = StatsWorker(self, self.stats_interval)
|
||||
self._stats_thread.start()
|
||||
|
||||
# Stop any PoolWorker threads if the pool was removed
|
||||
# from the config.
|
||||
pool_keys = set()
|
||||
|
|
|
@ -85,39 +85,40 @@ class StatsReporter(object):
|
|||
pipeline.incr(key)
|
||||
pipeline.send()
|
||||
|
||||
def updateNodeStats(self, zk_conn, provider):
|
||||
def updateNodeStats(self, zk_conn):
|
||||
'''
|
||||
Refresh statistics for all known nodes.
|
||||
|
||||
:param ZooKeeper zk_conn: A ZooKeeper connection object.
|
||||
:param Provider provider: A config Provider object.
|
||||
'''
|
||||
if not self._statsd:
|
||||
return
|
||||
|
||||
states = {}
|
||||
|
||||
launchers = zk_conn.getRegisteredLaunchers()
|
||||
labels = set()
|
||||
for launcher in launchers:
|
||||
labels.update(launcher.supported_labels)
|
||||
providers = set()
|
||||
for launcher in launchers:
|
||||
providers.add(launcher.provider_name)
|
||||
|
||||
# Initialize things we know about to zero
|
||||
for state in zk.Node.VALID_STATES:
|
||||
key = 'nodepool.nodes.%s' % state
|
||||
states[key] = 0
|
||||
key = 'nodepool.provider.%s.nodes.%s' % (provider.name, state)
|
||||
states[key] = 0
|
||||
for provider in providers:
|
||||
key = 'nodepool.provider.%s.nodes.%s' % (provider, state)
|
||||
states[key] = 0
|
||||
|
||||
# Initialize label stats to 0
|
||||
for label in provider.getSupportedLabels():
|
||||
for label in labels:
|
||||
for state in zk.Node.VALID_STATES:
|
||||
key = 'nodepool.label.%s.nodes.%s' % (label, state)
|
||||
states[key] = 0
|
||||
|
||||
# Note that we intentionally don't use caching here because we don't
|
||||
# know when the next update will happen and thus need to report the
|
||||
# correct most recent state. Otherwise we can end up in reporting
|
||||
# a gauge with a node in state deleting = 1 and never update this for
|
||||
# a long time.
|
||||
# TODO(tobiash): Changing updateNodeStats to just run periodically will
|
||||
# resolve this and we can operate on cached data.
|
||||
for node in zk_conn.nodeIterator(cached=False):
|
||||
for node in zk_conn.nodeIterator():
|
||||
# nodepool.nodes.STATE
|
||||
key = 'nodepool.nodes.%s' % node.state
|
||||
states[key] += 1
|
||||
|
@ -145,9 +146,18 @@ class StatsReporter(object):
|
|||
for key, count in states.items():
|
||||
pipeline.gauge(key, count)
|
||||
|
||||
pipeline.send()
|
||||
|
||||
def updateProviderLimits(self, provider):
    """Report the configured limits of a provider to statsd.

    Currently emits a single gauge,
    ``nodepool.provider.<name>.max_servers``, which is the sum of the
    ``max_servers`` settings of all pools of the provider.

    :param Provider provider: A config Provider object.
    """
    if not self._statsd:
        return

    pipeline = self._statsd.pipeline()

    # nodepool.provider.PROVIDER.max_servers
    key = 'nodepool.provider.%s.max_servers' % provider.name
    # Use a generator instead of materializing a list inside sum();
    # pools with a falsy max_servers (0/None) contribute no limit.
    max_servers = sum(p.max_servers for p in provider.pools.values()
                      if p.max_servers)
    pipeline.gauge(key, max_servers)

    pipeline.send()
|
||||
|
|
|
@ -207,6 +207,7 @@ class BaseTestCase(testtools.TestCase):
|
|||
'fake-provider3',
|
||||
'CleanupWorker',
|
||||
'DeletedNodeWorker',
|
||||
'StatsWorker',
|
||||
'pydevd.CommandThread',
|
||||
'pydevd.Reader',
|
||||
'pydevd.Writer',
|
||||
|
|
|
@ -23,6 +23,7 @@ from kazoo import exceptions as kze
|
|||
from kazoo.handlers.threading import KazooTimeoutError
|
||||
from kazoo.recipe.lock import Lock
|
||||
from kazoo.recipe.cache import TreeCache, TreeEvent
|
||||
from kazoo.recipe.election import Election
|
||||
|
||||
from nodepool import exceptions as npe
|
||||
|
||||
|
@ -164,6 +165,7 @@ class Launcher(Serializable):
|
|||
|
||||
def __init__(self):
    # Unique identifier of this launcher; assigned by the owning worker.
    self.id = None
    # Name of the provider this launcher serves.
    self.provider_name = None
    # Labels this launcher can serve; serialized as a sorted list in
    # toDict().  Presumably exposed via a supported_labels property —
    # not visible here, confirm in the full class.
    self._supported_labels = set()
|
||||
|
||||
def __eq__(self, other):
|
||||
|
@ -186,6 +188,7 @@ class Launcher(Serializable):
|
|||
def toDict(self):
    """Return a JSON-serializable dict representation of this launcher."""
    # sets are not JSON serializable, so use a sorted list
    return {
        'id': self.id,
        'provider_name': self.provider_name,
        'supported_labels': sorted(self.supported_labels),
    }
|
||||
|
@ -194,6 +197,10 @@ class Launcher(Serializable):
|
|||
def fromDict(d):
    """Reconstruct a Launcher from its dict representation.

    Inverse of toDict().

    :param dict d: A dict as produced by toDict().
    :returns: A Launcher object.
    """
    obj = Launcher()
    obj.id = d.get('id')
    # TODO(tobiash): The fallback to 'unknown' is only needed to avoid
    # having a full nodepool shutdown on upgrade. It can be
    # removed later.
    obj.provider_name = d.get('provider_name', 'unknown')
    # toDict() serializes the label set as a list; convert back to a set.
    obj.supported_labels = set(d.get('supported_labels', []))
    return obj
|
||||
|
||||
|
@ -689,6 +696,7 @@ class ZooKeeper(object):
|
|||
NODE_ROOT = "/nodepool/nodes"
|
||||
REQUEST_ROOT = "/nodepool/requests"
|
||||
REQUEST_LOCK_ROOT = "/nodepool/requests-lock"
|
||||
ELECTION_ROOT = "/nodepool/elections"
|
||||
|
||||
# Log zookeeper retry every 10 seconds
|
||||
retry_log_rate = 10
|
||||
|
@ -706,10 +714,15 @@ class ZooKeeper(object):
|
|||
self._cached_node_requests = {}
|
||||
self.enable_cache = enable_cache
|
||||
|
||||
self.node_stats_event = None
|
||||
|
||||
# =======================================================================
|
||||
# Private Methods
|
||||
# =======================================================================
|
||||
|
||||
def _electionPath(self, election):
|
||||
return "%s/%s" % (self.ELECTION_ROOT, election)
|
||||
|
||||
def _imagePath(self, image):
|
||||
return "%s/%s" % (self.IMAGE_ROOT, image)
|
||||
|
||||
|
@ -2102,6 +2115,10 @@ class ZooKeeper(object):
|
|||
node = Node.fromDict(d, node_id)
|
||||
node.stat = event.event_data.stat
|
||||
self._cached_nodes[node_id] = node
|
||||
|
||||
# set the stats event so the stats reporting thread can act upon it
|
||||
if self.node_stats_event is not None:
|
||||
self.node_stats_event.set()
|
||||
elif event.event_type == TreeEvent.NODE_REMOVED:
|
||||
try:
|
||||
del self._cached_nodes[node_id]
|
||||
|
@ -2109,6 +2126,13 @@ class ZooKeeper(object):
|
|||
# If it's already gone, don't care
|
||||
pass
|
||||
|
||||
# set the stats event so the stats reporting thread can act upon it
|
||||
if self.node_stats_event is not None:
|
||||
self.node_stats_event.set()
|
||||
|
||||
def setNodeStatsEvent(self, event):
    """Register an event to be set on node cache changes.

    :param threading.Event event: Event that the node cache listener sets
        whenever a node is added, updated or removed; pass None to
        unregister.
    """
    self.node_stats_event = event
|
||||
|
||||
def requestCacheListener(self, event):
|
||||
|
||||
if hasattr(event.event_data, 'path'):
|
||||
|
@ -2154,3 +2178,7 @@ class ZooKeeper(object):
|
|||
except KeyError:
|
||||
# If it's already gone, don't care
|
||||
pass
|
||||
|
||||
def getStatsElection(self, identifier):
    """Create the leader election used for stats reporting.

    :param str identifier: Identifier of this contender; visible to the
        other participants of the election.
    :returns: A kazoo Election object rooted at the 'stats' election path.
    """
    path = self._electionPath('stats')
    return Election(self.client, path, identifier)
|
||||
|
|
Loading…
Reference in New Issue