summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTobias Henkel <tobias.henkel@bmw.de>2018-11-22 14:37:58 +0100
committerTobias Henkel <tobias.henkel@bmw.de>2018-11-29 16:48:30 +0100
commit64487baef0606f9035cd53a76ea32c86e133bd51 (patch)
tree27819002f9df1e000291aa189d2a33433f9d8b1f
parent9d77f05d8ebb9b5643af2debbef85fb972a633b8 (diff)
Asynchronously update node statistics
We currently update the node statistics on every node launch or delete. This cannot use caching at the moment because when the statistics are updated we might end up pushing slightly outdated data. If there is then no further update for a long time we end up with broken gauges. We already get update events from the node cache so we can use that to centrally trigger node statistics updates. This is combined with leader election so there is only a single launcher that keeps the statistics up to date. This will ensure that the statistics are not cluttered because of several launchers pushing their own slightly different view into the stats. As a side effect this reduces the runtime of a test that creates 200 nodes from 100s to 70s on my local machine. Change-Id: I77c6edc1db45b5b45be1812cf19eea66fdfab014
Notes
Notes (review): Code-Review+2: James E. Blair <corvus@inaugust.com> Code-Review+2: David Shrewsbury <shrewsbury.dave@gmail.com> Workflow+1: David Shrewsbury <shrewsbury.dave@gmail.com> Verified+2: Zuul Submitted-by: Zuul Submitted-at: Thu, 29 Nov 2018 21:00:14 +0000 Reviewed-on: https://review.openstack.org/619589 Project: openstack-infra/nodepool Branch: refs/heads/master
-rw-r--r--nodepool/driver/utils.py1
-rwxr-xr-xnodepool/launcher.py89
-rwxr-xr-xnodepool/stats.py36
-rw-r--r--nodepool/tests/__init__.py1
-rwxr-xr-xnodepool/zk.py28
5 files changed, 133 insertions, 22 deletions
diff --git a/nodepool/driver/utils.py b/nodepool/driver/utils.py
index e8c3e72..1c6cbb3 100644
--- a/nodepool/driver/utils.py
+++ b/nodepool/driver/utils.py
@@ -94,7 +94,6 @@ class NodeLauncher(threading.Thread,
94 try: 94 try:
95 dt = int((time.monotonic() - start_time) * 1000) 95 dt = int((time.monotonic() - start_time) * 1000)
96 self.recordLaunchStats(statsd_key, dt) 96 self.recordLaunchStats(statsd_key, dt)
97 self.updateNodeStats(self.zk, self.provider_config)
98 except Exception: 97 except Exception:
99 self.log.exception("Exception while reporting stats:") 98 self.log.exception("Exception while reporting stats:")
100 99
diff --git a/nodepool/launcher.py b/nodepool/launcher.py
index 192651f..cabf5bf 100755
--- a/nodepool/launcher.py
+++ b/nodepool/launcher.py
@@ -46,13 +46,12 @@ LOCK_CLEANUP = 8 * HOURS
46SUSPEND_WAIT_TIME = 30 46SUSPEND_WAIT_TIME = 30
47 47
48 48
49class NodeDeleter(threading.Thread, stats.StatsReporter): 49class NodeDeleter(threading.Thread):
50 log = logging.getLogger("nodepool.NodeDeleter") 50 log = logging.getLogger("nodepool.NodeDeleter")
51 51
52 def __init__(self, zk, provider_manager, node): 52 def __init__(self, zk, provider_manager, node):
53 threading.Thread.__init__(self, name='NodeDeleter for %s %s' % 53 threading.Thread.__init__(self, name='NodeDeleter for %s %s' %
54 (node.provider, node.external_id)) 54 (node.provider, node.external_id))
55 stats.StatsReporter.__init__(self)
56 self._zk = zk 55 self._zk = zk
57 self._provider_manager = provider_manager 56 self._provider_manager = provider_manager
58 self._node = node 57 self._node = node
@@ -109,13 +108,8 @@ class NodeDeleter(threading.Thread, stats.StatsReporter):
109 108
110 self.delete(self._zk, self._provider_manager, self._node, node_exists) 109 self.delete(self._zk, self._provider_manager, self._node, node_exists)
111 110
112 try:
113 self.updateNodeStats(self._zk, self._provider_manager.provider)
114 except Exception:
115 self.log.exception("Exception while reporting stats:")
116 111
117 112class PoolWorker(threading.Thread, stats.StatsReporter):
118class PoolWorker(threading.Thread):
119 ''' 113 '''
120 Class that manages node requests for a single provider pool. 114 Class that manages node requests for a single provider pool.
121 115
@@ -143,6 +137,7 @@ class PoolWorker(threading.Thread):
143 self.launcher_id = "%s-%s-%s" % (socket.gethostname(), 137 self.launcher_id = "%s-%s-%s" % (socket.gethostname(),
144 os.getpid(), 138 os.getpid(),
145 self.name) 139 self.name)
140 stats.StatsReporter.__init__(self)
146 141
147 # --------------------------------------------------------------- 142 # ---------------------------------------------------------------
148 # Private methods 143 # Private methods
@@ -294,8 +289,12 @@ class PoolWorker(threading.Thread):
294 launcher.id = self.launcher_id 289 launcher.id = self.launcher_id
295 for prov_cfg in self.nodepool.config.providers.values(): 290 for prov_cfg in self.nodepool.config.providers.values():
296 launcher.supported_labels.update(prov_cfg.getSupportedLabels()) 291 launcher.supported_labels.update(prov_cfg.getSupportedLabels())
292 launcher.provider_name = self.provider_name
297 self.zk.registerLauncher(launcher) 293 self.zk.registerLauncher(launcher)
298 294
295 self.updateProviderLimits(
296 self.nodepool.config.providers.get(self.provider_name))
297
299 try: 298 try:
300 if not self.paused_handler: 299 if not self.paused_handler:
301 self._assignHandlers() 300 self._assignHandlers()
@@ -699,6 +698,70 @@ class DeletedNodeWorker(BaseCleanupWorker):
699 self.log.exception("Exception in DeletedNodeWorker:") 698 self.log.exception("Exception in DeletedNodeWorker:")
700 699
701 700
701class StatsWorker(BaseCleanupWorker, stats.StatsReporter):
702
703 def __init__(self, nodepool, interval):
704 super().__init__(nodepool, interval, name='StatsWorker')
705 self.log = logging.getLogger('nodepool.StatsWorker')
706 self.stats_event = threading.Event()
707 self.election = None
708
709 def stop(self):
710 self._running = False
711 if self.election is not None:
712 self.log.debug('Cancel leader election')
713 self.election.cancel()
714 self.stats_event.set()
715 super().stop()
716
717 def _run(self):
718 try:
719 stats.StatsReporter.__init__(self)
720
721 if not self._statsd:
722 return
723
724 if self.election is None:
725 zk = self._nodepool.getZK()
726 identifier = "%s-%s" % (socket.gethostname(), os.getpid())
727 self.election = zk.getStatsElection(identifier)
728
729 if not self._running:
730 return
731
732 self.election.run(self._run_stats)
733
734 except Exception:
735 self.log.exception('Exception in StatsWorker:')
736
737 def _run_stats(self):
738 self.log.info('Won stats reporter election')
739
740 # enable us getting events
741 zk = self._nodepool.getZK()
742 zk.setNodeStatsEvent(self.stats_event)
743
744 while self._running:
745 signaled = self.stats_event.wait()
746
747 if not self._running:
748 break
749
750 if not signaled:
751 continue
752
753 self.log.debug('Updating stats')
754 self.stats_event.clear()
755 try:
756 self.updateNodeStats(zk)
757 except Exception:
758 self.log.exception("Exception while reporting stats:")
759 time.sleep(1)
760
761 # Unregister from node stats events
762 zk.setNodeStatsEvent(None)
763
764
702class NodePool(threading.Thread): 765class NodePool(threading.Thread):
703 log = logging.getLogger("nodepool.NodePool") 766 log = logging.getLogger("nodepool.NodePool")
704 767
@@ -710,6 +773,7 @@ class NodePool(threading.Thread):
710 self.watermark_sleep = watermark_sleep 773 self.watermark_sleep = watermark_sleep
711 self.cleanup_interval = 60 774 self.cleanup_interval = 60
712 self.delete_interval = 5 775 self.delete_interval = 5
776 self.stats_interval = 5
713 self._stopped = False 777 self._stopped = False
714 self._stop_event = threading.Event() 778 self._stop_event = threading.Event()
715 self.config = None 779 self.config = None
@@ -718,6 +782,7 @@ class NodePool(threading.Thread):
718 self._pool_threads = {} 782 self._pool_threads = {}
719 self._cleanup_thread = None 783 self._cleanup_thread = None
720 self._delete_thread = None 784 self._delete_thread = None
785 self._stats_thread = None
721 self._submittedRequests = {} 786 self._submittedRequests = {}
722 787
723 def stop(self): 788 def stop(self):
@@ -738,6 +803,10 @@ class NodePool(threading.Thread):
738 self._delete_thread.stop() 803 self._delete_thread.stop()
739 self._delete_thread.join() 804 self._delete_thread.join()
740 805
806 if self._stats_thread:
807 self._stats_thread.stop()
808 self._stats_thread.join()
809
741 # Don't let stop() return until all pool threads have been 810 # Don't let stop() return until all pool threads have been
742 # terminated. 811 # terminated.
743 self.log.debug("Stopping pool threads") 812 self.log.debug("Stopping pool threads")
@@ -950,6 +1019,10 @@ class NodePool(threading.Thread):
950 self, self.delete_interval) 1019 self, self.delete_interval)
951 self._delete_thread.start() 1020 self._delete_thread.start()
952 1021
1022 if not self._stats_thread:
1023 self._stats_thread = StatsWorker(self, self.stats_interval)
1024 self._stats_thread.start()
1025
953 # Stop any PoolWorker threads if the pool was removed 1026 # Stop any PoolWorker threads if the pool was removed
954 # from the config. 1027 # from the config.
955 pool_keys = set() 1028 pool_keys = set()
diff --git a/nodepool/stats.py b/nodepool/stats.py
index 7cbf32e..c218633 100755
--- a/nodepool/stats.py
+++ b/nodepool/stats.py
@@ -85,39 +85,40 @@ class StatsReporter(object):
85 pipeline.incr(key) 85 pipeline.incr(key)
86 pipeline.send() 86 pipeline.send()
87 87
88 def updateNodeStats(self, zk_conn, provider): 88 def updateNodeStats(self, zk_conn):
89 ''' 89 '''
90 Refresh statistics for all known nodes. 90 Refresh statistics for all known nodes.
91 91
92 :param ZooKeeper zk_conn: A ZooKeeper connection object. 92 :param ZooKeeper zk_conn: A ZooKeeper connection object.
93 :param Provider provider: A config Provider object.
94 ''' 93 '''
95 if not self._statsd: 94 if not self._statsd:
96 return 95 return
97 96
98 states = {} 97 states = {}
99 98
99 launchers = zk_conn.getRegisteredLaunchers()
100 labels = set()
101 for launcher in launchers:
102 labels.update(launcher.supported_labels)
103 providers = set()
104 for launcher in launchers:
105 providers.add(launcher.provider_name)
106
100 # Initialize things we know about to zero 107 # Initialize things we know about to zero
101 for state in zk.Node.VALID_STATES: 108 for state in zk.Node.VALID_STATES:
102 key = 'nodepool.nodes.%s' % state 109 key = 'nodepool.nodes.%s' % state
103 states[key] = 0 110 states[key] = 0
104 key = 'nodepool.provider.%s.nodes.%s' % (provider.name, state) 111 for provider in providers:
105 states[key] = 0 112 key = 'nodepool.provider.%s.nodes.%s' % (provider, state)
113 states[key] = 0
106 114
107 # Initialize label stats to 0 115 # Initialize label stats to 0
108 for label in provider.getSupportedLabels(): 116 for label in labels:
109 for state in zk.Node.VALID_STATES: 117 for state in zk.Node.VALID_STATES:
110 key = 'nodepool.label.%s.nodes.%s' % (label, state) 118 key = 'nodepool.label.%s.nodes.%s' % (label, state)
111 states[key] = 0 119 states[key] = 0
112 120
113 # Note that we intentionally don't use caching here because we don't 121 for node in zk_conn.nodeIterator():
114 # know when the next update will happen and thus need to report the
115 # correct most recent state. Otherwise we can end up in reporting
116 # a gauge with a node in state deleting = 1 and never update this for
117 # a long time.
118 # TODO(tobiash): Changing updateNodeStats to just run periodically will
119 # resolve this and we can operate on cached data.
120 for node in zk_conn.nodeIterator(cached=False):
121 # nodepool.nodes.STATE 122 # nodepool.nodes.STATE
122 key = 'nodepool.nodes.%s' % node.state 123 key = 'nodepool.nodes.%s' % node.state
123 states[key] += 1 124 states[key] += 1
@@ -145,9 +146,18 @@ class StatsReporter(object):
145 for key, count in states.items(): 146 for key, count in states.items():
146 pipeline.gauge(key, count) 147 pipeline.gauge(key, count)
147 148
149 pipeline.send()
150
151 def updateProviderLimits(self, provider):
152 if not self._statsd:
153 return
154
155 pipeline = self._statsd.pipeline()
156
148 # nodepool.provider.PROVIDER.max_servers 157 # nodepool.provider.PROVIDER.max_servers
149 key = 'nodepool.provider.%s.max_servers' % provider.name 158 key = 'nodepool.provider.%s.max_servers' % provider.name
150 max_servers = sum([p.max_servers for p in provider.pools.values() 159 max_servers = sum([p.max_servers for p in provider.pools.values()
151 if p.max_servers]) 160 if p.max_servers])
152 pipeline.gauge(key, max_servers) 161 pipeline.gauge(key, max_servers)
162
153 pipeline.send() 163 pipeline.send()
diff --git a/nodepool/tests/__init__.py b/nodepool/tests/__init__.py
index 1ab83ac..4a8e83d 100644
--- a/nodepool/tests/__init__.py
+++ b/nodepool/tests/__init__.py
@@ -207,6 +207,7 @@ class BaseTestCase(testtools.TestCase):
207 'fake-provider3', 207 'fake-provider3',
208 'CleanupWorker', 208 'CleanupWorker',
209 'DeletedNodeWorker', 209 'DeletedNodeWorker',
210 'StatsWorker',
210 'pydevd.CommandThread', 211 'pydevd.CommandThread',
211 'pydevd.Reader', 212 'pydevd.Reader',
212 'pydevd.Writer', 213 'pydevd.Writer',
diff --git a/nodepool/zk.py b/nodepool/zk.py
index 0bcd190..beb7709 100755
--- a/nodepool/zk.py
+++ b/nodepool/zk.py
@@ -23,6 +23,7 @@ from kazoo import exceptions as kze
23from kazoo.handlers.threading import KazooTimeoutError 23from kazoo.handlers.threading import KazooTimeoutError
24from kazoo.recipe.lock import Lock 24from kazoo.recipe.lock import Lock
25from kazoo.recipe.cache import TreeCache, TreeEvent 25from kazoo.recipe.cache import TreeCache, TreeEvent
26from kazoo.recipe.election import Election
26 27
27from nodepool import exceptions as npe 28from nodepool import exceptions as npe
28 29
@@ -164,6 +165,7 @@ class Launcher(Serializable):
164 165
165 def __init__(self): 166 def __init__(self):
166 self.id = None 167 self.id = None
168 self.provider_name = None
167 self._supported_labels = set() 169 self._supported_labels = set()
168 170
169 def __eq__(self, other): 171 def __eq__(self, other):
@@ -186,6 +188,7 @@ class Launcher(Serializable):
186 def toDict(self): 188 def toDict(self):
187 d = {} 189 d = {}
188 d['id'] = self.id 190 d['id'] = self.id
191 d['provider_name'] = self.provider_name
189 # sets are not JSON serializable, so use a sorted list 192 # sets are not JSON serializable, so use a sorted list
190 d['supported_labels'] = sorted(self.supported_labels) 193 d['supported_labels'] = sorted(self.supported_labels)
191 return d 194 return d
@@ -194,6 +197,10 @@ class Launcher(Serializable):
194 def fromDict(d): 197 def fromDict(d):
195 obj = Launcher() 198 obj = Launcher()
196 obj.id = d.get('id') 199 obj.id = d.get('id')
200 # TODO(tobiash): The fallback to 'unknown' is only needed to avoid
201 # having a full nodepool shutdown on upgrade. It can be
202 # removed later.
203 obj.provider_name = d.get('provider_name', 'unknown')
197 obj.supported_labels = set(d.get('supported_labels', [])) 204 obj.supported_labels = set(d.get('supported_labels', []))
198 return obj 205 return obj
199 206
@@ -689,6 +696,7 @@ class ZooKeeper(object):
689 NODE_ROOT = "/nodepool/nodes" 696 NODE_ROOT = "/nodepool/nodes"
690 REQUEST_ROOT = "/nodepool/requests" 697 REQUEST_ROOT = "/nodepool/requests"
691 REQUEST_LOCK_ROOT = "/nodepool/requests-lock" 698 REQUEST_LOCK_ROOT = "/nodepool/requests-lock"
699 ELECTION_ROOT = "/nodepool/elections"
692 700
693 # Log zookeeper retry every 10 seconds 701 # Log zookeeper retry every 10 seconds
694 retry_log_rate = 10 702 retry_log_rate = 10
@@ -706,10 +714,15 @@ class ZooKeeper(object):
706 self._cached_node_requests = {} 714 self._cached_node_requests = {}
707 self.enable_cache = enable_cache 715 self.enable_cache = enable_cache
708 716
717 self.node_stats_event = None
718
709 # ======================================================================= 719 # =======================================================================
710 # Private Methods 720 # Private Methods
711 # ======================================================================= 721 # =======================================================================
712 722
723 def _electionPath(self, election):
724 return "%s/%s" % (self.ELECTION_ROOT, election)
725
713 def _imagePath(self, image): 726 def _imagePath(self, image):
714 return "%s/%s" % (self.IMAGE_ROOT, image) 727 return "%s/%s" % (self.IMAGE_ROOT, image)
715 728
@@ -2102,6 +2115,10 @@ class ZooKeeper(object):
2102 node = Node.fromDict(d, node_id) 2115 node = Node.fromDict(d, node_id)
2103 node.stat = event.event_data.stat 2116 node.stat = event.event_data.stat
2104 self._cached_nodes[node_id] = node 2117 self._cached_nodes[node_id] = node
2118
2119 # set the stats event so the stats reporting thread can act upon it
2120 if self.node_stats_event is not None:
2121 self.node_stats_event.set()
2105 elif event.event_type == TreeEvent.NODE_REMOVED: 2122 elif event.event_type == TreeEvent.NODE_REMOVED:
2106 try: 2123 try:
2107 del self._cached_nodes[node_id] 2124 del self._cached_nodes[node_id]
@@ -2109,6 +2126,13 @@ class ZooKeeper(object):
2109 # If it's already gone, don't care 2126 # If it's already gone, don't care
2110 pass 2127 pass
2111 2128
2129 # set the stats event so the stats reporting thread can act upon it
2130 if self.node_stats_event is not None:
2131 self.node_stats_event.set()
2132
2133 def setNodeStatsEvent(self, event):
2134 self.node_stats_event = event
2135
2112 def requestCacheListener(self, event): 2136 def requestCacheListener(self, event):
2113 2137
2114 if hasattr(event.event_data, 'path'): 2138 if hasattr(event.event_data, 'path'):
@@ -2154,3 +2178,7 @@ class ZooKeeper(object):
2154 except KeyError: 2178 except KeyError:
2155 # If it's already gone, don't care 2179 # If it's already gone, don't care
2156 pass 2180 pass
2181
2182 def getStatsElection(self, identifier):
2183 path = self._electionPath('stats')
2184 return Election(self.client, path, identifier)