Replace shade and os-client-config with openstacksdk.

os-client-config is now just a wrapper around openstacksdk. The shade
code has been imported into openstacksdk. To reduce complexity, just use
openstacksdk directly.

openstacksdk's TaskManager has had to grow some features to deal with
SwiftService. Making nodepool's TaskManager a subclass of openstacksdk's
TaskManager ensures that we get the thread pool set up properly.

Change-Id: I3a01eb18ae31cc3b61509984f3817378db832b47
This commit is contained in:
Artem Goncharov 2018-07-13 14:18:59 +02:00 committed by Monty Taylor
parent b8aa756515
commit fc1f80b6d1
No known key found for this signature in database
GPG Key ID: 7BAE94BC7141A594
10 changed files with 56 additions and 61 deletions

View File

@ -230,7 +230,7 @@ def openConfig(path):
def loadConfig(config_path):
config = openConfig(config_path)
# Call driver config reset now to clean global hooks like os_client_config
# Call driver config reset now to clean global hooks like openstacksdk
for driver in Drivers.drivers.values():
driver.reset()

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os_client_config
from openstack.config import loader
from nodepool.driver import Driver
from nodepool.driver.fake.config import FakeProviderConfig
@ -25,7 +25,7 @@ class FakeDriver(Driver):
self.reset()
def reset(self):
self.os_client_config = os_client_config.OpenStackConfig()
self.openstack_config = loader.OpenStackConfig()
def getProviderConfig(self, provider):
return FakeProviderConfig(self, provider)

View File

@ -19,7 +19,7 @@ import threading
import time
import uuid
import shade
import openstack
from nodepool import exceptions
from nodepool.driver.openstack.provider import OpenStackProvider
@ -39,11 +39,11 @@ class Dummy(object):
setattr(self, k, v)
try:
if self.should_fail:
raise shade.OpenStackCloudException('This image has '
'SHOULD_FAIL set to True.')
raise openstack.exceptions.OpenStackCloudException(
'This image has SHOULD_FAIL set to True.')
if self.over_quota:
raise shade.exc.OpenStackCloudHTTPError(
'Quota exceeded for something', 403)
raise openstack.exceptions.HttpException(
message='Quota exceeded for something', http_status=403)
except AttributeError:
pass

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os_client_config
from openstack.config import loader
from nodepool.driver import Driver
from nodepool.driver.openstack.config import OpenStackProviderConfig
@ -25,7 +25,7 @@ class OpenStackDriver(Driver):
self.reset()
def reset(self):
self.os_client_config = os_client_config.OpenStackConfig()
self.openstack_config = loader.OpenStackConfig()
def getProviderConfig(self, provider):
return OpenStackProviderConfig(self, provider)

View File

@ -195,8 +195,8 @@ class OpenStackProviderConfig(ProviderConfig):
def load(self, config):
cloud_kwargs = self._cloudKwargs()
occ = self.driver_object.os_client_config
self.cloud_config = occ.get_one_cloud(**cloud_kwargs)
openstack_config = self.driver_object.openstack_config
self.cloud_config = openstack_config.get_one(**cloud_kwargs)
self.image_type = self.cloud_config.config['image_format']
self.region_name = self.provider.get('region-name')

View File

