Merge "Asynchronously update node statistics"
This commit is contained in:
commit
81936cd818
|
@ -94,7 +94,6 @@ class NodeLauncher(threading.Thread,
|
|||
try:
|
||||
dt = int((time.monotonic() - start_time) * 1000)
|
||||
self.recordLaunchStats(statsd_key, dt)
|
||||
self.updateNodeStats(self.zk, self.provider_config)
|
||||
except Exception:
|
||||
self.log.exception("Exception while reporting stats:")
|
||||
|
||||
|
|
|
@ -46,13 +46,12 @@ LOCK_CLEANUP = 8 * HOURS
|
|||
SUSPEND_WAIT_TIME = 30
|
||||
|
||||
|
||||
class NodeDeleter(threading.Thread, stats.StatsReporter):
|
||||
class NodeDeleter(threading.Thread):
|
||||
log = logging.getLogger("nodepool.NodeDeleter")
|
||||
|
||||
def __init__(self, zk, provider_manager, node):
|
||||
threading.Thread.__init__(self, name='NodeDeleter for %s %s' %
|
||||
(node.provider, node.external_id))
|
||||
stats.StatsReporter.__init__(self)
|
||||
self._zk = zk
|
||||
self._provider_manager = provider_manager
|
||||
self._node = node
|
||||
|
@ -109,13 +108,8 @@ class NodeDeleter(threading.Thread, stats.StatsReporter):
|
|||
|
||||
self.delete(self._zk, self._provider_manager, self._node, node_exists)
|
||||
|
||||
try:
|
||||
self.updateNodeStats(self._zk, self._provider_manager.provider)
|
||||
except Exception:
|
||||
self.log.exception("Exception while reporting stats:")
|
||||
|
||||
|
||||
class PoolWorker(threading.Thread):
|
||||
class PoolWorker(threading.Thread, stats.StatsReporter):
|
||||
'''
|
||||
Class that manages node requests for a single provider pool.
|
||||
|
||||
|
@ -143,6 +137,7 @@ class PoolWorker(threading.Thread):
|
|||
self.launcher_id = "%s-%s-%s" % (socket.gethostname(),
|
||||
os.getpid(),
|
||||
self.name)
|
||||
stats.StatsReporter.__init__(self)
|
||||
|
||||
# ---------------------------------------------------------------
|
||||
# Private methods
|
||||
|
@ -294,8 +289,12 @@ class PoolWorker(threading.Thread):
|
|||
launcher.id = self.launcher_id
|
||||
for prov_cfg in self.nodepool.config.providers.values():
|
||||
launcher.supported_labels.update(prov_cfg.getSupportedLabels())
|
||||
launcher.provider_name = self.provider_name
|
||||
self.zk.registerLauncher(launcher)
|
||||
|
||||
self.updateProviderLimits(
|
||||
self.nodepool.config.providers.get(self.provider_name))
|
||||
|
||||
try:
|
||||
if not self.paused_handler:
|
||||
self._assignHandlers()
|
||||
|
@ -699,6 +698,70 @@ class DeletedNodeWorker(BaseCleanupWorker):
|
|||
self.log.exception("Exception in DeletedNodeWorker:")
|
||||
|
||||
|
||||
class StatsWorker(BaseCleanupWorker, stats.StatsReporter):
|
||||
|
||||
def __init__(self, nodepool, interval):
|
||||
super().__init__(nodepool, interval, name='StatsWorker')
|
||||
self.log = logging.getLogger('nodepool.StatsWorker')
|
||||
self.stats_event = threading.Event()
|
||||
self.election = None
|
||||
|
||||
def stop(self):
|
||||
self._running = False
|
||||
if self.election is not None:
|
||||
self.log.debug('Cancel leader election')
|
||||
self.election.cancel()
|
||||
self.stats_event.set()
|
||||
super().stop()
|
||||
|
||||
def _run(self):
|
||||
try:
|
||||
stats.StatsReporter.__init__(self)
|
||||
|
||||
if not self._statsd:
|
||||
return
|
||||
|
||||
if self.election is None:
|
||||
zk = self._nodepool.getZK()
|
||||
identifier = "%s-%s" % (socket.gethostname(), os.getpid())
|
||||
self.election = zk.getStatsElection(identifier)
|
||||
|
||||
if not self._running:
|
||||
return
|
||||
|
||||
self.election.run(self._run_stats)
|
||||
|
||||
except Exception:
|
||||
self.log.exception('Exception in StatsWorker:')
|
||||
|
||||
def _run_stats(self):
|
||||
self.log.info('Won stats reporter election')
|
||||
|
||||
# enable us getting events
|
||||
zk = self._nodepool.getZK()
|
||||
zk.setNodeStatsEvent(self.stats_event)
|
||||
|
||||
while self._running:
|
||||
signaled = self.stats_event.wait()
|
||||
|
||||
if not self._running:
|
||||
break
|
||||
|
||||
if not signaled:
|
||||
continue
|
||||
|
||||
self.log.debug('Updating stats')
|
||||
self.stats_event.clear()
|
||||
try:
|
||||
self.updateNodeStats(zk)
|
||||
except Exception:
|
||||
self.log.exception("Exception while reporting stats:")
|
||||
time.sleep(1)
|
||||
|
||||
# Unregister from node stats events
|
||||
zk.setNodeStatsEvent(None)
|
||||
|
||||
|
||||
class NodePool(threading.Thread):
|
||||
log = logging.getLogger("nodepool.NodePool")
|
||||
|
||||
|
@ -710,6 +773,7 @@ class NodePool(threading.Thread):
|
|||
self.watermark_sleep = watermark_sleep
|
||||
self.cleanup_interval = 60
|
||||
self.delete_interval = 5
|
||||
self.stats_interval = 5
|
||||
self._stopped = False
|
||||
self._stop_event = threading.Event()
|
||||
self.config = None
|
||||
|
@ -718,6 +782,7 @@ class NodePool(threading.Thread):
|
|||
self._pool_threads = {}
|
||||
self._cleanup_thread = None
|
||||
self._delete_thread = None
|
||||
self._stats_thread = None
|
||||
self._submittedRequests = {}
|
||||
|
||||
def stop(self):
|
||||
|
@ -738,6 +803,10 @@ class NodePool(threading.Thread):
|
|||
self._delete_thread.stop()
|
||||
self._delete_thread.join()
|
||||
|
||||
if self._stats_thread:
|
||||
self._stats_thread.stop()
|
||||
self._stats_thread.join()
|
||||
|
||||
# Don't let stop() return until all pool threads have been
|
||||
# terminated.
|
||||
self.log.debug("Stopping pool threads")
|
||||
|
@ -950,6 +1019,10 @@ class NodePool(threading.Thread):
|
|||
self, self.delete_interval)
|
||||
self._delete_thread.start()
|
||||
|
||||
if not self._stats_thread:
|
||||
self._stats_thread = StatsWorker(self, self.stats_interval)
|
||||
self._stats_thread.start()
|
||||
|
||||
# Stop any PoolWorker threads if the pool was removed
|
||||
# from the config.
|
||||
pool_keys = set()
|
||||
|
|
|
@ -85,39 +85,40 @@ class StatsReporter(object):
|
|||
pipeline.incr(key)
|
||||
pipeline.send()
|
||||
|
||||
def updateNodeStats(self, zk_conn, provider):
|
||||
def updateNodeStats(self, zk_conn):
|
||||
'''
|
||||
Refresh statistics for all known nodes.
|
||||
|
||||
:param ZooKeeper zk_conn: A ZooKeeper connection object.
|
||||
:param Provider provider: A config Provider object.
|
||||
'''
|
||||
if not self._statsd:
|
||||
return
|
||||
|
||||
states = {}
|
||||
|
||||
launchers = zk_conn.getRegisteredLaunchers()
|
||||
labels = set()
|
||||
for launcher in launchers:
|
||||
labels.update(launcher.supported_labels)
|
||||
providers = set()
|
||||
for launcher in launchers:
|
||||
providers.add(launcher.provider_name)
|
||||
|
||||
# Initialize things we know about to zero
|
||||
for state in zk.Node.VALID_STATES:
|
||||
key = 'nodepool.nodes.%s' % state
|
||||
states[key] = 0
|
||||
key = 'nodepool.provider.%s.nodes.%s' % (provider.name, state)
|
||||
states[key] = 0
|
||||
for provider in providers:
|
||||
key = 'nodepool.provider.%s.nodes.%s' % (provider, state)
|
||||
states[key] = 0
|
||||
|
||||
# Initialize label stats to 0
|
||||
for label in provider.getSupportedLabels():
|
||||
for label in labels:
|
||||
for state in zk.Node.VALID_STATES:
|
||||
key = 'nodepool.label.%s.nodes.%s' % (label, state)
|
||||
states[key] = 0
|
||||
|
||||
# Note that we intentionally don't use caching here because we don't
|
||||
# know when the next update will happen and thus need to report the
|
||||
# correct most recent state. Otherwise we can end up in reporting
|
||||
# a gauge with a node in state deleting = 1 and never update this for
|
||||
# a long time.
|
||||
# TODO(tobiash): Changing updateNodeStats to just run periodically will
|
||||
# resolve this and we can operate on cached data.
|
||||
for node in zk_conn.nodeIterator(cached=False):
|
||||
for node in zk_conn.nodeIterator():
|
||||
# nodepool.nodes.STATE
|
||||
key = 'nodepool.nodes.%s' % node.state
|
||||
states[key] += 1
|
||||
|
@ -145,9 +146,18 @@ class StatsReporter(object):
|
|||
for key, count in states.items():
|
||||
pipeline.gauge(key, count)
|
||||
|
||||
pipeline.send()
|
||||
|
||||
def updateProviderLimits(self, provider):
|
||||
if not self._statsd:
|
||||
return
|
||||
|
||||
pipeline = self._statsd.pipeline()
|
||||
|
||||
# nodepool.provider.PROVIDER.max_servers
|
||||
key = 'nodepool.provider.%s.max_servers' % provider.name
|
||||
max_servers = sum([p.max_servers for p in provider.pools.values()
|
||||
if p.max_servers])
|
||||
pipeline.gauge(key, max_servers)
|
||||
|
||||
pipeline.send()
|
||||
|
|
|
@ -207,6 +207,7 @@ class BaseTestCase(testtools.TestCase):
|
|||
'fake-provider3',
|
||||
'CleanupWorker',
|
||||
'DeletedNodeWorker',
|
||||
'StatsWorker',
|
||||
'pydevd.CommandThread',
|
||||
'pydevd.Reader',
|
||||
'pydevd.Writer',
|
||||
|
|
|
@ -23,6 +23,7 @@ from kazoo import exceptions as kze
|
|||
from kazoo.handlers.threading import KazooTimeoutError
|
||||
from kazoo.recipe.lock import Lock
|
||||
from kazoo.recipe.cache import TreeCache, TreeEvent
|
||||
from kazoo.recipe.election import Election
|
||||
|
||||
from nodepool import exceptions as npe
|
||||
|
||||
|
@ -164,6 +165,7 @@ class Launcher(Serializable):
|
|||
|
||||
def __init__(self):
|
||||
self.id = None
|
||||
self.provider_name = None
|
||||
self._supported_labels = set()
|
||||
|
||||
def __eq__(self, other):
|
||||
|
@ -186,6 +188,7 @@ class Launcher(Serializable):
|
|||
def toDict(self):
|
||||
d = {}
|
||||
d['id'] = self.id
|
||||
d['provider_name'] = self.provider_name
|
||||
# sets are not JSON serializable, so use a sorted list
|
||||
d['supported_labels'] = sorted(self.supported_labels)
|
||||
return d
|
||||
|
@ -194,6 +197,10 @@ class Launcher(Serializable):
|
|||
def fromDict(d):
|
||||
obj = Launcher()
|
||||
obj.id = d.get('id')
|
||||
# TODO(tobiash): The fallback to 'unknown' is only needed to avoid
|
||||
# having a full nodepool shutdown on upgrade. It can be
|
||||
# removed later.
|
||||
obj.provider_name = d.get('provider_name', 'unknown')
|
||||
obj.supported_labels = set(d.get('supported_labels', []))
|
||||
return obj
|
||||
|
||||
|
@ -693,6 +700,7 @@ class ZooKeeper(object):
|
|||
NODE_ROOT = "/nodepool/nodes"
|
||||
REQUEST_ROOT = "/nodepool/requests"
|
||||
REQUEST_LOCK_ROOT = "/nodepool/requests-lock"
|
||||
ELECTION_ROOT = "/nodepool/elections"
|
||||
|
||||
# Log zookeeper retry every 10 seconds
|
||||
retry_log_rate = 10
|
||||
|
@ -710,10 +718,15 @@ class ZooKeeper(object):
|
|||
self._cached_node_requests = {}
|
||||
self.enable_cache = enable_cache
|
||||
|
||||
self.node_stats_event = None
|
||||
|
||||
# =======================================================================
|
||||
# Private Methods
|
||||
# =======================================================================
|
||||
|
||||
def _electionPath(self, election):
|
||||
return "%s/%s" % (self.ELECTION_ROOT, election)
|
||||
|
||||
def _imagePath(self, image):
|
||||
return "%s/%s" % (self.IMAGE_ROOT, image)
|
||||
|
||||
|
@ -2106,6 +2119,10 @@ class ZooKeeper(object):
|
|||
node = Node.fromDict(d, node_id)
|
||||
node.stat = event.event_data.stat
|
||||
self._cached_nodes[node_id] = node
|
||||
|
||||
# set the stats event so the stats reporting thread can act upon it
|
||||
if self.node_stats_event is not None:
|
||||
self.node_stats_event.set()
|
||||
elif event.event_type == TreeEvent.NODE_REMOVED:
|
||||
try:
|
||||
del self._cached_nodes[node_id]
|
||||
|
@ -2113,6 +2130,13 @@ class ZooKeeper(object):
|
|||
# If it's already gone, don't care
|
||||
pass
|
||||
|
||||
# set the stats event so the stats reporting thread can act upon it
|
||||
if self.node_stats_event is not None:
|
||||
self.node_stats_event.set()
|
||||
|
||||
def setNodeStatsEvent(self, event):
|
||||
self.node_stats_event = event
|
||||
|
||||
def requestCacheListener(self, event):
|
||||
|
||||
if hasattr(event.event_data, 'path'):
|
||||
|
@ -2158,3 +2182,7 @@ class ZooKeeper(object):
|
|||
except KeyError:
|
||||
# If it's already gone, don't care
|
||||
pass
|
||||
|
||||
def getStatsElection(self, identifier):
|
||||
path = self._electionPath('stats')
|
||||
return Election(self.client, path, identifier)
|
||||
|
|
Loading…
Reference in New Issue