Implement max-servers for AWS driver

Max-servers has been ignored up until now. This implements the basic
checks so that AWS users can limit the number of instances they launch.

Change-Id: I73296bde1cdde80c52b6b5b725f268a17562060d
This commit is contained in:
Clint Byrum 2019-04-02 16:00:50 -07:00
parent 6c2c1d3aac
commit da21971e9b
4 changed files with 114 additions and 19 deletions

View File

@ -13,11 +13,12 @@
# under the License.
import logging
import math
import time
from nodepool import exceptions
from nodepool import zk
from nodepool.driver.utils import NodeLauncher
from nodepool.driver.utils import NodeLauncher, QuotaInformation
from nodepool.driver import NodeRequestHandler
from nodepool.nodeutils import nodescan
@ -25,6 +26,7 @@ from nodepool.nodeutils import nodescan
class AwsInstanceLauncher(NodeLauncher):
def __init__(self, handler, node, provider_config, provider_label):
    """Set up launcher state for a single AWS node.

    :param handler: the NodeRequestHandler driving this launch
    :param node: the zk Node record being launched
    :param provider_config: the AWS provider configuration
    :param provider_label: the label selected for this node
    """
    super().__init__(handler.zk, node, provider_config)
    self.handler = handler
    # Resolve the pool the label belongs to so tagging/quota code can
    # reference it later.
    self.pool = provider_config.pools[provider_label.pool.name]
    self.provider_name = provider_config.name
    self.retries = provider_config.launch_retries
@ -60,6 +62,10 @@ class AwsInstanceLauncher(NodeLauncher):
if state == 'running':
instance.create_tags(Tags=[{'Key': 'nodepool_id',
'Value': str(self.node.id)}])
instance.create_tags(Tags=[{'Key': 'nodepool_pool',
'Value': str(self.pool.name)}])
instance.create_tags(Tags=[{'Key': 'nodepool_provider',
'Value': str(self.provider_name)}])
break
time.sleep(0.5)
instance.reload()
@ -127,6 +133,45 @@ class AwsNodeRequestHandler(NodeRequestHandler):
return False
return True
def hasRemainingQuota(self, ntype):
    '''
    Apply max_servers check, ignoring other quotas.

    :returns: True if we have room, False otherwise.
    '''
    # Only instance count is limited for AWS; cores/ram are unbounded
    # here, so they are set to infinity in the pool quota.
    running = self.manager.countNodes(self.pool.name)
    remaining = QuotaInformation(
        cores=math.inf,
        instances=self.pool.max_servers - running,
        ram=math.inf,
        default=math.inf)
    # One more node of any type consumes exactly one instance slot.
    remaining.subtract(
        QuotaInformation(cores=1, instances=1, ram=1, default=1))
    self.log.debug("hasRemainingQuota({},{}) = {}".format(
        self.pool, ntype, remaining))
    return remaining.non_negative()
def hasProviderQuota(self, node_types):
    '''
    Apply max_servers check to a whole request

    :returns: True if we have room, False otherwise.
    '''
    # The whole request needs one instance slot per requested node
    # type; cores/ram are not limited for AWS.
    requested = QuotaInformation(
        cores=1,
        instances=len(node_types),
        ram=1,
        default=1)
    available = QuotaInformation(
        cores=math.inf,
        instances=self.pool.max_servers,
        ram=math.inf,
        default=math.inf)
    available.subtract(requested)
    self.log.debug("hasProviderQuota({},{}) = {}".format(
        self.pool, node_types, available))
    return available.non_negative()
