Add a nodepool command

Move the daemon command to nodepoold.

Refactor config handling in NodePool so that config objects
contain only plain information by default (things such as
database handles and managers may be attached to them later
as needed).

Start with the list and image-list commands.

Change-Id: If2ba7bca7ab4ef922787176af87ad5de31ae4b3e
James E. Blair 2013-08-30 14:31:07 -07:00
parent b1b8a569ef
commit 7a1fe1891f
4 changed files with 261 additions and 125 deletions
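
For orientation, here is a minimal sketch (not part of the patch) of how the
refactored config handling is meant to be driven. Every call below comes from
the diffs in this commit; only the config file path is an illustrative
assumption:

from nodepool import nodedb
from nodepool import nodepool

pool = nodepool.NodePool('/etc/nodepool/nodepool.yaml')  # illustrative path
config = pool.loadConfig()        # plain data: providers, targets, crons, ...
pool.reconfigureDatabase(config)  # attach a database handle only when needed
pool.setConfig(config)            # install the assembled config on the pool
with pool.getDB().getSession() as session:
    for node in session.getNodes():
        print node.hostname, nodedb.STATE_NAMES[node.state]

The same sequence appears in the new command-line tool's main() below; the
daemon's run loop additionally calls reconfigureManagers(), reconfigureCrons()
and reconfigureUpdateListeners().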

nodepool/cmd/nodepoolcmd.py (new file, 103 lines)

@@ -0,0 +1,103 @@
#!/usr/bin/env python
#
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import argparse
import logging.config
import sys
import time

from nodepool import nodedb
from nodepool import nodepool

from prettytable import PrettyTable


class NodePoolCmd(object):
    def __init__(self):
        self.args = None

    def parse_arguments(self):
        parser = argparse.ArgumentParser(description='Node pool.')
        parser.add_argument('-c', dest='config',
                            help='path to config file')
        parser.add_argument('--version', dest='version', action='store_true',
                            help='show version')
        subparsers = parser.add_subparsers(title='commands',
                                           description='valid commands',
                                           help='additional help')

        cmd_list = subparsers.add_parser('list', help='list nodes')
        cmd_list.set_defaults(func=self.list)

        cmd_image_list = subparsers.add_parser('image-list',
                                               help='list images')
        cmd_image_list.set_defaults(func=self.image_list)

        self.args = parser.parse_args()

    def setup_logging(self):
        logging.basicConfig(level=logging.INFO)

    def list(self):
        t = PrettyTable(["Provider", "Image", "Target", "Hostname", "NodeName",
                         "Server ID", "IP", "State", "Age (hours)"])
        t.align = 'l'
        now = time.time()
        with self.pool.getDB().getSession() as session:
            for node in session.getNodes():
                t.add_row([node.provider_name, node.image_name,
                           node.target_name, node.hostname, node.nodename,
                           node.external_id, node.ip,
                           nodedb.STATE_NAMES[node.state],
                           '%.02f' % ((now - node.state_time) / 3600)])
        print t

    def image_list(self):
        t = PrettyTable(["Provider", "Image", "Hostname", "Version",
                         "Image ID", "Server ID", "State", "Age (hours)"])
        t.align = 'l'
        now = time.time()
        with self.pool.getDB().getSession() as session:
            for image in session.getSnapshotImages():
                t.add_row([image.provider_name, image.image_name,
                           image.hostname, image.version,
                           image.external_id, image.server_external_id,
                           nodedb.STATE_NAMES[image.state],
                           '%.02f' % ((now - image.state_time) / 3600)])
        print t

    def main(self):
        self.parse_arguments()

        if self.args.version:
            from nodepool.version import version_info as npc_version_info
            print "Nodepool version: %s" % npc_version_info.version_string()
            return(0)

        self.pool = nodepool.NodePool(self.args.config)
        config = self.pool.loadConfig()
        self.pool.reconfigureDatabase(config)
        self.pool.setConfig(config)
        self.args.func()


def main():
    npc = NodePoolCmd()
    npc.setup_logging()
    return npc.main()


if __name__ == "__main__":
    sys.exit(main())
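
Assuming the nodepool package with this new module is installed, the list and
image-list subcommands can also be exercised from Python roughly as follows;
the argv values, including the config path, are only examples:

import sys

from nodepool.cmd import nodepoolcmd

# Same effect as running "nodepool -c /etc/nodepool/nodepool.yaml image-list"
# from a shell; "list" works the same way and prints the node table instead.
sys.argv = ['nodepool', '-c', '/etc/nodepool/nodepool.yaml', 'image-list']
sys.exit(nodepoolcmd.main())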

nodepool/nodepool.py

@@ -58,7 +58,7 @@ class NodeCompleteThread(threading.Thread):
def run(self):
try:
with self.nodepool.db.getSession() as session:
with self.nodepool.getDB().getSession() as session:
self.handleEvent(session)
except Exception:
self.log.exception("Exception handling event for %s:" %
@@ -134,7 +134,7 @@ class NodeUpdateListener(threading.Thread):
topic)
def handleStartPhase(self, nodename, jobname):
with self.nodepool.db.getSession() as session:
with self.nodepool.getDB().getSession() as session:
node = session.getNodeByNodename(nodename)
if not node:
self.log.debug("Unable to find node with nodename: %s" %
@@ -173,7 +173,7 @@ class NodeLauncher(threading.Thread):
self.log.exception("Exception in run method:")
def _run(self):
with self.nodepool.db.getSession() as session:
with self.nodepool.getDB().getSession() as session:
self.log.debug("Launching node id: %s" % self.node_id)
try:
self.node = session.getNode(self.node_id)
@@ -304,7 +304,7 @@ class ImageUpdater(threading.Thread):
self.log.exception("Exception in run method:")
def _run(self):
with self.nodepool.db.getSession() as session:
with self.nodepool.getDB().getSession() as session:
self.log.debug("Updating image %s in %s " % (self.image.name,
self.provider.name))
try:
@@ -465,83 +465,64 @@ class TargetImageProvider(ConfigValue):
pass
class Cron(ConfigValue):
pass
class ZMQPublisher(ConfigValue):
pass
class NodePool(threading.Thread):
log = logging.getLogger("nodepool.NodePool")
def __init__(self, configfile):
threading.Thread.__init__(self)
self.configfile = configfile
self.zmq_context = None
self.zmq_listeners = {}
self.db = None
self.dburi = None
self.apsched = apscheduler.scheduler.Scheduler()
self.apsched.start()
self.update_cron = ''
self.update_job = None
self.cleanup_cron = ''
self.cleanup_job = None
self.check_cron = ''
self.check_job = None
self._stopped = False
self.config = None
self.loadConfig()
self.zmq_context = None
self.apsched = None
def stop(self):
self._stopped = True
self.zmq_context.destroy()
self.apsched.shutdown()
if self.zmq_context:
self.zmq_context.destroy()
if self.apsched:
self.apsched.shutdown()
def loadConfig(self):
self.log.debug("Loading configuration")
config = yaml.load(open(self.configfile))
update_cron = config.get('cron', {}).get('image-update', '14 2 * * *')
cleanup_cron = config.get('cron', {}).get('cleanup', '27 */6 * * *')
check_cron = config.get('cron', {}).get('check', '*/15 * * * *')
if (update_cron != self.update_cron):
if self.update_job:
self.apsched.unschedule_job(self.update_job)
parts = update_cron.split()
minute, hour, dom, month, dow = parts[:5]
self.apsched.add_cron_job(self._doUpdateImages,
day=dom,
day_of_week=dow,
hour=hour,
minute=minute)
self.update_cron = update_cron
if (cleanup_cron != self.cleanup_cron):
if self.cleanup_job:
self.apsched.unschedule_job(self.cleanup_job)
parts = cleanup_cron.split()
minute, hour, dom, month, dow = parts[:5]
self.apsched.add_cron_job(self._doPeriodicCleanup,
day=dom,
day_of_week=dow,
hour=hour,
minute=minute)
self.cleanup_cron = cleanup_cron
if (check_cron != self.check_cron):
if self.check_job:
self.apsched.unschedule_job(self.check_job)
parts = check_cron.split()
minute, hour, dom, month, dow = parts[:5]
self.apsched.add_cron_job(self._doPeriodicCheck,
day=dom,
day_of_week=dow,
hour=hour,
minute=minute)
self.check_cron = check_cron
newconfig = Config()
newconfig.db = None
newconfig.dburi = None
newconfig.providers = {}
newconfig.targets = {}
newconfig.scriptdir = config.get('script-dir')
newconfig.dburi = config.get('dburi')
newconfig.provider_managers = {}
newconfig.jenkins_managers = {}
stop_managers = []
newconfig.zmq_publishers = {}
newconfig.crons = {}
for name, default in [
('image-update', '14 2 * * *'),
('cleanup', '27 */6 * * *'),
('check', '*/15 * * * *'),
]:
c = Cron()
c.name = name
newconfig.crons[c.name] = c
c.job = None
c.timespec = config.get('cron', {}).get(name, default)
for addr in config['zmq-publishers']:
z = ZMQPublisher()
z.name = addr
z.listener = None
newconfig.zmq_publishers[z.name] = z
for provider in config['providers']:
p = Provider()
@@ -556,27 +537,6 @@ class NodePool(threading.Thread):
p.region_name = provider.get('region-name')
p.max_servers = provider['max-servers']
p.rate = provider.get('rate', 1.0)
oldmanager = None
if self.config:
oldmanager = self.config.provider_managers.get(p.name)
if oldmanager:
if (p.username != oldmanager.provider.username or
p.password != oldmanager.provider.password or
p.project_id != oldmanager.provider.project_id or
p.auth_url != oldmanager.provider.auth_url or
p.service_type != oldmanager.provider.service_type or
p.service_name != oldmanager.provider.service_name or
p.region_name != oldmanager.provider.region_name):
stop_managers.append(oldmanager)
oldmanager = None
if oldmanager:
newconfig.provider_managers[p.name] = oldmanager
else:
self.log.debug("Creating new ProviderManager object for %s" %
p.name)
newconfig.provider_managers[p.name] = \
provider_manager.ProviderManager(p)
newconfig.provider_managers[p.name].start()
p.images = {}
for image in provider['images']:
i = ProviderImage()
@@ -607,23 +567,6 @@ class NodePool(threading.Thread):
t.jenkins_credentials_id = None
t.jenkins_test_job = None
t.rate = target.get('rate', 1.0)
oldmanager = None
if self.config:
oldmanager = self.config.jenkins_managers.get(t.name)
if oldmanager:
if (t.jenkins_url != oldmanager.target.jenkins_url or
t.jenkins_user != oldmanager.target.jenkins_user or
t.jenkins_apikey != oldmanager.target.jenkins_apikey):
stop_managers.append(oldmanager)
oldmanager = None
if oldmanager:
newconfig.jenkins_managers[t.name] = oldmanager
else:
self.log.debug("Creating new JenkinsManager object for %s" %
t.name)
newconfig.jenkins_managers[t.name] = \
jenkins_manager.JenkinsManager(t)
newconfig.jenkins_managers[t.name].start()
t.images = {}
for image in target['images']:
i = TargetImage()
@@ -635,13 +578,113 @@ class NodePool(threading.Thread):
p.name = provider['name']
i.providers[p.name] = p
p.min_ready = provider['min-ready']
self.config = newconfig
return newconfig
def reconfigureDatabase(self, config):
if (not self.config) or config.dburi != self.config.dburi:
config.db = nodedb.NodeDatabase(config.dburi)
else:
config.db = self.config.db
def reconfigureManagers(self, config):
stop_managers = []
for p in config.providers.values():
oldmanager = None
if self.config:
oldmanager = self.config.provider_managers.get(p.name)
if oldmanager:
if (p.username != oldmanager.provider.username or
p.password != oldmanager.provider.password or
p.project_id != oldmanager.provider.project_id or
p.auth_url != oldmanager.provider.auth_url or
p.service_type != oldmanager.provider.service_type or
p.service_name != oldmanager.provider.service_name or
p.region_name != oldmanager.provider.region_name):
stop_managers.append(oldmanager)
oldmanager = None
if oldmanager:
config.provider_managers[p.name] = oldmanager
else:
self.log.debug("Creating new ProviderManager object for %s" %
p.name)
config.provider_managers[p.name] = \
provider_manager.ProviderManager(p)
config.provider_managers[p.name].start()
for t in config.targets.values():
oldmanager = None
if self.config:
oldmanager = self.config.jenkins_managers.get(t.name)
if oldmanager:
if (t.jenkins_url != oldmanager.target.jenkins_url or
t.jenkins_user != oldmanager.target.jenkins_user or
t.jenkins_apikey != oldmanager.target.jenkins_apikey):
stop_managers.append(oldmanager)
oldmanager = None
if oldmanager:
config.jenkins_managers[t.name] = oldmanager
else:
self.log.debug("Creating new JenkinsManager object for %s" %
t.name)
config.jenkins_managers[t.name] = \
jenkins_manager.JenkinsManager(t)
config.jenkins_managers[t.name].start()
for oldmanager in stop_managers:
oldmanager.stop()
if self.config.dburi != self.dburi:
self.dburi = self.config.dburi
self.db = nodedb.NodeDatabase(self.config.dburi)
self.startUpdateListeners(config['zmq-publishers'])
def reconfigureCrons(self, config):
cron_map = {
'image-update': self._doUpdateImages,
'cleanup': self._doPeriodicCleanup,
'check': self._doPeriodicCheck,
}
if not self.apsched:
self.apsched = apscheduler.scheduler.Scheduler()
self.apsched.start()
for c in config.crons.values():
if ((not self.config) or
c.timespec != self.config.crons[c.name].timespec):
if self.config and self.config.crons[c.name].job:
self.apsched.unschedule_job(self.config.crons[c.name].job)
parts = c.timespec.split()
minute, hour, dom, month, dow = parts[:5]
c.job = self.apsched.add_cron_job(
cron_map[c.name],
day=dom,
day_of_week=dow,
hour=hour,
minute=minute)
else:
c.job = self.config.crons[c.name].job
def reconfigureUpdateListeners(self, config):
if self.config:
running = set(self.config.zmq_publishers.keys())
else:
running = set()
configured = set(config.zmq_publishers.keys())
if running == configured:
self.log.debug("ZMQ Listeners do not need to be updated")
config.zmq_publishers = self.config.zmq_publishers
return
if self.zmq_context:
self.log.debug("Stopping listeners")
self.zmq_context.destroy()
self.zmq_context = zmq.Context()
for z in config.zmq_publishers.values():
self.log.debug("Starting listener for %s" % z.name)
z.listener = NodeUpdateListener(self, z.name)
z.listener.start()
def setConfig(self, config):
self.config = config
def getDB(self):
return self.config.db
def getProviderManager(self, provider):
return self.config.provider_managers[provider.name]
@@ -649,24 +692,6 @@ class NodePool(threading.Thread):
def getJenkinsManager(self, target):
return self.config.jenkins_managers[target.name]
def startUpdateListeners(self, publishers):
running = set(self.zmq_listeners.keys())
configured = set(publishers)
if running == configured:
self.log.debug("Listeners do not need to be updated")
return
if self.zmq_context:
self.log.debug("Stopping listeners")
self.zmq_context.destroy()
self.zmq_listeners = {}
self.zmq_context = zmq.Context()
for addr in publishers:
self.log.debug("Starting listener for %s" % addr)
listener = NodeUpdateListener(self, addr)
self.zmq_listeners[addr] = listener
listener.start()
def getNumNeededNodes(self, session, target, provider, image):
# Count machines that are ready and machines that are building,
# so that if the provider is very slow, we aren't queueing up tons
@@ -692,8 +717,14 @@ class NodePool(threading.Thread):
def run(self):
while not self._stopped:
try:
self.loadConfig()
with self.db.getSession() as session:
config = self.loadConfig()
self.reconfigureDatabase(config)
self.reconfigureManagers(config)
self.reconfigureCrons(config)
self.reconfigureUpdateListeners(config)
self.setConfig(config)
with self.getDB().getSession() as session:
self._run(session)
except Exception:
self.log.exception("Exception in main loop:")
@@ -744,7 +775,7 @@ class NodePool(threading.Thread):
def _doUpdateImages(self):
try:
with self.db.getSession() as session:
with self.getDB().getSession() as session:
self.updateImages(session)
except Exception:
self.log.exception("Exception in periodic image update:")
@@ -839,7 +870,7 @@ class NodePool(threading.Thread):
def _doPeriodicCleanup(self):
try:
with self.db.getSession() as session:
with self.getDB().getSession() as session:
self.periodicCleanup(session)
except Exception:
self.log.exception("Exception in periodic cleanup:")
@@ -913,7 +944,7 @@ class NodePool(threading.Thread):
def _doPeriodicCheck(self):
try:
with self.db.getSession() as session:
with self.getDB().getSession() as session:
self.periodicCheck(session)
except Exception:
self.log.exception("Exception in periodic chack:")

requirements.txt

@@ -13,3 +13,4 @@ sqlalchemy>=0.8.2,<0.9.0
pyzmq>=13.1.0,<14.0.0
python-novaclient
MySQL-python
PrettyTable>=0.6,<0.8

setup.cfg

@@ -21,7 +21,8 @@ warnerrors = True
[entry_points]
console_scripts =
    nodepool = nodepool.cmd.nodepoold:main
    nodepool = nodepool.cmd.nodepoolcmd:main
    nodepoold = nodepool.cmd.nodepoold:main
[build_sphinx]
source-dir = doc/source