author    Zuul <zuul@review.openstack.org>          2018-11-29 21:00:14 +0000
committer Gerrit Code Review <review@openstack.org> 2018-11-29 21:00:14 +0000
commit    81936cd818bbf5cf8084160c112e4a1a5a5aa03b (patch)
tree      9d90a7a4825868ecf285678a479d810439a42d02
parent    5f2281a59e340b32472d78bcfaebe1886c8e479b (diff)
parent    64487baef0606f9035cd53a76ea32c86e133bd51 (diff)
Merge "Asynchronously update node statistics"
-rw-r--r--  nodepool/driver/utils.py     1
-rwxr-xr-x  nodepool/launcher.py        89
-rwxr-xr-x  nodepool/stats.py           36
-rw-r--r--  nodepool/tests/__init__.py   1
-rwxr-xr-x  nodepool/zk.py              28
5 files changed, 133 insertions, 22 deletions
diff --git a/nodepool/driver/utils.py b/nodepool/driver/utils.py
index e8c3e72..1c6cbb3 100644
--- a/nodepool/driver/utils.py
+++ b/nodepool/driver/utils.py
@@ -94,7 +94,6 @@ class NodeLauncher(threading.Thread,
         try:
             dt = int((time.monotonic() - start_time) * 1000)
             self.recordLaunchStats(statsd_key, dt)
-            self.updateNodeStats(self.zk, self.provider_config)
         except Exception:
             self.log.exception("Exception while reporting stats:")
 
diff --git a/nodepool/launcher.py b/nodepool/launcher.py
index 192651f..cabf5bf 100755
--- a/nodepool/launcher.py
+++ b/nodepool/launcher.py
@@ -46,13 +46,12 @@ LOCK_CLEANUP = 8 * HOURS
 SUSPEND_WAIT_TIME = 30
 
 
-class NodeDeleter(threading.Thread, stats.StatsReporter):
+class NodeDeleter(threading.Thread):
     log = logging.getLogger("nodepool.NodeDeleter")
 
     def __init__(self, zk, provider_manager, node):
         threading.Thread.__init__(self, name='NodeDeleter for %s %s' %
                                   (node.provider, node.external_id))
-        stats.StatsReporter.__init__(self)
         self._zk = zk
         self._provider_manager = provider_manager
         self._node = node
@@ -109,13 +108,8 @@ class NodeDeleter(threading.Thread, stats.StatsReporter):
 
         self.delete(self._zk, self._provider_manager, self._node, node_exists)
 
-        try:
-            self.updateNodeStats(self._zk, self._provider_manager.provider)
-        except Exception:
-            self.log.exception("Exception while reporting stats:")
 
-
-class PoolWorker(threading.Thread):
+class PoolWorker(threading.Thread, stats.StatsReporter):
     '''
     Class that manages node requests for a single provider pool.
 
@@ -143,6 +137,7 @@ class PoolWorker(threading.Thread):
         self.launcher_id = "%s-%s-%s" % (socket.gethostname(),
                                          os.getpid(),
                                          self.name)
+        stats.StatsReporter.__init__(self)
 
     # ---------------------------------------------------------------
     # Private methods
@@ -294,8 +289,12 @@ class PoolWorker(threading.Thread):
         launcher.id = self.launcher_id
         for prov_cfg in self.nodepool.config.providers.values():
             launcher.supported_labels.update(prov_cfg.getSupportedLabels())
+        launcher.provider_name = self.provider_name
         self.zk.registerLauncher(launcher)
 
+        self.updateProviderLimits(
+            self.nodepool.config.providers.get(self.provider_name))
+
         try:
             if not self.paused_handler:
                 self._assignHandlers()
@@ -699,6 +698,70 @@ class DeletedNodeWorker(BaseCleanupWorker):
             self.log.exception("Exception in DeletedNodeWorker:")
 
 
+class StatsWorker(BaseCleanupWorker, stats.StatsReporter):
+
+    def __init__(self, nodepool, interval):
+        super().__init__(nodepool, interval, name='StatsWorker')
+        self.log = logging.getLogger('nodepool.StatsWorker')
+        self.stats_event = threading.Event()
+        self.election = None
+
+    def stop(self):
+        self._running = False
+        if self.election is not None:
+            self.log.debug('Cancel leader election')
+            self.election.cancel()
+        self.stats_event.set()
+        super().stop()
+
+    def _run(self):
+        try:
+            stats.StatsReporter.__init__(self)
+
+            if not self._statsd:
+                return
+
+            if self.election is None:
+                zk = self._nodepool.getZK()
+                identifier = "%s-%s" % (socket.gethostname(), os.getpid())
+                self.election = zk.getStatsElection(identifier)
+
+            if not self._running:
+                return
+
+            self.election.run(self._run_stats)
+
+        except Exception:
+            self.log.exception('Exception in StatsWorker:')
+
+    def _run_stats(self):
+        self.log.info('Won stats reporter election')
+
+        # enable us getting events
+        zk = self._nodepool.getZK()
+        zk.setNodeStatsEvent(self.stats_event)
+
+        while self._running:
+            signaled = self.stats_event.wait()
+
+            if not self._running:
+                break
+
+            if not signaled:
+                continue
+
+            self.log.debug('Updating stats')
+            self.stats_event.clear()
+            try:
+                self.updateNodeStats(zk)
+            except Exception:
+                self.log.exception("Exception while reporting stats:")
+            time.sleep(1)
+
+        # Unregister from node stats events
+        zk.setNodeStatsEvent(None)
+
+
 class NodePool(threading.Thread):
     log = logging.getLogger("nodepool.NodePool")
 
@@ -710,6 +773,7 @@ class NodePool(threading.Thread):
         self.watermark_sleep = watermark_sleep
         self.cleanup_interval = 60
         self.delete_interval = 5
+        self.stats_interval = 5
         self._stopped = False
         self._stop_event = threading.Event()
         self.config = None
@@ -718,6 +782,7 @@ class NodePool(threading.Thread):
         self._pool_threads = {}
         self._cleanup_thread = None
         self._delete_thread = None
+        self._stats_thread = None
         self._submittedRequests = {}
 
     def stop(self):
@@ -738,6 +803,10 @@ class NodePool(threading.Thread):
             self._delete_thread.stop()
             self._delete_thread.join()
 
+        if self._stats_thread:
+            self._stats_thread.stop()
+            self._stats_thread.join()
+
         # Don't let stop() return until all pool threads have been
         # terminated.
         self.log.debug("Stopping pool threads")
@@ -950,6 +1019,10 @@ class NodePool(threading.Thread):
                 self, self.delete_interval)
             self._delete_thread.start()
 
+        if not self._stats_thread:
+            self._stats_thread = StatsWorker(self, self.stats_interval)
+            self._stats_thread.start()
+
         # Stop any PoolWorker threads if the pool was removed
         # from the config.
         pool_keys = set()
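
The StatsWorker above gets its single-reporter semantics from a ZooKeeper
leader election: every launcher starts a StatsWorker, but only the election
winner runs _run_stats() and emits gauges. A minimal sketch of the kazoo
election pattern it builds on (the ZooKeeper address and identifier here are
illustrative assumptions, not nodepool configuration):

    import os
    import socket

    from kazoo.client import KazooClient
    from kazoo.recipe.election import Election

    client = KazooClient(hosts='127.0.0.1:2181')  # assumed ZooKeeper address
    client.start()

    def report_stats():
        # Runs only in the process currently holding leadership, so the
        # cluster emits each gauge exactly once per update.
        print('won election, reporting stats')

    # Mirrors ZooKeeper.getStatsElection(): one well-known election path,
    # one identifier per candidate process.
    identifier = '%s-%s' % (socket.gethostname(), os.getpid())
    election = Election(client, '/nodepool/elections/stats', identifier)
    election.run(report_stats)  # blocks; invokes report_stats() once elected

election.cancel(), as used in StatsWorker.stop(), aborts a candidate that is
still waiting its turn rather than holding the election open on shutdown.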
diff --git a/nodepool/stats.py b/nodepool/stats.py
index 7cbf32e..c218633 100755
--- a/nodepool/stats.py
+++ b/nodepool/stats.py
@@ -85,39 +85,40 @@ class StatsReporter(object):
             pipeline.incr(key)
         pipeline.send()
 
-    def updateNodeStats(self, zk_conn, provider):
+    def updateNodeStats(self, zk_conn):
         '''
         Refresh statistics for all known nodes.
 
         :param ZooKeeper zk_conn: A ZooKeeper connection object.
-        :param Provider provider: A config Provider object.
         '''
         if not self._statsd:
             return
 
         states = {}
 
+        launchers = zk_conn.getRegisteredLaunchers()
+        labels = set()
+        for launcher in launchers:
+            labels.update(launcher.supported_labels)
+        providers = set()
+        for launcher in launchers:
+            providers.add(launcher.provider_name)
+
         # Initialize things we know about to zero
         for state in zk.Node.VALID_STATES:
             key = 'nodepool.nodes.%s' % state
             states[key] = 0
-            key = 'nodepool.provider.%s.nodes.%s' % (provider.name, state)
-            states[key] = 0
+            for provider in providers:
+                key = 'nodepool.provider.%s.nodes.%s' % (provider, state)
+                states[key] = 0
 
         # Initialize label stats to 0
-        for label in provider.getSupportedLabels():
+        for label in labels:
             for state in zk.Node.VALID_STATES:
                 key = 'nodepool.label.%s.nodes.%s' % (label, state)
                 states[key] = 0
 
-        # Note that we intentionally don't use caching here because we don't
-        # know when the next update will happen and thus need to report the
-        # correct most recent state. Otherwise we can end up in reporting
-        # a gauge with a node in state deleting = 1 and never update this for
-        # a long time.
-        # TODO(tobiash): Changing updateNodeStats to just run periodically will
-        # resolve this and we can operate on cached data.
-        for node in zk_conn.nodeIterator(cached=False):
+        for node in zk_conn.nodeIterator():
             # nodepool.nodes.STATE
             key = 'nodepool.nodes.%s' % node.state
             states[key] += 1
@@ -145,9 +146,18 @@ class StatsReporter(object):
         for key, count in states.items():
             pipeline.gauge(key, count)
 
+        pipeline.send()
+
+    def updateProviderLimits(self, provider):
+        if not self._statsd:
+            return
+
+        pipeline = self._statsd.pipeline()
+
         # nodepool.provider.PROVIDER.max_servers
         key = 'nodepool.provider.%s.max_servers' % provider.name
         max_servers = sum([p.max_servers for p in provider.pools.values()
                            if p.max_servers])
         pipeline.gauge(key, max_servers)
+
         pipeline.send()
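
Both updateNodeStats() and the new updateProviderLimits() batch their gauges
through a statsd pipeline, so each reporting pass ends in one flush rather
than one network send per key. A minimal sketch of that pattern with the
Python statsd client (the endpoint and key values are illustrative
assumptions):

    import statsd

    client = statsd.StatsClient('localhost', 8125)  # assumed statsd endpoint

    states = {
        'nodepool.nodes.ready': 3,
        'nodepool.nodes.building': 1,
    }

    pipeline = client.pipeline()
    for key, count in states.items():
        pipeline.gauge(key, count)  # buffered locally, nothing sent yet
    pipeline.send()                 # one flush for the whole batch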
diff --git a/nodepool/tests/__init__.py b/nodepool/tests/__init__.py
index 1ab83ac..4a8e83d 100644
--- a/nodepool/tests/__init__.py
+++ b/nodepool/tests/__init__.py
@@ -207,6 +207,7 @@ class BaseTestCase(testtools.TestCase):
             'fake-provider3',
             'CleanupWorker',
             'DeletedNodeWorker',
+            'StatsWorker',
             'pydevd.CommandThread',
             'pydevd.Reader',
             'pydevd.Writer',
diff --git a/nodepool/zk.py b/nodepool/zk.py
index daaf4f6..da850ad 100755
--- a/nodepool/zk.py
+++ b/nodepool/zk.py
@@ -23,6 +23,7 @@ from kazoo import exceptions as kze
 from kazoo.handlers.threading import KazooTimeoutError
 from kazoo.recipe.lock import Lock
 from kazoo.recipe.cache import TreeCache, TreeEvent
+from kazoo.recipe.election import Election
 
 from nodepool import exceptions as npe
 
@@ -164,6 +165,7 @@ class Launcher(Serializable):
 
     def __init__(self):
         self.id = None
+        self.provider_name = None
         self._supported_labels = set()
 
     def __eq__(self, other):
@@ -186,6 +188,7 @@ class Launcher(Serializable):
     def toDict(self):
         d = {}
         d['id'] = self.id
+        d['provider_name'] = self.provider_name
         # sets are not JSON serializable, so use a sorted list
         d['supported_labels'] = sorted(self.supported_labels)
         return d
@@ -194,6 +197,10 @@ class Launcher(Serializable):
     def fromDict(d):
         obj = Launcher()
         obj.id = d.get('id')
+        # TODO(tobiash): The fallback to 'unknown' is only needed to avoid
+        # having a full nodepool shutdown on upgrade. It can be
+        # removed later.
+        obj.provider_name = d.get('provider_name', 'unknown')
         obj.supported_labels = set(d.get('supported_labels', []))
         return obj
 
@@ -693,6 +700,7 @@ class ZooKeeper(object):
     NODE_ROOT = "/nodepool/nodes"
     REQUEST_ROOT = "/nodepool/requests"
     REQUEST_LOCK_ROOT = "/nodepool/requests-lock"
+    ELECTION_ROOT = "/nodepool/elections"
 
     # Log zookeeper retry every 10 seconds
     retry_log_rate = 10
@@ -710,10 +718,15 @@ class ZooKeeper(object):
         self._cached_node_requests = {}
         self.enable_cache = enable_cache
 
+        self.node_stats_event = None
+
     # =======================================================================
     # Private Methods
     # =======================================================================
 
+    def _electionPath(self, election):
+        return "%s/%s" % (self.ELECTION_ROOT, election)
+
     def _imagePath(self, image):
         return "%s/%s" % (self.IMAGE_ROOT, image)
 
@@ -2106,6 +2119,10 @@ class ZooKeeper(object):
             node = Node.fromDict(d, node_id)
             node.stat = event.event_data.stat
             self._cached_nodes[node_id] = node
+
+            # set the stats event so the stats reporting thread can act upon it
+            if self.node_stats_event is not None:
+                self.node_stats_event.set()
         elif event.event_type == TreeEvent.NODE_REMOVED:
             try:
                 del self._cached_nodes[node_id]
@@ -2113,6 +2130,13 @@ class ZooKeeper(object):
                 # If it's already gone, don't care
                 pass
 
+            # set the stats event so the stats reporting thread can act upon it
+            if self.node_stats_event is not None:
+                self.node_stats_event.set()
+
+    def setNodeStatsEvent(self, event):
+        self.node_stats_event = event
+
     def requestCacheListener(self, event):
 
         if hasattr(event.event_data, 'path'):
@@ -2158,3 +2182,7 @@
         except KeyError:
             # If it's already gone, don't care
             pass
+
+    def getStatsElection(self, identifier):
+        path = self._electionPath('stats')
+        return Election(self.client, path, identifier)
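
Taken together, the zk.py changes define the wakeup contract that makes the
reporting asynchronous: the TreeCache listener sets a threading.Event on
every node add, update, or removal, and the elected StatsWorker waits on
that event, clears it, and reports from the cache (hence nodeIterator() no
longer needs cached=False). Because setting an already-set event is a no-op,
a burst of node changes coalesces into a single reporting pass. A condensed
sketch of the mechanism (names here are illustrative, not the nodepool API):

    import threading
    import time

    stats_event = threading.Event()

    def on_cache_change():
        # Called from the ZooKeeper watcher thread for every node change;
        # repeated set() calls before the worker wakes are harmless no-ops.
        stats_event.set()

    def stats_loop(running):
        while running.is_set():
            stats_event.wait()    # sleep until at least one change arrived
            stats_event.clear()   # swallow the whole burst in one pass
            print('reporting node stats from the cache')
            time.sleep(1)         # rate-limit successive reports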