def launchesComplete(self):
'''
Check if all launch requests have completed.

View File

@ -27,11 +27,14 @@ class AwsInstance:
if metadatas:
for metadata in metadatas:
if metadata["Key"] == "nodepool_id":
self.metadata = {
'nodepool_provider_name': provider.name,
'nodepool_node_id': metadata["Value"],
}
break
self.metadata['nodepool_node_id'] = metadata["Value"]
continue
if metadata["Key"] == "nodepool_pool":
self.metadata['nodepool_pool_name'] = metadata["Value"]
continue
if metadata["Key"] == "nodepool_provider":
self.metadata['nodepool_provider_name'] = metadata["Value"]
continue
def get(self, name, default=None):
    """Return attribute *name* of this instance, or *default* if unset."""
    if hasattr(self, name):
        return getattr(self, name)
    return default
@ -65,10 +68,30 @@ class AwsProvider(Provider):
for instance in self.ec2.instances.all():
if instance.state["Name"].lower() == "terminated":
continue
ours = False
if instance.tags:
for tag in instance.tags:
if (tag["Key"] == 'nodepool_provider'
and tag["Value"] == self.provider.name):
ours = True
break
if not ours:
continue
servers.append(AwsInstance(
instance.id, instance.tags, self.provider))
return servers
def countNodes(self, pool=None):
    """Count this provider's live instances.

    :param pool: if given, only count instances whose
        ``nodepool_pool_name`` metadata matches this pool name.
    :returns: the number of matching instances
    """
    # listNodes() already filters out terminated and foreign instances.
    return sum(
        1 for instance in self.listNodes()
        if pool is None
        or instance.metadata.get('nodepool_pool_name') == pool)
def getImage(self, image_id):
    """Return the EC2 Image resource for *image_id*."""
    image = self.ec2.Image(image_id)
    return image

View File

@ -16,7 +16,7 @@ providers:
username: ubuntu
pools:
- name: main
max-servers: 5
max-servers: 1
subnet-id: null
security-group-id: null
labels:

View File

@ -17,6 +17,7 @@ import fixtures
import logging
import os
import tempfile
import time
from unittest.mock import patch
import boto3
@ -78,18 +79,44 @@ class TestDriverAws(tests.DBTestCase):
self.log.debug("Waiting for request %s", req.id)
req = self.waitForNodeRequest(req)
self.assertEqual(req.state, zk.FULFILLED)
self.assertNotEqual(req.nodes, [])
node = self.zk.getNode(req.nodes[0])
self.assertEqual(node.allocated_to, req.id)
self.assertEqual(node.state, zk.READY)
self.assertIsNotNone(node.launcher)
self.assertEqual(node.connection_type, 'ssh')
nodescan.assert_called_with(
node.interface_ip, port=22, timeout=180, gather_hostkeys=True)
self.assertEqual(req.state, zk.FULFILLED)
node.state = zk.DELETING
self.zk.storeNode(node)
self.assertNotEqual(req.nodes, [])
node = self.zk.getNode(req.nodes[0])
self.assertEqual(node.allocated_to, req.id)
self.assertEqual(node.state, zk.READY)
self.assertIsNotNone(node.launcher)
self.assertEqual(node.connection_type, 'ssh')
nodescan.assert_called_with(
node.interface_ip,
port=22,
timeout=180,
gather_hostkeys=True)
# A new request will be paused for lack of quota until this
# one is deleted
req2 = zk.NodeRequest()
req2.state = zk.REQUESTED
req2.node_types.append('ubuntu1404')
self.zk.storeNodeRequest(req2)
req2 = self.waitForNodeRequest(
req2, (zk.PENDING, zk.FAILED, zk.FULFILLED))
self.assertEqual(req2.state, zk.PENDING)
# It could flip from PENDING to one of the others, so sleep a
# bit and be sure
time.sleep(1)
req2 = self.waitForNodeRequest(
req2, (zk.PENDING, zk.FAILED, zk.FULFILLED))
self.assertEqual(req2.state, zk.PENDING)
self.waitForNodeDeletion(node)
node.state = zk.DELETING
self.zk.storeNode(node)
self.waitForNodeDeletion(node)
req2 = self.waitForNodeRequest(req2, (zk.FAILED, zk.FULFILLED))
self.assertEqual(req2.state, zk.FULFILLED)
node = self.zk.getNode(req2.nodes[0])
node.state = zk.DELETING
self.zk.storeNode(node)
self.waitForNodeDeletion(node)