Merge "Asynchronously update node statistics"

This commit is contained in:
Zuul 2018-11-29 21:00:14 +00:00 committed by Gerrit Code Review
commit 81936cd818
5 changed files with 133 additions and 22 deletions

View File

@@ -94,7 +94,6 @@ class NodeLauncher(threading.Thread,
try:
dt = int((time.monotonic() - start_time) * 1000)
self.recordLaunchStats(statsd_key, dt)
self.updateNodeStats(self.zk, self.provider_config)
except Exception:
self.log.exception("Exception while reporting stats:")

View File

@@ -46,13 +46,12 @@ LOCK_CLEANUP = 8 * HOURS
SUSPEND_WAIT_TIME = 30
class NodeDeleter(threading.Thread, stats.StatsReporter):
class NodeDeleter(threading.Thread):
log = logging.getLogger("nodepool.NodeDeleter")
def __init__(self, zk, provider_manager, node):
threading.Thread.__init__(self, name='NodeDeleter for %s %s' %
(node.provider, node.external_id))
stats.StatsReporter.__init__(self)
self._zk = zk
self._provider_manager = provider_manager
self._node = node
@@ -109,13 +108,8 @@ class NodeDeleter(threading.Thread, stats.StatsReporter):
self.delete(self._zk, self._provider_manager, self._node, node_exists)
try:
self.updateNodeStats(self._zk, self._provider_manager.provider)
except Exception:
self.log.exception("Exception while reporting stats:")
class PoolWorker(threading.Thread):
class PoolWorker(threading.Thread, stats.StatsReporter):
'''
Class that manages node requests for a single provider pool.
@@ -143,6 +137,7 @@ class PoolWorker(threading.Thread):
self.launcher_id = "%s-%s-%s" % (socket.gethostname(),
os.getpid(),
self.name)
stats.StatsReporter.__init__(self)
# ---------------------------------------------------------------
# Private methods
@@ -294,8 +289,12 @@ class PoolWorker(threading.Thread):
launcher.id = self.launcher_id
for prov_cfg in self.nodepool.config.providers.values():
launcher.supported_labels.update(prov_cfg.getSupportedLabels())
launcher.provider_name = self.provider_name
self.zk.registerLauncher(launcher)
self.updateProviderLimits(
self.nodepool.config.providers.get(self.provider_name))
try:
if not self.paused_handler:
self._assignHandlers()
@@ -699,6 +698,70 @@ class DeletedNodeWorker(BaseCleanupWorker):
self.log.exception("Exception in DeletedNodeWorker:")
class StatsWorker(BaseCleanupWorker, stats.StatsReporter):
    """Background worker that reports node statistics to statsd.

    Only one launcher should report node stats at a time, so the worker
    contends in a ZooKeeper leader election; the winner waits on an
    event that the ZooKeeper node cache sets whenever node state
    changes, and re-reports stats on each change.
    """

    def __init__(self, nodepool, interval):
        super().__init__(nodepool, interval, name='StatsWorker')
        self.log = logging.getLogger('nodepool.StatsWorker')
        # Set by the ZK node-cache listener (via setNodeStatsEvent)
        # whenever a node is created, updated or removed.
        self.stats_event = threading.Event()
        # Created lazily on first _run(); None until then.
        self.election = None

    def stop(self):
        self._running = False
        if self.election is not None:
            self.log.debug('Cancel leader election')
            self.election.cancel()
        # Wake up _run_stats() so it can observe _running == False.
        self.stats_event.set()
        super().stop()

    def _run(self):
        try:
            # Initialize the statsd client; skip everything if statsd
            # reporting is not configured.
            stats.StatsReporter.__init__(self)
            if not self._statsd:
                return
            if self.election is None:
                zk = self._nodepool.getZK()
                identifier = "%s-%s" % (socket.gethostname(), os.getpid())
                self.election = zk.getStatsElection(identifier)
            if not self._running:
                return
            # Contend for leadership; _run_stats() is invoked once this
            # worker wins the election.
            self.election.run(self._run_stats)
        except Exception:
            self.log.exception('Exception in StatsWorker:')

    def _run_stats(self):
        self.log.info('Won stats reporter election')

        # enable us getting events
        zk = self._nodepool.getZK()
        zk.setNodeStatsEvent(self.stats_event)

        while self._running:
            signaled = self.stats_event.wait()

            if not self._running:
                break

            if not signaled:
                continue

            self.log.debug('Updating stats')
            self.stats_event.clear()
            try:
                self.updateNodeStats(zk)
            except Exception:
                self.log.exception("Exception while reporting stats:")
            # Throttle successive updates — presumably to batch bursts
            # of node-change events; TODO confirm intent.
            time.sleep(1)

        # Unregister from node stats events
        zk.setNodeStatsEvent(None)
class NodePool(threading.Thread):
log = logging.getLogger("nodepool.NodePool")
@@ -710,6 +773,7 @@ class NodePool(threading.Thread):
self.watermark_sleep = watermark_sleep
self.cleanup_interval = 60
self.delete_interval = 5
self.stats_interval = 5
self._stopped = False
self._stop_event = threading.Event()
self.config = None
@@ -718,6 +782,7 @@ class NodePool(threading.Thread):
self._pool_threads = {}
self._cleanup_thread = None
self._delete_thread = None
self._stats_thread = None
self._submittedRequests = {}
def stop(self):
@@ -738,6 +803,10 @@ class NodePool(threading.Thread):
self._delete_thread.stop()
self._delete_thread.join()
if self._stats_thread:
self._stats_thread.stop()
self._stats_thread.join()
# Don't let stop() return until all pool threads have been
# terminated.
self.log.debug("Stopping pool threads")
@@ -950,6 +1019,10 @@ class NodePool(threading.Thread):
self, self.delete_interval)
self._delete_thread.start()
if not self._stats_thread:
self._stats_thread = StatsWorker(self, self.stats_interval)
self._stats_thread.start()
# Stop any PoolWorker threads if the pool was removed
# from the config.
pool_keys = set()

View File

@@ -85,39 +85,40 @@ class StatsReporter(object):
pipeline.incr(key)
pipeline.send()
def updateNodeStats(self, zk_conn, provider):
def updateNodeStats(self, zk_conn):
'''
Refresh statistics for all known nodes.
:param ZooKeeper zk_conn: A ZooKeeper connection object.
:param Provider provider: A config Provider object.
'''
if not self._statsd:
return
states = {}
launchers = zk_conn.getRegisteredLaunchers()
labels = set()
for launcher in launchers:
labels.update(launcher.supported_labels)
providers = set()
for launcher in launchers:
providers.add(launcher.provider_name)
# Initialize things we know about to zero
for state in zk.Node.VALID_STATES:
key = 'nodepool.nodes.%s' % state
states[key] = 0
key = 'nodepool.provider.%s.nodes.%s' % (provider.name, state)
states[key] = 0
for provider in providers:
key = 'nodepool.provider.%s.nodes.%s' % (provider, state)
states[key] = 0
# Initialize label stats to 0
for label in provider.getSupportedLabels():
for label in labels:
for state in zk.Node.VALID_STATES:
key = 'nodepool.label.%s.nodes.%s' % (label, state)
states[key] = 0
# Note that we intentionally don't use caching here because we don't
# know when the next update will happen and thus need to report the
# correct most recent state. Otherwise we can end up in reporting
# a gauge with a node in state deleting = 1 and never update this for
# a long time.
# TODO(tobiash): Changing updateNodeStats to just run periodically will
# resolve this and we can operate on cached data.
for node in zk_conn.nodeIterator(cached=False):
for node in zk_conn.nodeIterator():
# nodepool.nodes.STATE
key = 'nodepool.nodes.%s' % node.state
states[key] += 1
@@ -145,9 +146,18 @@ class StatsReporter(object):
for key, count in states.items():
pipeline.gauge(key, count)
pipeline.send()
def updateProviderLimits(self, provider):
    """Report the configured capacity limits of a provider.

    Emits the nodepool.provider.<PROVIDER>.max_servers gauge as the
    sum of the max_servers settings of all pools that define one.

    :param Provider provider: A config Provider object.
    """
    if not self._statsd:
        return

    total = sum(p.max_servers for p in provider.pools.values()
                if p.max_servers)
    pipe = self._statsd.pipeline()
    # nodepool.provider.PROVIDER.max_servers
    pipe.gauge('nodepool.provider.%s.max_servers' % provider.name, total)
    pipe.send()

View File

@@ -207,6 +207,7 @@ class BaseTestCase(testtools.TestCase):
'fake-provider3',
'CleanupWorker',
'DeletedNodeWorker',
'StatsWorker',
'pydevd.CommandThread',
'pydevd.Reader',
'pydevd.Writer',

View File

@@ -23,6 +23,7 @@ from kazoo import exceptions as kze
from kazoo.handlers.threading import KazooTimeoutError
from kazoo.recipe.lock import Lock
from kazoo.recipe.cache import TreeCache, TreeEvent
from kazoo.recipe.election import Election
from nodepool import exceptions as npe
@@ -164,6 +165,7 @@ class Launcher(Serializable):
def __init__(self):
    # Unique launcher identifier; assigned by the registering worker.
    self.id = None
    # Name of the provider this launcher serves; serialized so stats
    # reporting can aggregate node counts per provider.
    self.provider_name = None
    # Backing set for supported_labels — presumably exposed via a
    # property (toDict reads self.supported_labels); TODO confirm.
    self._supported_labels = set()
def __eq__(self, other):
@@ -186,6 +188,7 @@ class Launcher(Serializable):
def toDict(self):
    """Serialize this Launcher as a JSON-friendly dict."""
    return {
        'id': self.id,
        'provider_name': self.provider_name,
        # sets are not JSON serializable, so use a sorted list
        'supported_labels': sorted(self.supported_labels),
    }
@@ -194,6 +197,10 @@ class Launcher(Serializable):
def fromDict(d):
    """Deserialize a Launcher from its dict form.

    Missing keys fall back to safe defaults so that records written by
    older launchers can still be read.
    """
    launcher = Launcher()
    launcher.id = d.get('id')
    # TODO(tobiash): The fallback to 'unknown' is only needed to avoid
    # having a full nodepool shutdown on upgrade. It can be
    # removed later.
    launcher.provider_name = d.get('provider_name', 'unknown')
    launcher.supported_labels = set(d.get('supported_labels', []))
    return launcher
@@ -693,6 +700,7 @@ class ZooKeeper(object):
NODE_ROOT = "/nodepool/nodes"
REQUEST_ROOT = "/nodepool/requests"
REQUEST_LOCK_ROOT = "/nodepool/requests-lock"
ELECTION_ROOT = "/nodepool/elections"
# Log zookeeper retry every 10 seconds
retry_log_rate = 10
@@ -710,10 +718,15 @@ class ZooKeeper(object):
self._cached_node_requests = {}
self.enable_cache = enable_cache
self.node_stats_event = None
# =======================================================================
# Private Methods
# =======================================================================
def _electionPath(self, election):
return "%s/%s" % (self.ELECTION_ROOT, election)
def _imagePath(self, image):
return "%s/%s" % (self.IMAGE_ROOT, image)
@@ -2106,6 +2119,10 @@ class ZooKeeper(object):
node = Node.fromDict(d, node_id)
node.stat = event.event_data.stat
self._cached_nodes[node_id] = node
# set the stats event so the stats reporting thread can act upon it
if self.node_stats_event is not None:
self.node_stats_event.set()
elif event.event_type == TreeEvent.NODE_REMOVED:
try:
del self._cached_nodes[node_id]
@@ -2113,6 +2130,13 @@ class ZooKeeper(object):
# If it's already gone, don't care
pass
# set the stats event so the stats reporting thread can act upon it
if self.node_stats_event is not None:
self.node_stats_event.set()
def setNodeStatsEvent(self, event):
    """Register the event to set when cached node state changes.

    :param threading.Event event: Event the node-cache listener sets
        whenever a node is created, updated or removed; pass None to
        unregister.
    """
    self.node_stats_event = event
def requestCacheListener(self, event):
if hasattr(event.event_data, 'path'):
@@ -2158,3 +2182,7 @@ class ZooKeeper(object):
except KeyError:
# If it's already gone, don't care
pass
def getStatsElection(self, identifier):
    """Create the leader election used for stats reporting.

    :param str identifier: Identifier of this contender, shown to
        other contenders.
    :returns: A kazoo Election recipe object rooted at the 'stats'
        election path.
    """
    return Election(self.client, self._electionPath('stats'), identifier)