Remove TaskManager and just use keystoneauth

Support for concurrency and rate limiting has been added to keystoneauth,
which is the library openstacksdk uses to talk to OpenStack. Instead
of managing concurrency in nodepool using the TaskManager and pool of
worker threads, let keystoneauth take over. This also means we no longer
have a hook into the request process, so we defer statsd reporting to
the openstacksdk layer as well.

Change-Id: If21a10c56f43a121d30aa802f2c89d31df97f121
This commit is contained in:
Monty Taylor 2019-03-03 15:26:49 +00:00
parent f733e9c4d4
commit 34aae137fa
6 changed files with 34 additions and 136 deletions

View File

@ -517,12 +517,11 @@ OpenStack API stats
~~~~~~~~~~~~~~~~~~~
Low level details on the timing of OpenStack API calls will be logged
by the API task manager. These calls are logged under
by ``openstacksdk``. These calls are logged under
``nodepool.task.<provider>.<api-call>``. The API call name is of the
generic format ``<endpoint><method><operation>`` transformed into a
CamelCase value with no deliminators; for example the
``compute.GET.servers`` call becomes ``ComputeGetServers`` and
``compute.POST.os-volumes_boot`` becomes ``ComputePostOsVolumesBoot``.
generic format ``<service-type>.<method>.<operation>``. For example, the
``GET /servers`` call to the ``compute`` service becomes
``compute.GET.servers``.
Since these calls reflect the internal operations of the
``openstacksdk``, the exact keys logged may vary across providers and

View File

@ -17,6 +17,7 @@
import copy
import logging
import operator
import os
import time
import openstack
@ -25,7 +26,6 @@ from nodepool import exceptions
from nodepool.driver import Provider
from nodepool.driver.utils import QuotaInformation
from nodepool.nodeutils import iterate_timeout
from nodepool.task_manager import TaskManager
from nodepool import stats
from nodepool import version
from nodepool import zk
@ -47,8 +47,6 @@ class OpenStackProvider(Provider):
self._networks = {}
self.__flavors = {} # TODO(gtema): caching
self.__azs = None
self._use_taskmanager = use_taskmanager
self._taskmanager = None
self._current_nodepool_quota = None
self._zk = None
self._down_ports = set()
@ -57,20 +55,14 @@ class OpenStackProvider(Provider):
self._statsd = stats.get_client()
def start(self, zk_conn):
if self._use_taskmanager:
self._taskmanager = TaskManager(self.provider.name,
self.provider.rate)
self._taskmanager.start()
self.resetClient()
self._zk = zk_conn
def stop(self):
if self._taskmanager:
self._taskmanager.stop()
pass
def join(self):
if self._taskmanager:
self._taskmanager.join()
pass
def getRequestHandler(self, poolworker, request):
return handler.OpenStackNodeRequestHandler(poolworker, request)
@ -83,13 +75,18 @@ class OpenStackProvider(Provider):
return self.__flavors
def _getClient(self):
if self._use_taskmanager:
manager = self._taskmanager
else:
manager = None
rate_limit = None
# nodepool tracks rate limit in time between requests.
# openstacksdk tracks rate limit in requests per second.
# 1/time = requests-per-second.
if self.provider.rate:
rate_limit = 1 / self.provider.rate
return openstack.connection.Connection(
config=self.provider.cloud_config,
task_manager=manager,
rate_limit=rate_limit,
statsd_host=os.getenv('STATSD_HOST', None),
statsd_port=os.getenv('STATSD_PORT ', None),
statsd_prefix='nodepool.task.{0}'.format(self.provider.name),
app_name='nodepool',
app_version=version.version_info.version_string()
)

View File

@ -1,99 +0,0 @@
#!/usr/bin/env python
# Copyright (C) 2011-2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
import logging
import re
import queue
import time
from openstack import task_manager as openstack_task_manager
from nodepool import stats
def _transform_task_name(task_name):
# Transform openstacksdk internal task name to something more
# suitable for sending to statsd for tracking; e.g.
#
# compute.DELETE.servers -> ComputeDeleteServers
# compute.POST.os-volumes_boot -> ComputePostOsVolumesBoot
parts = re.split('[.\-_]', task_name)
return "".join(
[part.lower().capitalize() for part in parts]
)
class TaskManager(openstack_task_manager.TaskManager):
log = logging.getLogger("nodepool.TaskManager")
def __init__(self, name, rate, workers=5):
super(TaskManager, self).__init__(name=name, workers=workers)
self.daemon = True
self.queue = queue.Queue()
self._running = True
self.rate = float(rate)
self.statsd = stats.get_client()
self._thread = threading.Thread(name=name, target=self.run)
self._thread.daemon = True
def start(self):
self._thread.start()
def stop(self):
self._running = False
self.queue.put(None)
def join(self):
self._thread.join()
def run(self):
last_ts = 0
try:
while True:
task = self.queue.get()
if not task:
if not self._running:
break
continue
while True:
delta = time.time() - last_ts
if delta >= self.rate:
break
time.sleep(self.rate - delta)
self.log.debug("Manager %s running task %s (queue %s)" %
(self.name,
_transform_task_name(task.name),
self.queue.qsize()))
self.run_task(task)
self.queue.task_done()
except Exception:
self.log.exception("Task manager died.")
raise
def post_run_task(self, elapsed_time, task):
task_name = _transform_task_name(task.name)
self.log.debug(
"Manager %s ran task %s in %ss" %
(self.name, task_name, elapsed_time))
if self.statsd:
# nodepool.task.PROVIDER.TASK_NAME
key = 'nodepool.task.%s.%s' % (self.name, task_name)
self.statsd.timing(key, int(elapsed_time * 1000))
self.statsd.incr(key)

View File

@ -20,23 +20,10 @@ import yaml
from nodepool import config as nodepool_config
from nodepool import provider_manager
from nodepool import task_manager
from nodepool import tests
class TestShadeIntegration(tests.IntegrationTestCase):
def test_task_name_transformation(self):
t = task_manager._transform_task_name
self.assertEqual(
t('compute.DELETE.servers'),
'ComputeDeleteServers')
self.assertEqual(
t('compute.POST.os-volumes_boot'),
'ComputePostOsVolumesBoot')
self.assertEqual(
t('compute.GET.os-availability-zone'),
'ComputeGetOsAvailabilityZone')
def _cleanup_cloud_config(self):
os.remove(self.clouds_path)

View File

@ -0,0 +1,15 @@
---
upgrade:
- |
The ``TaskManager`` used by the OpenStack provider has been removed.
The ``keystoneauth1`` library underneath ``openstacksdk`` has grown
support for rate limiting using a ``FairSemaphore`` instead of a pool
of worker threads. This should reduce the overall thread count.
- |
statsd key names have changed. Because of the removal of ``TaskManager``
statsd calls are being deferred to openstacksdk. Instead of keys of the
form ``ComputeGetServers``, the openstacksdk keys are of the form
``compute.GET.servers``. They will always start with the normalized
``service-type``, followed by the HTTP verb, followed by a ``.`` separated
list of url segments. Any service version, project-id entries in the url
or ``.json`` suffixes will be removed.

View File

@ -6,9 +6,8 @@ python-daemon>=2.0.4,<2.1.0
extras
statsd>=3.0
PrettyTable>=0.6,<0.8
# openstacksdk before 0.21.0 has issues with dogpile.cache
# openstacksdk removes taskmanager in 0.27.0
openstacksdk>=0.21.0,<0.27.0
# openstacksdk before 0.27.0 is TaskManager based
openstacksdk>=0.27.0
diskimage-builder>=2.0.0
voluptuous
kazoo