Pass zk connection to ProviderManager.start()

In order to support static node pre-registration, we need to give
the provider manager the opportunity to register/deregister any
nodes in its configuration file when it starts (on startup or when
the config change). It will need a ZooKeeper connection to do this.
The OpenStack driver will ignore this parameter.

Change-Id: Idd00286b2577921b3fe5b55e8f13a27f2fbde5d6
This commit is contained in:
David Shrewsbury 2018-06-12 12:04:16 -04:00
parent 7eeefebbd4
commit a418aabb7a
9 changed files with 28 additions and 10 deletions

View File

@ -536,6 +536,7 @@ class CleanupWorker(BaseWorker):
self._checkForZooKeeperChanges(new_config)
provider_manager.ProviderManager.reconfigure(self._config, new_config,
self._zk,
use_taskmanager=False)
self._config = new_config
@ -878,6 +879,7 @@ class UploadWorker(BaseWorker):
self._checkForZooKeeperChanges(new_config)
provider_manager.ProviderManager.reconfigure(self._config, new_config,
self._zk,
use_taskmanager=False)
self._config = new_config

View File

@ -251,7 +251,7 @@ class NodePoolCmd(NodepoolApp):
return
provider = self.pool.config.providers[node.provider]
manager = provider_manager.get_provider(provider, True)
manager.start()
manager.start(self.zk)
launcher.NodeDeleter.delete(self.zk, manager, node)
manager.stop()
else:

View File

@ -145,11 +145,15 @@ class Provider(object, metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def start(self):
def start(self, zk_conn):
"""Start this provider
:param ZooKeeper zk_conn: A ZooKeeper connection object.
This is called after each configuration change to allow the driver
to perform initialization tasks and start background threads.
to perform initialization tasks and start background threads. The
ZooKeeper connection object is provided if the Provider needs to
interact with it.
"""
pass

View File

@ -118,7 +118,7 @@ class OpenStackProvider(Provider):
self._taskmanager = None
self._current_nodepool_quota = None
def start(self):
def start(self, zk_conn):
if self._use_taskmanager:
self._taskmanager = TaskManager(None, self.provider.name,
self.provider.rate)

View File

@ -57,7 +57,7 @@ class StaticNodeProvider(Provider):
raise StaticNodeError("%s: host key mismatches (%s)" %
(node["name"], keys))
def start(self):
def start(self, zk_conn):
for pool in self.provider.pools.values():
self.pools[pool.name] = {}
for node in pool.nodes:

View File

@ -22,7 +22,7 @@ class TestProvider(Provider):
def __init__(self, provider):
self.provider = provider
def start(self):
def start(self, zk_conn):
pass
def stop(self):

View File

@ -819,8 +819,9 @@ class NodePool(threading.Thread):
def updateConfig(self):
config = self.loadConfig()
provider_manager.ProviderManager.reconfigure(self.config, config)
self.reconfigureZooKeeper(config)
provider_manager.ProviderManager.reconfigure(self.config, config,
self.getZK())
self.setConfig(config)
def removeCompletedRequests(self):

View File

@ -30,7 +30,18 @@ class ProviderManager(object):
log = logging.getLogger("nodepool.ProviderManager")
@staticmethod
def reconfigure(old_config, new_config, use_taskmanager=True):
def reconfigure(old_config, new_config, zk_conn, use_taskmanager=True):
'''
Reconfigure the provider managers on any configuration changes.
If a provider configuration changes, stop the current provider
manager we have cached and replace it with a new one.
:param Config old_config: The previously read configuration.
:param Config new_config: The newly read configuration.
:param ZooKeeper zk_conn: A ZooKeeper connection object.
:param bool use_taskmanager: If True, use a task manager.
'''
stop_managers = []
for p in new_config.providers.values():
oldmanager = None
@ -46,7 +57,7 @@ class ProviderManager(object):
" for %s" % p.name)
new_config.provider_managers[p.name] = \
get_provider(p, use_taskmanager)
new_config.provider_managers[p.name].start()
new_config.provider_managers[p.name].start(zk_conn)
for stop_manager in stop_managers:
stop_manager.stop()

View File

@ -61,7 +61,7 @@ class TestShadeIntegration(tests.IntegrationTestCase):
self.assertIn('real-provider', config.providers)
pm = provider_manager.get_provider(
config.providers['real-provider'], use_taskmanager=False)
pm.start()
pm.start(None)
self.assertEqual(pm._client.auth, auth_data)
def test_nodepool_occ_config_reload(self):