Implement max-servers for AWS driver

Max-servers has been ignored up until now. This implements the basic
checks so that AWS users can limit the number of instances they launch.

Change-Id: I73296bde1cdde80c52b6b5b725f268a17562060d
parent 6c2c1d3aac
commit da21971e9b
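In rough terms, the change compares the pool's max-servers cap against the
number of instances the provider is already running, and only lets a launch
proceed while slots remain. A minimal sketch of that core test (illustrative
only; the names below are placeholders, not nodepool API):

    # Illustrative: the essence of the max-servers check added here.
    def has_room(max_servers, running, requested=1):
        # Room iff the cap, less what is already running, covers the request.
        return max_servers - running - requested >= 0

    print(has_room(max_servers=1, running=0))  # True: first node fits
    print(has_room(max_servers=1, running=1))  # False: request must wait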
@@ -13,11 +13,12 @@
 # under the License.
 
 import logging
+import math
 import time
 
 from nodepool import exceptions
 from nodepool import zk
-from nodepool.driver.utils import NodeLauncher
+from nodepool.driver.utils import NodeLauncher, QuotaInformation
 from nodepool.driver import NodeRequestHandler
 from nodepool.nodeutils import nodescan
 
@@ -25,6 +26,7 @@ from nodepool.nodeutils import nodescan
 class AwsInstanceLauncher(NodeLauncher):
     def __init__(self, handler, node, provider_config, provider_label):
         super().__init__(handler.zk, node, provider_config)
         self.provider_name = provider_config.name
         self.retries = provider_config.launch_retries
+        self.pool = provider_config.pools[provider_label.pool.name]
         self.handler = handler
@@ -60,6 +62,10 @@ class AwsInstanceLauncher(NodeLauncher):
         if state == 'running':
             instance.create_tags(Tags=[{'Key': 'nodepool_id',
                                         'Value': str(self.node.id)}])
+            instance.create_tags(Tags=[{'Key': 'nodepool_pool',
+                                        'Value': str(self.pool.name)}])
+            instance.create_tags(Tags=[{'Key': 'nodepool_provider',
+                                        'Value': str(self.provider_name)}])
             break
         time.sleep(0.5)
         instance.reload()
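One small note on the tagging above: boto3's create_tags accepts a list of
tags, so the three calls could be collapsed into a single API round trip. A
hedged sketch, reusing the attribute names from the diff (instance is a boto3
ec2.Instance resource):

    # Equivalent tagging in one call instead of three.
    instance.create_tags(Tags=[
        {'Key': 'nodepool_id', 'Value': str(self.node.id)},
        {'Key': 'nodepool_pool', 'Value': str(self.pool.name)},
        {'Key': 'nodepool_provider', 'Value': str(self.provider_name)},
    ])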
@@ -127,6 +133,45 @@ class AwsNodeRequestHandler(NodeRequestHandler):
                 return False
         return True
 
+    def hasRemainingQuota(self, ntype):
+        '''
+        Apply max_servers check, ignoring other quotas.
+
+        :returns: True if we have room, False otherwise.
+        '''
+        needed_quota = QuotaInformation(cores=1, instances=1, ram=1, default=1)
+        n_running = self.manager.countNodes(self.pool.name)
+        pool_quota = QuotaInformation(
+            cores=math.inf,
+            instances=self.pool.max_servers - n_running,
+            ram=math.inf,
+            default=math.inf)
+        pool_quota.subtract(needed_quota)
+        self.log.debug("hasRemainingQuota({},{}) = {}".format(
+            self.pool, ntype, pool_quota))
+        return pool_quota.non_negative()
+
+    def hasProviderQuota(self, node_types):
+        '''
+        Apply max_servers check to a whole request
+
+        :returns: True if we have room, False otherwise.
+        '''
+        needed_quota = QuotaInformation(
+            cores=1,
+            instances=len(node_types),
+            ram=1,
+            default=1)
+        pool_quota = QuotaInformation(
+            cores=math.inf,
+            instances=self.pool.max_servers,
+            ram=math.inf,
+            default=math.inf)
+        pool_quota.subtract(needed_quota)
+        self.log.debug("hasProviderQuota({},{}) = {}".format(
+            self.pool, node_types, pool_quota))
+        return pool_quota.non_negative()
+
     def launchesComplete(self):
         '''
         Check if all launch requests have completed.
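To make the quota arithmetic concrete: only the instances field is capped,
every other field is set to infinity, so subtract() and non_negative() reduce
to a pure max-servers test. A hedged walk-through with invented numbers, using
only the QuotaInformation calls that appear in this hunk:

    import math

    from nodepool.driver.utils import QuotaInformation

    # max-servers is 5 and four instances are running: one slot left.
    needed = QuotaInformation(cores=1, instances=1, ram=1, default=1)
    pool = QuotaInformation(
        cores=math.inf,    # uncapped: inf minus anything stays non-negative
        instances=5 - 4,
        ram=math.inf,
        default=math.inf)
    pool.subtract(needed)
    print(pool.non_negative())  # True: the launch fits

    # With five running, instances goes to -1 after subtract().
    pool = QuotaInformation(
        cores=math.inf, instances=5 - 5, ram=math.inf, default=math.inf)
    pool.subtract(needed)
    print(pool.non_negative())  # False: the request is paused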
@@ -27,11 +27,14 @@ class AwsInstance:
         if metadatas:
             for metadata in metadatas:
                 if metadata["Key"] == "nodepool_id":
-                    self.metadata = {
-                        'nodepool_provider_name': provider.name,
-                        'nodepool_node_id': metadata["Value"],
-                    }
-                    break
+                    self.metadata['nodepool_node_id'] = metadata["Value"]
+                    continue
+                if metadata["Key"] == "nodepool_pool":
+                    self.metadata['nodepool_pool_name'] = metadata["Value"]
+                    continue
+                if metadata["Key"] == "nodepool_provider":
+                    self.metadata['nodepool_provider_name'] = metadata["Value"]
+                    continue
 
     def get(self, name, default=None):
         return getattr(self, name, default)
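The if/continue chain above is a plain tag-to-metadata translation. The same
mapping as a standalone sketch (the table name is invented; the driver keeps
the explicit loop):

    # EC2 tag keys -> the metadata keys nodepool reads back.
    TAG_TO_METADATA = {
        "nodepool_id": "nodepool_node_id",
        "nodepool_pool": "nodepool_pool_name",
        "nodepool_provider": "nodepool_provider_name",
    }

    def tags_to_metadata(tags):
        metadata = {}
        for tag in tags or []:
            key = TAG_TO_METADATA.get(tag["Key"])
            if key:
                metadata[key] = tag["Value"]
        return metadata

    print(tags_to_metadata([{"Key": "nodepool_pool", "Value": "main"}]))
    # {'nodepool_pool_name': 'main'}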
@@ -65,10 +68,30 @@ class AwsProvider(Provider):
         for instance in self.ec2.instances.all():
             if instance.state["Name"].lower() == "terminated":
                 continue
+            ours = False
+            if instance.tags:
+                for tag in instance.tags:
+                    if (tag["Key"] == 'nodepool_provider'
+                            and tag["Value"] == self.provider.name):
+                        ours = True
+                        break
+            if not ours:
+                continue
             servers.append(AwsInstance(
                 instance.id, instance.tags, self.provider))
         return servers
 
+    def countNodes(self, pool=None):
+        n = 0
+        for instance in self.listNodes():
+            if pool is not None:
+                if 'nodepool_pool_name' not in instance.metadata:
+                    continue
+                if pool != instance.metadata['nodepool_pool_name']:
+                    continue
+            n += 1
+        return n
+
     def getImage(self, image_id):
         return self.ec2.Image(image_id)
 
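The filter in listNodes is what keeps the quota accounting scoped: only
instances tagged with this provider's name are counted, so two nodepool
providers sharing one AWS account do not eat each other's max-servers budget.
A hedged standalone rendering of that check (data invented):

    def is_ours(tags, provider_name):
        # Mirrors the loop above: scan the raw EC2 tag list for our tag.
        for tag in tags or []:
            if (tag["Key"] == 'nodepool_provider'
                    and tag["Value"] == provider_name):
                return True
        return False

    instances = [
        [{"Key": "nodepool_provider", "Value": "ec2-us-west-2"}],
        [{"Key": "nodepool_provider", "Value": "someone-else"}],
        None,  # untagged, e.g. launched by hand
    ]
    print(sum(is_ours(tags, "ec2-us-west-2") for tags in instances))  # 1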
@@ -16,7 +16,7 @@ providers:
         username: ubuntu
     pools:
       - name: main
-        max-servers: 5
+        max-servers: 1
         subnet-id: null
         security-group-id: null
         labels:
@@ -17,6 +17,7 @@ import fixtures
 import logging
 import os
 import tempfile
+import time
 from unittest.mock import patch
 
 import boto3
@@ -78,18 +79,44 @@ class TestDriverAws(tests.DBTestCase):
 
         self.log.debug("Waiting for request %s", req.id)
         req = self.waitForNodeRequest(req)
-        self.assertEqual(req.state, zk.FULFILLED)
 
         self.assertNotEqual(req.nodes, [])
         node = self.zk.getNode(req.nodes[0])
         self.assertEqual(node.allocated_to, req.id)
         self.assertEqual(node.state, zk.READY)
         self.assertIsNotNone(node.launcher)
         self.assertEqual(node.connection_type, 'ssh')
         nodescan.assert_called_with(
-            node.interface_ip, port=22, timeout=180, gather_hostkeys=True)
+            node.interface_ip,
+            port=22,
+            timeout=180,
+            gather_hostkeys=True)
+        self.assertEqual(req.state, zk.FULFILLED)
+
+        # A new request will be paused for lack of quota until this
+        # one is deleted
+        req2 = zk.NodeRequest()
+        req2.state = zk.REQUESTED
+        req2.node_types.append('ubuntu1404')
+        self.zk.storeNodeRequest(req2)
+        req2 = self.waitForNodeRequest(
+            req2, (zk.PENDING, zk.FAILED, zk.FULFILLED))
+        self.assertEqual(req2.state, zk.PENDING)
+        # It could flip from PENDING to one of the others, so sleep a
+        # bit and be sure
+        time.sleep(1)
+        req2 = self.waitForNodeRequest(
+            req2, (zk.PENDING, zk.FAILED, zk.FULFILLED))
+        self.assertEqual(req2.state, zk.PENDING)
 
         node.state = zk.DELETING
         self.zk.storeNode(node)
 
         self.waitForNodeDeletion(node)
+
+        req2 = self.waitForNodeRequest(req2, (zk.FAILED, zk.FULFILLED))
+        self.assertEqual(req2.state, zk.FULFILLED)
+        node = self.zk.getNode(req2.nodes[0])
+        node.state = zk.DELETING
+        self.zk.storeNode(node)
+        self.waitForNodeDeletion(node)
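The test leans on poll-until-state helpers from the nodepool test base class
(waitForNodeRequest, waitForNodeDeletion), which are not shown in this diff. A
hedged sketch of the general pattern, in case the shape is unfamiliar (names
and timeouts invented):

    import time

    def wait_for_state(fetch, states, timeout=60, interval=0.1):
        # Re-read the object until it reaches one of the wanted states.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            obj = fetch()
            if obj.state in states:
                return obj
            time.sleep(interval)
        raise AssertionError("timed out waiting for %s" % (states,))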