Merge "Pre-register static nodes"

Zuul 2018-06-21 19:16:26 +00:00 committed by Gerrit Code Review
commit e8675f1e7c
9 changed files with 392 additions and 86 deletions

View File

@@ -83,9 +83,14 @@ class StaticProviderConfig(ProviderConfig):
                    'username': node.get('username', 'zuul'),
                    'max-parallel-jobs': int(node.get('max-parallel-jobs', 1)),
                })
                for label in node['labels'].split():
                    pp.labels.add(label)
                    config.labels[label].pools.append(pp)
                if isinstance(node['labels'], str):
                    for label in node['labels'].split():
                        pp.labels.add(label)
                        config.labels[label].pools.append(pp)
                elif isinstance(node['labels'], list):
                    for label in node['labels']:
                        pp.labels.add(label)
                        config.labels[label].pools.append(pp)

    def getSchema(self):
        pool_node = {
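
The config change above lets a static node's labels be written either as a space-separated string or as a YAML list (compare the static-basic.yaml and static-multilabel.yaml fixtures added later in this commit). A minimal standalone sketch of the same normalization; the helper name normalize_labels is illustrative, not nodepool code:

def normalize_labels(labels):
    # Both config spellings reduce to the same list of label names.
    if isinstance(labels, str):
        return labels.split()
    elif isinstance(labels, list):
        return list(labels)
    raise TypeError("labels must be a string or a list")

assert normalize_labels("fake-label fake-label2") == ["fake-label", "fake-label2"]
assert normalize_labels(["fake-label", "fake-label2"]) == ["fake-label", "fake-label2"]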

View File

@@ -13,10 +13,7 @@
# under the License.
import logging
import random
from nodepool import nodeutils
from nodepool import zk
from nodepool.driver import NodeRequestHandler
@@ -29,69 +26,21 @@ class StaticNodeRequestHandler(NodeRequestHandler):
        # We don't spawn threads to launch nodes, so always return 1.
        return 1

    def _checkConcurrency(self, static_node):
        access_count = 0
        unavailable_states = [zk.IN_USE]
        if not self.request.reuse:
            # When re-use is disabled (e.g. for Min-Ready request), we need
            # to consider 'ready' node as in-use.
            unavailable_states.append(zk.READY)
        for node in self.zk.nodeIterator():
            if node.hostname != static_node["name"]:
                continue
            if node.state in unavailable_states:
                access_count += 1
        if access_count >= static_node["max-parallel-jobs"]:
            self.log.info("%s: max concurrency reached (%d)" % (
                static_node["name"], access_count))
            return False
        return True

    def imagesAvailable(self):
        '''
        This driver doesn't manage images, so always return True.
        '''
        return True
    def launch(self, node):
        static_node = None
        available_nodes = self.manager.listNodes()
        # Randomize static nodes order
        random.shuffle(available_nodes)
        for available_node in available_nodes:
            if node.type[0] in available_node["labels"]:
                if self._checkConcurrency(available_node):
                    static_node = available_node
                    break

    def hasRemainingQuota(self, ntype):
        # We are always at quota since we cannot launch new nodes.
        return False

        if static_node:
            self.log.debug("%s: Assigning static_node %s" % (
                self.request.id, static_node))
            node.state = zk.READY
            node.external_id = "static-%s" % self.request.id
            node.hostname = static_node["name"]
            node.username = static_node["username"]
            node.interface_ip = static_node["name"]
            node.connection_port = static_node["connection-port"]
            node.connection_type = static_node["connection-type"]
            nodeutils.set_node_ip(node)
            node.host_keys = self.manager.nodes_keys[static_node["name"]]
            self.zk.storeNode(node)

    def launch(self, node):
        # NOTE: We do not expect this to be called since hasRemainingQuota()
        # returning False should prevent the call.
        raise Exception("Node launching not supported by static driver")
    def launchesComplete(self):
        '''
        Our nodeset could have nodes in BUILDING state because we may be
        waiting for one of our static nodes to free up. Keep calling launch()
        to try to grab one.
        '''
        waiting_node = False
        for node in self.nodeset:
            if node.state == zk.READY:
                continue
            self.launch(node)
            if node.state != zk.READY:
                waiting_node = True
        return not waiting_node
        # We don't wait on a launch since we never actually launch.
        return True
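
Read together, the handler now reports that it is always at quota, refuses to launch, and treats launches as immediately complete; requests are satisfied only from nodes the provider pre-registered as READY. A small self-contained sketch of why launch() can safely raise under that contract; StaticHandlerSketch and fulfill are illustrative names, not nodepool APIs:

class StaticHandlerSketch:
    def __init__(self, ready_nodes):
        # Nodes the provider already registered as READY in ZooKeeper.
        self.ready_nodes = list(ready_nodes)

    def hasRemainingQuota(self, ntype):
        # Static pools can never create additional nodes.
        return False

    def launch(self, node):
        raise RuntimeError("launching is not supported for static nodes")


def fulfill(handler, label):
    # Hypothetical caller: reuse a READY node, otherwise leave the request
    # pending; launch() is only reached when quota remains, so never here.
    if handler.ready_nodes:
        return handler.ready_nodes.pop(0)
    if handler.hasRemainingQuota(label):
        return handler.launch(label)
    return None


handler = StaticHandlerSketch(ready_nodes=["fake-host-1"])
assert fulfill(handler, "fake-label") == "fake-host-1"
assert fulfill(handler, "fake-label") is None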

View File

@@ -14,9 +14,12 @@
import logging
from collections import Counter
from nodepool import exceptions
from nodepool import nodeutils
from nodepool import zk
from nodepool.driver import Provider
from nodepool.nodeutils import nodescan
from nodepool.driver.static.handler import StaticNodeRequestHandler
@@ -30,18 +33,16 @@ class StaticNodeProvider(Provider):
    def __init__(self, provider, *args):
        self.provider = provider
        self.pools = {}
        self.static_nodes = {}
        self.nodes_keys = {}

    def checkHost(self, node):
        # Check node is reachable
        if node["connection-type"] != "ssh":
            return
        try:
            keys = nodescan(node["name"],
                            port=node["connection-port"],
                            timeout=node["timeout"])
            keys = nodeutils.nodescan(node["name"],
                                      port=node["connection-port"],
                                      timeout=node["timeout"])
        except exceptions.ConnectionTimeoutException:
            raise StaticNodeError(
                "%s:%s: ConnectionTimeoutException" % (
@@ -57,18 +58,161 @@ class StaticNodeProvider(Provider):
            raise StaticNodeError("%s: host key mismatches (%s)" %
                                  (node["name"], keys))

    def start(self, zk_conn):
    def getRegisteredReadyNodes(self, hostname):
        '''
        Get all registered nodes with the given hostname that are READY.

        :param str hostname: Hostname of the node (maps to static node name).

        :returns: A list of matching Node objects.
        '''
        nodes = []
        for node in self.zk.nodeIterator():
            if (node.provider != self.provider.name or
                    node.state != zk.READY or
                    node.hostname != hostname):
                continue
            nodes.append(node)
        return nodes
    def getRegisteredNodeHostnames(self):
        '''
        Get hostnames for all registered static nodes.

        :note: We assume hostnames are unique across pools.

        :returns: A Counter of registered hostnames (with registration
            counts) for the static driver.
        '''
        registered = Counter()
        for node in self.zk.nodeIterator():
            if node.provider != self.provider.name:
                continue
            registered.update([node.hostname])
        return registered
    def registerNodeFromConfig(self, count, provider_name, pool_name,
                               static_node):
        '''
        Register a static node from the config with ZooKeeper.

        A node can be registered multiple times to support max-parallel-jobs.
        These nodes will share a hostname.

        :param int count: Number of times to register this node.
        :param str provider_name: Name of the provider.
        :param str pool_name: Name of the pool owning the node.
        :param dict static_node: The node definition from the config file.
        '''
        host_keys = self.checkHost(static_node)

        for i in range(0, count):
            node = zk.Node()
            node.state = zk.READY
            node.provider = provider_name
            node.pool = pool_name
            node.launcher = "static driver"
            node.type = static_node["labels"]
            node.hostname = static_node["name"]
            node.username = static_node["username"]
            node.interface_ip = static_node["name"]
            node.connection_port = static_node["connection-port"]
            node.connection_type = static_node["connection-type"]
            nodeutils.set_node_ip(node)
            node.host_keys = host_keys
            self.zk.storeNode(node)
            self.log.debug("Registered static node %s", node.hostname)
    def deregisterNode(self, count, node_name):
        '''
        Attempt to delete READY nodes.

        We can only delete unlocked READY nodes. If we cannot delete those,
        let them remain until they naturally are deleted (we won't re-register
        them after they are deleted).

        :param str node_name: The static node name/hostname.
        '''
        self.log.debug("Deregistering %s nodes with hostname %s",
                       count, node_name)

        nodes = self.getRegisteredReadyNodes(node_name)

        for node in nodes:
            if count <= 0:
                break

            try:
                self.zk.lockNode(node, blocking=False)
            except exceptions.ZKLockException:
                # It's already locked so skip it.
                continue

            if node.state != zk.READY:
                # State changed so skip it.
                self.zk.unlockNode(node)
                continue

            node.state = zk.DELETING
            try:
                self.zk.storeNode(node)
                self.log.debug("Deregistered static node: id=%s, hostname=%s",
                               node.id, node.hostname)
                count = count - 1
            except Exception:
                self.log.exception("Error deregistering static node:")
            finally:
                self.zk.unlockNode(node)
    def _start(self, zk_conn):
        # TODO(Shrews): Deregister nodes when they are removed from the config
        # or when max-parallel-jobs is decreased.
        self.zk = zk_conn
        self.registered = self.getRegisteredNodeHostnames()
        for pool in self.provider.pools.values():
            self.pools[pool.name] = {}
            for node in pool.nodes:
                node_name = "%s-%s" % (pool.name, node["name"])
                self.log.debug("%s: Registering static node" % node_name)
                current_count = self.registered[node["name"]]

                # Register nodes to synchronize with our configuration.
                if current_count < node["max-parallel-jobs"]:
                    register_cnt = node["max-parallel-jobs"] - current_count
                    try:
                        self.registerNodeFromConfig(
                            register_cnt, self.provider.name, pool.name, node)
                    except Exception:
                        self.log.exception("Couldn't register static node:")
                        continue

                # De-register nodes to synchronize with our configuration.
                # This case covers an existing node, but with a decreased
                # max-parallel-jobs value.
                elif current_count > node["max-parallel-jobs"]:
                    deregister_cnt = current_count - node["max-parallel-jobs"]
                    try:
                        self.deregisterNode(deregister_cnt, node["name"])
                    except Exception:
                        self.log.exception("Couldn't deregister static node:")
                        continue

                self.static_nodes[node["name"]] = node
        # De-register nodes to synchronize with our configuration.
        # This case covers any registered nodes that no longer appear in
        # the config.
        for hostname in list(self.registered):
            if hostname not in self.static_nodes:
                try:
                    self.nodes_keys[node["name"]] = self.checkHost(node)
                except StaticNodeError as e:
                    self.log.error("Couldn't register static node: %s" % e)
                    self.deregisterNode(self.registered[hostname], hostname)
                except Exception:
                    self.log.exception("Couldn't deregister static node:")
                    continue
                self.static_nodes[node_name] = node

    def start(self, zk_conn):
        try:
            self._start(zk_conn)
        except Exception:
            self.log.exception("Cannot start static provider:")

    def stop(self):
        self.log.debug("Stopping")
@@ -96,3 +240,27 @@ class StaticNodeProvider(Provider):
    def getRequestHandler(self, poolworker, request):
        return StaticNodeRequestHandler(poolworker, request)

    def nodeDeletedNotification(self, node):
        '''
        Re-register the deleted node.
        '''
        # It's possible a deleted node no longer exists in our config, so
        # don't bother to reregister.
        if node.hostname not in self.static_nodes:
            return

        static_node = self.static_nodes[node.hostname]
        current_count = self.registered[node.hostname]

        # It's possible we were not able to de-register nodes due to a config
        # change (because they were in use). In that case, don't bother to
        # reregister.
        if current_count >= static_node["max-parallel-jobs"]:
            return

        try:
            self.registerNodeFromConfig(
                1, node.provider, node.pool, static_node)
        except Exception:
            self.log.exception("Cannot re-register deleted node %s", node)

View File

@@ -0,0 +1,26 @@
zookeeper-servers:
  - host: {zookeeper_host}
    port: {zookeeper_port}
    chroot: {zookeeper_chroot}

labels:
  - name: fake-label

providers:
  - name: static-provider
    driver: static
    pools:
      - name: main
        nodes:
          - name: fake-host-1
            labels: fake-label
            host-key: ssh-rsa FAKEKEY
            timeout: 13
            connection-port: 22022
            username: zuul
          - name: fake-host-2
            labels: fake-label
            host-key: ssh-rsa FAKEKEY
            timeout: 13
            connection-port: 22022
            username: zuul

View File

@@ -0,0 +1,20 @@
zookeeper-servers:
  - host: {zookeeper_host}
    port: {zookeeper_port}
    chroot: {zookeeper_chroot}

labels:
  - name: fake-label

providers:
  - name: static-provider
    driver: static
    pools:
      - name: main
        nodes:
          - name: fake-host-1
            labels: fake-label
            host-key: ssh-rsa FAKEKEY
            timeout: 13
            connection-port: 22022
            username: zuul

View File

@@ -0,0 +1,23 @@
zookeeper-servers:
  - host: {zookeeper_host}
    port: {zookeeper_port}
    chroot: {zookeeper_chroot}

labels:
  - name: fake-label
  - name: fake-label2

providers:
  - name: static-provider
    driver: static
    pools:
      - name: main
        nodes:
          - name: fake-host-1
            labels:
              - fake-label
              - fake-label2
            host-key: ssh-rsa FAKEKEY
            timeout: 13
            connection-port: 22022
            username: zuul

View File

@@ -0,0 +1,21 @@
zookeeper-servers:
  - host: {zookeeper_host}
    port: {zookeeper_port}
    chroot: {zookeeper_chroot}

labels:
  - name: fake-label

providers:
  - name: static-provider
    driver: static
    pools:
      - name: main
        nodes:
          - name: fake-host-1
            labels: fake-label
            host-key: ssh-rsa FAKEKEY
            timeout: 13
            connection-port: 22022
            username: zuul
            max-parallel-jobs: 2

View File

@@ -5,13 +5,9 @@ zookeeper-servers:
labels:
  - name: fake-label
    min-ready: 2
  - name: fake-label2
  - name: fake-concurrent-label
    min-ready: 2
  - name: fake-windows-label
    min-ready: 2

providers:
  - name: static-provider
View File

@@ -38,15 +38,113 @@ class TestDriverStatic(tests.DBTestCase):
        config = nodepool_config.loadConfig(configfile)
        self.assertIn('static-provider', config.providers)

    def test_static_basic(self):
        '''
        Test that basic node registration works.
        '''
        configfile = self.setup_config('static-basic.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        pool.start()

        self.log.debug("Waiting for node pre-registration")
        nodes = self.waitForNodes('fake-label')
        self.assertEqual(len(nodes), 1)
        self.assertEqual(nodes[0].state, zk.READY)
        self.assertEqual(nodes[0].provider, "static-provider")
        self.assertEqual(nodes[0].pool, "main")
        self.assertEqual(nodes[0].launcher, "static driver")
        self.assertEqual(nodes[0].type, ['fake-label'])
        self.assertEqual(nodes[0].hostname, 'fake-host-1')
        self.assertEqual(nodes[0].interface_ip, 'fake-host-1')
        self.assertEqual(nodes[0].username, 'zuul')
        self.assertEqual(nodes[0].connection_port, 22022)
        self.assertEqual(nodes[0].connection_type, 'ssh')
        self.assertEqual(nodes[0].host_keys, ['ssh-rsa FAKEKEY'])
    def test_static_node_increase(self):
        '''
        Test that adding new nodes to the config creates additional nodes.
        '''
        configfile = self.setup_config('static-basic.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        pool.start()

        self.log.debug("Waiting for initial node")
        nodes = self.waitForNodes('fake-label')
        self.assertEqual(len(nodes), 1)

        self.log.debug("Waiting for additional node")
        self.replace_config(configfile, 'static-2-nodes.yaml')
        nodes = self.waitForNodes('fake-label', 2)
        self.assertEqual(len(nodes), 2)

    def test_static_node_decrease(self):
        '''
        Test that removing nodes from the config removes nodes.
        '''
        configfile = self.setup_config('static-2-nodes.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        pool.start()

        self.log.debug("Waiting for initial nodes")
        nodes = self.waitForNodes('fake-label', 2)
        self.assertEqual(len(nodes), 2)

        self.log.debug("Waiting for node decrease")
        self.replace_config(configfile, 'static-basic.yaml')
        nodes = self.waitForNodes('fake-label')
        self.assertEqual(len(nodes), 1)
        self.assertEqual(nodes[0].hostname, 'fake-host-1')
    def test_static_parallel_increase(self):
        '''
        Test that increasing max-parallel-jobs creates additional nodes.
        '''
        configfile = self.setup_config('static-basic.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        pool.start()

        self.log.debug("Waiting for initial node")
        nodes = self.waitForNodes('fake-label')
        self.assertEqual(len(nodes), 1)

        self.log.debug("Waiting for additional node")
        self.replace_config(configfile, 'static-parallel-increase.yaml')
        nodes = self.waitForNodes('fake-label', 2)
        self.assertEqual(len(nodes), 2)

    def test_static_parallel_decrease(self):
        '''
        Test that decreasing max-parallel-jobs deletes nodes.
        '''
        configfile = self.setup_config('static-parallel-increase.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        pool.start()

        self.log.debug("Waiting for initial nodes")
        nodes = self.waitForNodes('fake-label', 2)
        self.assertEqual(len(nodes), 2)

        self.log.debug("Waiting for node decrease")
        self.replace_config(configfile, 'static-basic.yaml')
        nodes = self.waitForNodes('fake-label')
        self.assertEqual(len(nodes), 1)
    def test_static_multilabel(self):
        configfile = self.setup_config('static-multilabel.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        pool.start()
        nodes = self.waitForNodes('fake-label')
        self.assertIn('fake-label', nodes[0].type)
        self.assertIn('fake-label2', nodes[0].type)

    def test_static_handler(self):
        configfile = self.setup_config('static.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        pool.start()

        self.log.debug("Waiting for min-ready nodes")
        node = self.waitForNodes('fake-label')
        self.assertEqual(len(node), 1)
        nodes = self.waitForNodes('fake-concurrent-label', 2)
        self.assertEqual(len(nodes), 2)
        self.waitForNodes('fake-concurrent-label', 2)

        node = node[0]
        self.log.debug("Marking first node as used %s", node.id)