Merge "Pre-register static nodes"
This commit is contained in:
commit
e8675f1e7c
|
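Summary: this change makes the static driver pre-register its nodes with ZooKeeper. On startup the provider stores one READY node record per configured static node, repeated max-parallel-jobs times, reconciles those records against the configuration, and re-registers a node after it is used and deleted. The request handler therefore no longer launches anything. The diff touches the driver's config parsing, request handler, and provider, and adds test fixtures and tests.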
--- a/nodepool/driver/static/config.py
+++ b/nodepool/driver/static/config.py
@@ -83,9 +83,14 @@ class StaticProviderConfig(ProviderConfig):
                     'username': node.get('username', 'zuul'),
+                    'max-parallel-jobs': int(node.get('max-parallel-jobs', 1)),
                 })
-                for label in node['labels'].split():
-                    pp.labels.add(label)
-                    config.labels[label].pools.append(pp)
+                if isinstance(node['labels'], str):
+                    for label in node['labels'].split():
+                        pp.labels.add(label)
+                        config.labels[label].pools.append(pp)
+                elif isinstance(node['labels'], list):
+                    for label in node['labels']:
+                        pp.labels.add(label)
+                        config.labels[label].pools.append(pp)

     def getSchema(self):
         pool_node = {
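With the hunk above, a node's labels entry may be either a space-separated string or a YAML list. A minimal standalone sketch of that normalization (normalize_labels is a hypothetical helper, not part of the change):

def normalize_labels(labels):
    # Accept both forms the config now allows: a space-separated
    # string ("fake-label fake-label2") or a YAML list.
    if isinstance(labels, str):
        return labels.split()
    if isinstance(labels, list):
        return list(labels)
    raise TypeError("labels must be a string or a list")

assert normalize_labels("fake-label fake-label2") == \
    normalize_labels(["fake-label", "fake-label2"])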
--- a/nodepool/driver/static/handler.py
+++ b/nodepool/driver/static/handler.py
@@ -13,10 +13,7 @@
 # under the License.

 import logging
-import random

-from nodepool import nodeutils
-from nodepool import zk
 from nodepool.driver import NodeRequestHandler
@@ -29,69 +26,21 @@ class StaticNodeRequestHandler(NodeRequestHandler):
         # We don't spawn threads to launch nodes, so always return 1.
         return 1

-    def _checkConcurrency(self, static_node):
-        access_count = 0
-
-        unavailable_states = [zk.IN_USE]
-        if not self.request.reuse:
-            # When re-use is disabled (e.g. for Min-Ready request), we need
-            # to consider 'ready' node as in-use.
-            unavailable_states.append(zk.READY)
-
-        for node in self.zk.nodeIterator():
-            if node.hostname != static_node["name"]:
-                continue
-            if node.state in unavailable_states:
-                access_count += 1
-
-        if access_count >= static_node["max-parallel-jobs"]:
-            self.log.info("%s: max concurrency reached (%d)" % (
-                static_node["name"], access_count))
-            return False
-        return True
-
     def imagesAvailable(self):
         '''
         This driver doesn't manage images, so always return True.
         '''
         return True

+    def hasRemainingQuota(self, ntype):
+        # We are always at quota since we cannot launch new nodes.
+        return False
+
     def launch(self, node):
-        static_node = None
-        available_nodes = self.manager.listNodes()
-        # Randomize static nodes order
-        random.shuffle(available_nodes)
-        for available_node in available_nodes:
-            if node.type[0] in available_node["labels"]:
-                if self._checkConcurrency(available_node):
-                    static_node = available_node
-                    break
-
-        if static_node:
-            self.log.debug("%s: Assigning static_node %s" % (
-                self.request.id, static_node))
-            node.state = zk.READY
-            node.external_id = "static-%s" % self.request.id
-            node.hostname = static_node["name"]
-            node.username = static_node["username"]
-            node.interface_ip = static_node["name"]
-            node.connection_port = static_node["connection-port"]
-            node.connection_type = static_node["connection-type"]
-            nodeutils.set_node_ip(node)
-            node.host_keys = self.manager.nodes_keys[static_node["name"]]
-            self.zk.storeNode(node)
+        # NOTE: We do not expect this to be called since hasRemainingQuota()
+        # returning False should prevent the call.
+        raise Exception("Node launching not supported by static driver")

     def launchesComplete(self):
-        '''
-        Our nodeset could have nodes in BUILDING state because we may be
-        waiting for one of our static nodes to free up. Keep calling launch()
-        to try to grab one.
-        '''
-        waiting_node = False
-        for node in self.nodeset:
-            if node.state == zk.READY:
-                continue
-            self.launch(node)
-            if node.state != zk.READY:
-                waiting_node = True
-        return not waiting_node
+        # We don't wait on a launch since we never actually launch.
+        return True
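Note what replaced _checkConcurrency(): instead of counting in-use ZooKeeper records at request time, concurrency is now structural. A node with max-parallel-jobs: N is registered N times, and ordinary node locking enforces the limit. A standalone sketch of the two models (hypothetical helpers, not Nodepool APIs):

# Old model: count busy records for the host on every request.
def old_has_capacity(zk_nodes, static_node):
    busy = sum(1 for n in zk_nodes
               if n["hostname"] == static_node["name"] and
               n["state"] == "in-use")
    return busy < static_node["max-parallel-jobs"]

# New model: capacity is simply the number of registered records;
# locking one record consumes one slot, so nothing is counted here.
def new_registrations(static_node):
    return [static_node["name"]] * static_node["max-parallel-jobs"]

host = {"name": "fake-host-1", "max-parallel-jobs": 2}
assert old_has_capacity([{"hostname": "fake-host-1", "state": "in-use"}], host)
assert len(new_registrations(host)) == 2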
--- a/nodepool/driver/static/provider.py
+++ b/nodepool/driver/static/provider.py
@@ -14,9 +14,12 @@

 import logging

+from collections import Counter
+
 from nodepool import exceptions
+from nodepool import nodeutils
 from nodepool import zk
 from nodepool.driver import Provider
-from nodepool.nodeutils import nodescan
 from nodepool.driver.static.handler import StaticNodeRequestHandler
@@ -30,18 +33,16 @@ class StaticNodeProvider(Provider):

     def __init__(self, provider, *args):
         self.provider = provider
-        self.pools = {}
         self.static_nodes = {}
-        self.nodes_keys = {}

     def checkHost(self, node):
         # Check node is reachable
         if node["connection-type"] != "ssh":
            return
         try:
-            keys = nodescan(node["name"],
-                            port=node["connection-port"],
-                            timeout=node["timeout"])
+            keys = nodeutils.nodescan(node["name"],
+                                      port=node["connection-port"],
+                                      timeout=node["timeout"])
         except exceptions.ConnectionTimeoutException:
             raise StaticNodeError(
                 "%s:%s: ConnectionTimeoutException" % (
@@ -57,18 +58,161 @@ class StaticNodeProvider(Provider):
             raise StaticNodeError("%s: host key mismatches (%s)" %
                                   (node["name"], keys))

-    def start(self, zk_conn):
+    def getRegisteredReadyNodes(self, hostname):
+        '''
+        Get all registered nodes with the given hostname that are READY.
+
+        :param str hostname: Hostname of the node (maps to static node name).
+        :returns: A list of matching Node objects.
+        '''
+        nodes = []
+        for node in self.zk.nodeIterator():
+            if (node.provider != self.provider.name or
+                node.state != zk.READY or
+                node.hostname != hostname
+            ):
+                continue
+            nodes.append(node)
+        return nodes
+
+    def getRegisteredNodeHostnames(self):
+        '''
+        Get hostnames for all registered static nodes.
+
+        :note: We assume hostnames are unique across pools.
+
+        :returns: A set of registered hostnames for the static driver.
+        '''
+        registered = Counter()
+        for node in self.zk.nodeIterator():
+            if node.provider != self.provider.name:
+                continue
+            registered.update([node.hostname])
+        return registered
+
+    def registerNodeFromConfig(self, count, provider_name, pool_name,
+                               static_node):
+        '''
+        Register a static node from the config with ZooKeeper.
+
+        A node can be registered multiple times to support max-parallel-jobs.
+        These nodes will share a hostname.
+
+        :param int count: Number of times to register this node.
+        :param str provider_name: Name of the provider.
+        :param str pool_name: Name of the pool owning the node.
+        :param dict static_node: The node definition from the config file.
+        '''
+        host_keys = self.checkHost(static_node)
+
+        for i in range(0, count):
+            node = zk.Node()
+            node.state = zk.READY
+            node.provider = provider_name
+            node.pool = pool_name
+            node.launcher = "static driver"
+            node.type = static_node["labels"]
+            node.hostname = static_node["name"]
+            node.username = static_node["username"]
+            node.interface_ip = static_node["name"]
+            node.connection_port = static_node["connection-port"]
+            node.connection_type = static_node["connection-type"]
+            nodeutils.set_node_ip(node)
+            node.host_keys = host_keys
+            self.zk.storeNode(node)
+            self.log.debug("Registered static node %s", node.hostname)
+
+    def deregisterNode(self, count, node_name):
+        '''
+        Attempt to delete READY nodes.
+
+        We can only delete unlocked READY nodes. If we cannot delete those,
+        let them remain until they naturally are deleted (we won't re-register
+        them after they are deleted).
+
+        :param str node_name: The static node name/hostname.
+        '''
+        self.log.debug("Deregistering %s nodes with hostname %s",
+                       count, node_name)
+
+        nodes = self.getRegisteredReadyNodes(node_name)
+
+        for node in nodes:
+            if count <= 0:
+                break
+
+            try:
+                self.zk.lockNode(node, blocking=False)
+            except exceptions.ZKLockException:
+                # It's already locked so skip it.
+                continue
+
+            if node.state != zk.READY:
+                # State changed so skip it.
+                self.zk.unlockNode(node)
+                continue
+
+            node.state = zk.DELETING
+            try:
+                self.zk.storeNode(node)
+                self.log.debug("Deregistered static node: id=%s, hostname=%s",
+                               node.id, node.hostname)
+                count = count - 1
+            except Exception:
+                self.log.exception("Error deregistering static node:")
+            finally:
+                self.zk.unlockNode(node)
+
+    def _start(self, zk_conn):
+        # TODO(Shrews): Deregister nodes when they are removed from the config
+        # or when max-parallel-jobs is decreased.
+
+        self.zk = zk_conn
+        self.registered = self.getRegisteredNodeHostnames()
+
         for pool in self.provider.pools.values():
-            self.pools[pool.name] = {}
             for node in pool.nodes:
-                node_name = "%s-%s" % (pool.name, node["name"])
-                self.log.debug("%s: Registering static node" % node_name)
+                current_count = self.registered[node["name"]]
+
+                # Register nodes to synchronize with our configuration.
+                if current_count < node["max-parallel-jobs"]:
+                    register_cnt = node["max-parallel-jobs"] - current_count
+                    try:
+                        self.registerNodeFromConfig(
+                            register_cnt, self.provider.name, pool.name, node)
+                    except Exception:
+                        self.log.exception("Couldn't register static node:")
+                        continue
+
+                # De-register nodes to synchronize with our configuration.
+                # This case covers an existing node, but with a decreased
+                # max-parallel-jobs value.
+                elif current_count > node["max-parallel-jobs"]:
+                    deregister_cnt = current_count - node["max-parallel-jobs"]
+                    try:
+                        self.deregisterNode(deregister_cnt, node["name"])
+                    except Exception:
+                        self.log.exception("Couldn't deregister static node:")
+                        continue
+
+                self.static_nodes[node["name"]] = node
+
+        # De-register nodes to synchronize with our configuration.
+        # This case covers any registered nodes that no longer appear in
+        # the config.
+        for hostname in list(self.registered):
+            if hostname not in self.static_nodes:
                 try:
-                    self.nodes_keys[node["name"]] = self.checkHost(node)
-                except StaticNodeError as e:
-                    self.log.error("Couldn't register static node: %s" % e)
+                    self.deregisterNode(self.registered[hostname], hostname)
+                except Exception:
+                    self.log.exception("Couldn't deregister static node:")
                     continue
-                self.static_nodes[node_name] = node
+
+    def start(self, zk_conn):
+        try:
+            self._start(zk_conn)
+        except Exception:
+            self.log.exception("Cannot start static provider:")

     def stop(self):
         self.log.debug("Stopping")
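The heart of _start() is a reconciliation between what the config wants (max-parallel-jobs registrations per node) and what already exists in ZooKeeper. A standalone sketch of that arithmetic (plan_sync is hypothetical; the Counter mirrors what getRegisteredNodeHostnames() returns):

from collections import Counter

def plan_sync(registered, config_nodes):
    # Positive value: register that many records; negative: deregister.
    plan = {}
    for node in config_nodes:
        name = node["name"]
        plan[name] = node["max-parallel-jobs"] - registered[name]
    # Hostnames registered in ZooKeeper but absent from the config are
    # deregistered entirely.
    for name in registered:
        if name not in plan:
            plan[name] = -registered[name]
    return plan

registered = Counter({"fake-host-1": 2, "old-host": 1})
config_nodes = [{"name": "fake-host-1", "max-parallel-jobs": 1},
                {"name": "fake-host-2", "max-parallel-jobs": 2}]
assert plan_sync(registered, config_nodes) == {
    "fake-host-1": -1, "fake-host-2": 2, "old-host": -1}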
@@ -96,3 +240,27 @@ class StaticNodeProvider(Provider):

     def getRequestHandler(self, poolworker, request):
         return StaticNodeRequestHandler(poolworker, request)
+
+    def nodeDeletedNotification(self, node):
+        '''
+        Re-register the deleted node.
+        '''
+        # It's possible a deleted node no longer exists in our config, so
+        # don't bother to reregister.
+        if node.hostname not in self.static_nodes:
+            return
+
+        static_node = self.static_nodes[node.hostname]
+        current_count = self.registered[node.hostname]
+
+        # It's possible we were not able to de-register nodes due to a config
+        # change (because they were in use). In that case, don't bother to
+        # reregister.
+        if current_count >= static_node["max-parallel-jobs"]:
+            return
+
+        try:
+            self.registerNodeFromConfig(
+                1, node.provider, node.pool, static_node)
+        except Exception:
+            self.log.exception("Cannot re-register deleted node %s", node)
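nodeDeletedNotification() closes the lifecycle loop: a node record deleted after use is re-registered unless the host has left the config or its hostname already has max-parallel-jobs registrations (e.g. because an earlier deregistration could not complete). The decision, sketched standalone (hypothetical helper; plain dicts stand in for provider state):

def should_reregister(hostname, static_nodes, registered):
    if hostname not in static_nodes:
        return False  # host was removed from the config; stay gone
    if registered[hostname] >= static_nodes[hostname]["max-parallel-jobs"]:
        return False  # already at the configured registration count
    return True

static_nodes = {"fake-host-1": {"max-parallel-jobs": 2}}
assert should_reregister("fake-host-1", static_nodes, {"fake-host-1": 1})
assert not should_reregister("gone-host", static_nodes, {"gone-host": 1})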
--- /dev/null
+++ b/nodepool/tests/fixtures/static-2-nodes.yaml
@@ -0,0 +1,26 @@
+zookeeper-servers:
+  - host: {zookeeper_host}
+    port: {zookeeper_port}
+    chroot: {zookeeper_chroot}
+
+labels:
+  - name: fake-label
+
+providers:
+  - name: static-provider
+    driver: static
+    pools:
+      - name: main
+        nodes:
+          - name: fake-host-1
+            labels: fake-label
+            host-key: ssh-rsa FAKEKEY
+            timeout: 13
+            connection-port: 22022
+            username: zuul
+          - name: fake-host-2
+            labels: fake-label
+            host-key: ssh-rsa FAKEKEY
+            timeout: 13
+            connection-port: 22022
+            username: zuul
--- /dev/null
+++ b/nodepool/tests/fixtures/static-basic.yaml
@@ -0,0 +1,20 @@
+zookeeper-servers:
+  - host: {zookeeper_host}
+    port: {zookeeper_port}
+    chroot: {zookeeper_chroot}
+
+labels:
+  - name: fake-label
+
+providers:
+  - name: static-provider
+    driver: static
+    pools:
+      - name: main
+        nodes:
+          - name: fake-host-1
+            labels: fake-label
+            host-key: ssh-rsa FAKEKEY
+            timeout: 13
+            connection-port: 22022
+            username: zuul
--- /dev/null
+++ b/nodepool/tests/fixtures/static-multilabel.yaml
@@ -0,0 +1,23 @@
+zookeeper-servers:
+  - host: {zookeeper_host}
+    port: {zookeeper_port}
+    chroot: {zookeeper_chroot}
+
+labels:
+  - name: fake-label
+  - name: fake-label2
+
+providers:
+  - name: static-provider
+    driver: static
+    pools:
+      - name: main
+        nodes:
+          - name: fake-host-1
+            labels:
+              - fake-label
+              - fake-label2
+            host-key: ssh-rsa FAKEKEY
+            timeout: 13
+            connection-port: 22022
+            username: zuul
--- /dev/null
+++ b/nodepool/tests/fixtures/static-parallel-increase.yaml
@@ -0,0 +1,21 @@
+zookeeper-servers:
+  - host: {zookeeper_host}
+    port: {zookeeper_port}
+    chroot: {zookeeper_chroot}
+
+labels:
+  - name: fake-label
+
+providers:
+  - name: static-provider
+    driver: static
+    pools:
+      - name: main
+        nodes:
+          - name: fake-host-1
+            labels: fake-label
+            host-key: ssh-rsa FAKEKEY
+            timeout: 13
+            connection-port: 22022
+            username: zuul
+            max-parallel-jobs: 2
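For the fixture above, max-parallel-jobs: 2 should produce two ZooKeeper records sharing the hostname fake-host-1, which is what test_static_parallel_increase later waits for. A sketch of that expectation (plain data, not Nodepool objects):

from collections import Counter

# Two registrations share one hostname; each record admits one job, so
# two jobs may hold fake-host-1 concurrently.
registered = Counter(["fake-host-1"] * 2)
assert registered["fake-host-1"] == 2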
--- a/nodepool/tests/fixtures/static.yaml
+++ b/nodepool/tests/fixtures/static.yaml
@@ -5,13 +5,9 @@ zookeeper-servers:

 labels:
   - name: fake-label
-    min-ready: 2
-
-  - name: fake-label2
+  - name: fake-concurrent-label
     min-ready: 2

   - name: fake-windows-label
-    min-ready: 2
-
 providers:
   - name: static-provider
--- a/nodepool/tests/test_driver_static.py
+++ b/nodepool/tests/test_driver_static.py
@@ -38,15 +38,113 @@ class TestDriverStatic(tests.DBTestCase):
         config = nodepool_config.loadConfig(configfile)
         self.assertIn('static-provider', config.providers)

+    def test_static_basic(self):
+        '''
+        Test that basic node registration works.
+        '''
+        configfile = self.setup_config('static-basic.yaml')
+        pool = self.useNodepool(configfile, watermark_sleep=1)
+        pool.start()
+
+        self.log.debug("Waiting for node pre-registration")
+        nodes = self.waitForNodes('fake-label')
+        self.assertEqual(len(nodes), 1)
+
+        self.assertEqual(nodes[0].state, zk.READY)
+        self.assertEqual(nodes[0].provider, "static-provider")
+        self.assertEqual(nodes[0].pool, "main")
+        self.assertEqual(nodes[0].launcher, "static driver")
+        self.assertEqual(nodes[0].type, ['fake-label'])
+        self.assertEqual(nodes[0].hostname, 'fake-host-1')
+        self.assertEqual(nodes[0].interface_ip, 'fake-host-1')
+        self.assertEqual(nodes[0].username, 'zuul')
+        self.assertEqual(nodes[0].connection_port, 22022)
+        self.assertEqual(nodes[0].connection_type, 'ssh')
+        self.assertEqual(nodes[0].host_keys, ['ssh-rsa FAKEKEY'])
+
+    def test_static_node_increase(self):
+        '''
+        Test that adding new nodes to the config creates additional nodes.
+        '''
+        configfile = self.setup_config('static-basic.yaml')
+        pool = self.useNodepool(configfile, watermark_sleep=1)
+        pool.start()
+
+        self.log.debug("Waiting for initial node")
+        nodes = self.waitForNodes('fake-label')
+        self.assertEqual(len(nodes), 1)
+
+        self.log.debug("Waiting for additional node")
+        self.replace_config(configfile, 'static-2-nodes.yaml')
+        nodes = self.waitForNodes('fake-label', 2)
+        self.assertEqual(len(nodes), 2)
+
+    def test_static_node_decrease(self):
+        '''
+        Test that removing nodes from the config removes nodes.
+        '''
+        configfile = self.setup_config('static-2-nodes.yaml')
+        pool = self.useNodepool(configfile, watermark_sleep=1)
+        pool.start()
+
+        self.log.debug("Waiting for initial nodes")
+        nodes = self.waitForNodes('fake-label', 2)
+        self.assertEqual(len(nodes), 2)
+
+        self.log.debug("Waiting for node decrease")
+        self.replace_config(configfile, 'static-basic.yaml')
+        nodes = self.waitForNodes('fake-label')
+        self.assertEqual(len(nodes), 1)
+        self.assertEqual(nodes[0].hostname, 'fake-host-1')
+
+    def test_static_parallel_increase(self):
+        '''
+        Test that increasing max-parallel-jobs creates additional nodes.
+        '''
+        configfile = self.setup_config('static-basic.yaml')
+        pool = self.useNodepool(configfile, watermark_sleep=1)
+        pool.start()
+
+        self.log.debug("Waiting for initial node")
+        nodes = self.waitForNodes('fake-label')
+        self.assertEqual(len(nodes), 1)
+
+        self.log.debug("Waiting for additional node")
+        self.replace_config(configfile, 'static-parallel-increase.yaml')
+        nodes = self.waitForNodes('fake-label', 2)
+        self.assertEqual(len(nodes), 2)
+
+    def test_static_parallel_decrease(self):
+        '''
+        Test that decreasing max-parallel-jobs deletes nodes.
+        '''
+        configfile = self.setup_config('static-parallel-increase.yaml')
+        pool = self.useNodepool(configfile, watermark_sleep=1)
+        pool.start()
+
+        self.log.debug("Waiting for initial nodes")
+        nodes = self.waitForNodes('fake-label', 2)
+        self.assertEqual(len(nodes), 2)
+
+        self.log.debug("Waiting for node decrease")
+        self.replace_config(configfile, 'static-basic.yaml')
+        nodes = self.waitForNodes('fake-label')
+        self.assertEqual(len(nodes), 1)
+
+    def test_static_multilabel(self):
+        configfile = self.setup_config('static-multilabel.yaml')
+        pool = self.useNodepool(configfile, watermark_sleep=1)
+        pool.start()
+        nodes = self.waitForNodes('fake-label')
+        self.assertIn('fake-label', nodes[0].type)
+        self.assertIn('fake-label2', nodes[0].type)
+
     def test_static_handler(self):
         configfile = self.setup_config('static.yaml')
         pool = self.useNodepool(configfile, watermark_sleep=1)
         pool.start()
         self.log.debug("Waiting for min-ready nodes")
         node = self.waitForNodes('fake-label')
         self.assertEqual(len(node), 1)
-        self.waitForNodes('fake-concurrent-label', 2)
+        nodes = self.waitForNodes('fake-concurrent-label', 2)
+        self.assertEqual(len(nodes), 2)

         node = node[0]
         self.log.debug("Marking first node as used %s", node.id)