@ -19,7 +19,7 @@ import logging
import operator
import time
import shade
import openstack
from nodepool import exceptions
from nodepool.driver import Provider
@ -43,7 +43,7 @@ class OpenStackProvider(Provider):
self.provider = provider
self._images = {}
self._networks = {}
self.__flavors = {}
self.__flavors = {} # TODO(gtema): caching
self.__azs = None
self._use_taskmanager = use_taskmanager
self._taskmanager = None
@ -51,7 +51,7 @@ class OpenStackProvider(Provider):
def start(self, zk_conn):
if self._use_taskmanager:
self._taskmanager = TaskManager(None, self.provider.name,
self._taskmanager = TaskManager(self.provider.name,
self.provider.rate)
self._taskmanager.start()
self.resetClient()
@ -67,6 +67,7 @@ class OpenStackProvider(Provider):
def getRequestHandler(self, poolworker, request):
return handler.OpenStackNodeRequestHandler(poolworker, request)
# TODO(gtema): caching
@property
def _flavors(self):
if not self.__flavors:
@ -78,12 +79,12 @@ class OpenStackProvider(Provider):
manager = self._taskmanager
else:
manager = None
return shade.OpenStackCloud(
cloud_config=self.provider.cloud_config,
manager=manager,
return openstack.connection.Connection(
config=self.provider.cloud_config,
task_manager=manager,
app_name='nodepool',
app_version=version.version_info.version_string(),
**self.provider.cloud_config.config)
app_version=version.version_info.version_string()
)
def quotaNeededByNodeType(self, ntype, pool):
provider_label = pool.labels[ntype]
@ -196,19 +197,15 @@ class OpenStackProvider(Provider):
def resetClient(self):
self._client = self._getClient()
if self._use_taskmanager:
self._taskmanager.setClient(self._client)
def _getFlavors(self):
flavors = self.listFlavors()
flavors.sort(key=operator.itemgetter('ram'))
return flavors
# TODO(mordred): These next three methods duplicate logic that is in
# shade, but we can't defer to shade until we're happy
# with using shade's resource caching facility. We have
# not yet proven that to our satisfaction, but if/when
# we do, these should be able to go away.
# TODO(gtema): These next three methods duplicate logic that is in
# openstacksdk, caching is not enabled there by default
# Remove it when caching is default
def _findFlavorByName(self, flavor_name):
for f in self._flavors:
if flavor_name in (f['name'], f['id']):
@ -226,6 +223,16 @@ class OpenStackProvider(Provider):
# Note: this will throw an error if the provider is offline
# but all the callers are in threads (they call in via CreateServer) so
# the mainloop won't be affected.
# TODO(gtema): enable commented block when openstacksdk has caching
# enabled by default
# if min_ram:
# return self._client.get_flavor_by_ram(
# ram=min_ram,
# include=flavor_name,
# get_extra=False)
# else:
# return self._client.get_flavor(flavor_name, get_extra=False)
if min_ram:
return self._findFlavorByRam(min_ram, flavor_name)
else:
@ -314,14 +321,14 @@ class OpenStackProvider(Provider):
try:
return self._client.create_server(wait=False, **create_args)
except shade.OpenStackCloudBadRequest:
except openstack.exceptions.BadRequestException:
# We've gotten a 400 error from nova - which means the request
# was malformed. The most likely cause of that, unless something
# became functionally and systemically broken, is stale image
# or flavor cache. Log a message, invalidate the caches so that
# next time we get new caches.
self._images = {}
self.__flavors = {}
self.__flavors = {} # TODO(gtema): caching
self.log.info(
"Clearing flavor and image caches due to 400 error from nova")
raise
@ -332,7 +339,7 @@ class OpenStackProvider(Provider):
def getServerConsole(self, server_id):
try:
return self._client.get_server_console(server_id)
except shade.OpenStackCloudException:
except openstack.exceptions.OpenStackCloudException:
return None
def waitForServer(self, server, timeout=3600, auto_ip=True):

View File

@ -46,7 +46,7 @@ _DEFAULT_SERVER_LOGGING_CONFIG = {
'handlers': ['console'],
'level': 'WARN',
},
'shade': {
'openstack': {
'handlers': ['console'],
'level': 'WARN',
},

View File

@ -21,6 +21,8 @@ import logging
import queue
import time
from openstack import task_manager as openstack_task_manager
from nodepool import stats
@ -28,24 +30,19 @@ class ManagerStoppedException(Exception):
pass
class TaskManager(object):
class TaskManager(openstack_task_manager.TaskManager):
log = logging.getLogger("nodepool.TaskManager")
def __init__(self, client, name, rate):
super(TaskManager, self).__init__()
def __init__(self, name, rate, workers=5):
super(TaskManager, self).__init__(name=name, workers=workers)
self.daemon = True
self.queue = queue.Queue()
self._running = True
self.name = name
self.rate = float(rate)
self._client = None
self.statsd = stats.get_client()
self._thread = threading.Thread(name=name, target=self.run)
self._thread.daemon = True
def setClient(self, client):
self._client = client
def start(self):
self._thread.start()
@ -70,33 +67,25 @@ class TaskManager(object):
if delta >= self.rate:
break
time.sleep(self.rate - delta)
self.log.debug("Manager %s running task %s (queue: %s)" %
(self.name, type(task).__name__,
self.queue.qsize()))
start = time.time()
self.runTask(task)
last_ts = time.time()
dt = last_ts - start
self.log.debug("Manager %s ran task %s in %ss" %
(self.name, type(task).__name__, dt))
if self.statsd:
# nodepool.task.PROVIDER.subkey
subkey = type(task).__name__
key = 'nodepool.task.%s.%s' % (self.name, subkey)
self.statsd.timing(key, int(dt * 1000))
self.statsd.incr(key)
self.log.debug("Manager %s running task %s (queue %s)" %
(self.name, task.name, self.queue.qsize()))
self.run_task(task)
self.queue.task_done()
except Exception:
self.log.exception("Task manager died.")
raise
def submitTask(self, task):
def post_run_task(self, elapsed_time, task):
super(TaskManager, self).post_run_task(elapsed_time, task)
if self.statsd:
# nodepool.task.PROVIDER.TASK_NAME
key = 'nodepool.task.%s.%s' % (self.name, task.name)
self.statsd.timing(key, int(elapsed_time * 1000))
self.statsd.incr(key)
def submit_task(self, task, raw=False):
if not self._running:
raise ManagerStoppedException(
"Manager %s is no longer running" % self.name)
self.queue.put(task)
return task.wait()
def runTask(self, task):
task.run(self._client)

View File

@ -7,8 +7,7 @@ extras
statsd>=3.0
sqlalchemy>=0.8.2,<1.1.0
PrettyTable>=0.6,<0.8
os-client-config>=1.2.0
shade>=1.21.0
openstacksdk>=0.16.0
diskimage-builder>=2.0.0
voluptuous
kazoo