Merge "Implement max-servers for AWS driver"

This commit is contained in:
Zuul 2019-04-15 21:17:33 +00:00 committed by Gerrit Code Review
commit 36d03ab488
4 changed files with 114 additions and 19 deletions

View File

@ -13,11 +13,12 @@
# under the License.
import logging
import math
import time
from nodepool import exceptions
from nodepool import zk
from nodepool.driver.utils import NodeLauncher
from nodepool.driver.utils import NodeLauncher, QuotaInformation
from nodepool.driver import NodeRequestHandler
from nodepool.nodeutils import nodescan
@ -25,6 +26,7 @@ from nodepool.nodeutils import nodescan
class AwsInstanceLauncher(NodeLauncher):
def __init__(self, handler, node, provider_config, provider_label):
super().__init__(handler.zk, node, provider_config)
self.provider_name = provider_config.name
self.retries = provider_config.launch_retries
self.pool = provider_config.pools[provider_label.pool.name]
self.handler = handler
@ -60,6 +62,10 @@ class AwsInstanceLauncher(NodeLauncher):
if state == 'running':
instance.create_tags(Tags=[{'Key': 'nodepool_id',
'Value': str(self.node.id)}])
instance.create_tags(Tags=[{'Key': 'nodepool_pool',
'Value': str(self.pool.name)}])
instance.create_tags(Tags=[{'Key': 'nodepool_provider',
'Value': str(self.provider_name)}])
break
time.sleep(0.5)
instance.reload()
@ -127,6 +133,45 @@ class AwsNodeRequestHandler(NodeRequestHandler):
return False
return True
def hasRemainingQuota(self, ntype):
'''
Apply max_servers check, ignoring other quotas.
:returns: True if we have room, False otherwise.
'''
needed_quota = QuotaInformation(cores=1, instances=1, ram=1, default=1)
n_running = self.manager.countNodes(self.pool.name)
pool_quota = QuotaInformation(
cores=math.inf,
instances=self.pool.max_servers - n_running,
ram=math.inf,
default=math.inf)
pool_quota.subtract(needed_quota)
self.log.debug("hasRemainingQuota({},{}) = {}".format(
self.pool, ntype, pool_quota))
return pool_quota.non_negative()
def hasProviderQuota(self, node_types):
'''
Apply max_servers check to a whole request
:returns: True if we have room, False otherwise.
'''
needed_quota = QuotaInformation(
cores=1,
instances=len(node_types),
ram=1,
default=1)
pool_quota = QuotaInformation(
cores=math.inf,
instances=self.pool.max_servers,
ram=math.inf,
default=math.inf)
pool_quota.subtract(needed_quota)
self.log.debug("hasProviderQuota({},{}) = {}".format(
self.pool, node_types, pool_quota))
return pool_quota.non_negative()
def launchesComplete(self):
'''
Check if all launch requests have completed.

View File

@ -27,11 +27,14 @@ class AwsInstance:
if metadatas:
for metadata in metadatas:
if metadata["Key"] == "nodepool_id":
self.metadata = {
'nodepool_provider_name': provider.name,
'nodepool_node_id': metadata["Value"],
}
break
self.metadata['nodepool_node_id'] = metadata["Value"]
continue
if metadata["Key"] == "nodepool_pool":
self.metadata['nodepool_pool_name'] = metadata["Value"]
continue
if metadata["Key"] == "nodepool_provider":
self.metadata['nodepool_provider_name'] = metadata["Value"]
continue
def get(self, name, default=None):
return getattr(self, name, default)
@ -65,10 +68,30 @@ class AwsProvider(Provider):
for instance in self.ec2.instances.all():
if instance.state["Name"].lower() == "terminated":
continue
ours = False
if instance.tags:
for tag in instance.tags:
if (tag["Key"] == 'nodepool_provider'
and tag["Value"] == self.provider.name):
ours = True
break
if not ours:
continue
servers.append(AwsInstance(
instance.id, instance.tags, self.provider))
return servers
def countNodes(self, pool=None):
n = 0
for instance in self.listNodes():
if pool is not None:
if 'nodepool_pool_name' not in instance.metadata:
continue
if pool != instance.metadata['nodepool_pool_name']:
continue
n += 1
return n
def getImage(self, image_id):
return self.ec2.Image(image_id)

View File

@ -16,7 +16,7 @@ providers:
username: ubuntu
pools:
- name: main
max-servers: 5
max-servers: 1
subnet-id: null
security-group-id: null
labels:

View File

@ -17,6 +17,7 @@ import fixtures
import logging
import os
import tempfile
import time
from unittest.mock import patch
import boto3
@ -78,18 +79,44 @@ class TestDriverAws(tests.DBTestCase):
self.log.debug("Waiting for request %s", req.id)
req = self.waitForNodeRequest(req)
self.assertEqual(req.state, zk.FULFILLED)
self.assertNotEqual(req.nodes, [])
node = self.zk.getNode(req.nodes[0])
self.assertEqual(node.allocated_to, req.id)
self.assertEqual(node.state, zk.READY)
self.assertIsNotNone(node.launcher)
self.assertEqual(node.connection_type, 'ssh')
nodescan.assert_called_with(
node.interface_ip, port=22, timeout=180, gather_hostkeys=True)
self.assertEqual(req.state, zk.FULFILLED)
node.state = zk.DELETING
self.zk.storeNode(node)
self.assertNotEqual(req.nodes, [])
node = self.zk.getNode(req.nodes[0])
self.assertEqual(node.allocated_to, req.id)
self.assertEqual(node.state, zk.READY)
self.assertIsNotNone(node.launcher)
self.assertEqual(node.connection_type, 'ssh')
nodescan.assert_called_with(
node.interface_ip,
port=22,
timeout=180,
gather_hostkeys=True)
# A new request will be paused and for lack of quota until this
# one is deleted
req2 = zk.NodeRequest()
req2.state = zk.REQUESTED
req2.node_types.append('ubuntu1404')
self.zk.storeNodeRequest(req2)
req2 = self.waitForNodeRequest(
req2, (zk.PENDING, zk.FAILED, zk.FULFILLED))
self.assertEqual(req2.state, zk.PENDING)
# It could flip from PENDING to one of the others, so sleep a
# bit and be sure
time.sleep(1)
req2 = self.waitForNodeRequest(
req2, (zk.PENDING, zk.FAILED, zk.FULFILLED))
self.assertEqual(req2.state, zk.PENDING)
self.waitForNodeDeletion(node)
node.state = zk.DELETING
self.zk.storeNode(node)
self.waitForNodeDeletion(node)
req2 = self.waitForNodeRequest(req2, (zk.FAILED, zk.FULFILLED))
self.assertEqual(req2.state, zk.FULFILLED)
node = self.zk.getNode(req2.nodes[0])
node.state = zk.DELETING
self.zk.storeNode(node)
self.waitForNodeDeletion(node)