Merge "Use the SQLAlchemy ORM"

This commit is contained in:
Zuul 2018-11-28 21:28:11 +00:00 committed by Gerrit Code Review
commit 7ce90a8a7f
3 changed files with 181 additions and 135 deletions

View File

@@ -19,9 +19,8 @@ import alembic.command
import alembic.config
import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy import orm
import sqlalchemy.pool
from sqlalchemy.sql import select
import voluptuous
from zuul.connection import BaseConnection
@@ -30,6 +29,75 @@ BUILDSET_TABLE = 'zuul_buildset'
BUILD_TABLE = 'zuul_build'
class DatabaseSession(object):
def __init__(self, connection):
self.connection = connection
self.session = connection.session
def __enter__(self):
return self
def __exit__(self, etype, value, tb):
if etype:
self.session().rollback()
else:
self.session().commit()
self.session().close()
self.session = None
def listFilter(self, query, column, value):
if value is None:
return query
if isinstance(value, list):
return query.filter(column.in_(value))
return query.filter(column == value)
def getBuilds(self, tenant=None, project=None, pipeline=None,
change=None, branch=None, patchset=None, ref=None,
newrev=None, uuid=None, job_name=None, voting=None,
node_name=None, result=None, limit=50, offset=0):
build_table = self.connection.zuul_build_table
buildset_table = self.connection.zuul_buildset_table
# contains_eager allows us to perform eager loading on the
# buildset *and* use that table in filters (unlike
# joinedload).
q = self.session().query(self.connection.buildModel).\
join(self.connection.buildSetModel).\
options(orm.contains_eager(self.connection.buildModel.buildset)).\
with_hint(build_table, 'USE INDEX (PRIMARY)', 'mysql')
q = self.listFilter(q, buildset_table.c.tenant, tenant)
q = self.listFilter(q, buildset_table.c.project, project)
q = self.listFilter(q, buildset_table.c.pipeline, pipeline)
q = self.listFilter(q, buildset_table.c.change, change)
q = self.listFilter(q, buildset_table.c.branch, branch)
q = self.listFilter(q, buildset_table.c.patchset, patchset)
q = self.listFilter(q, buildset_table.c.ref, ref)
q = self.listFilter(q, buildset_table.c.newrev, newrev)
q = self.listFilter(q, build_table.c.uuid, uuid)
q = self.listFilter(q, build_table.c.job_name, job_name)
q = self.listFilter(q, build_table.c.voting, voting)
q = self.listFilter(q, build_table.c.node_name, node_name)
q = self.listFilter(q, build_table.c.result, result)
q = q.order_by(build_table.c.id.desc()).\
limit(limit).\
offset(offset)
try:
return q.all()
except sqlalchemy.orm.exc.NoResultFound:
return []
def createBuildSet(self, *args, **kw):
bs = self.connection.buildSetModel(*args, **kw)
self.session().add(bs)
self.session().flush()
return bs
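
The contains_eager() comment in getBuilds() above is worth a standalone illustration. Below is a minimal, self-contained sketch (toy models and an in-memory SQLite engine; not Zuul code) of how the explicit join() both populates the relationship and stays available for filtering.

    import sqlalchemy as sa
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import orm

    Base = declarative_base()

    class BuildSet(Base):
        __tablename__ = 'buildset'
        id = sa.Column(sa.Integer, primary_key=True)
        pipeline = sa.Column(sa.String(255))

    class Build(Base):
        __tablename__ = 'build'
        id = sa.Column(sa.Integer, primary_key=True)
        buildset_id = sa.Column(sa.Integer, sa.ForeignKey('buildset.id'))
        job_name = sa.Column(sa.String(255))
        buildset = orm.relationship(BuildSet, backref='builds')

    engine = sa.create_engine('sqlite://')
    Base.metadata.create_all(engine)
    session = orm.sessionmaker(bind=engine)()

    # The join() used for filtering is the same join that populates
    # Build.buildset, so no extra lazy query is emitted per row.
    q = (session.query(Build)
         .join(BuildSet)
         .options(orm.contains_eager(Build.buildset))
         .filter(BuildSet.pipeline == 'check'))
    builds = q.all()

With joinedload() the ORM joins against an anonymous alias, so a filter on BuildSet columns would force a second, redundant join; contains_eager() reuses the one explicit join for both loading and filtering.
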
class SQLConnection(BaseConnection):
driver_name = 'sql'
log = logging.getLogger("zuul.SQLConnection")
@@ -58,6 +126,18 @@ class SQLConnection(BaseConnection):
poolclass=sqlalchemy.pool.QueuePool,
pool_recycle=self.connection_config.get('pool_recycle', 1))
self._migrate()
# If we want the objects returned from query() to be
# usable outside of the session, we need to expunge them
# from the session, and since the DatabaseSession always
# calls commit() on the session when the context manager
# exits, we need to inform the session not to expire
# objects when it does so.
self.session_factory = orm.sessionmaker(bind=self.engine,
expire_on_commit=False,
autoflush=False)
self.session = orm.scoped_session(self.session_factory)
self.tables_established = True
except sa.exc.NoSuchModuleError:
self.log.exception(
@@ -68,6 +148,9 @@ class SQLConnection(BaseConnection):
"Unable to connect to the database or establish the required "
"tables. Reporter %s is disabled" % self)
def getSession(self):
return DatabaseSession(self)
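
The expire_on_commit=False / autoflush=False choice in the sessionmaker above is easy to get wrong, so here is a short sketch of the failure it avoids (reusing the toy Build model and engine from the sketch earlier; not Zuul code).

    from sqlalchemy import orm

    default_factory = orm.sessionmaker(bind=engine)   # expire_on_commit=True
    s = default_factory()
    s.add(Build(job_name='tox'))
    s.commit()
    build = s.query(Build).first()
    s.commit()                                         # expires build's attributes
    s.close()                                          # detaches build
    # Accessing build.job_name here raises DetachedInstanceError: the expired
    # attribute cannot be refreshed without a session.

    safe_factory = orm.sessionmaker(bind=engine, expire_on_commit=False,
                                    autoflush=False)
    s = safe_factory()
    build = s.query(Build).first()
    s.commit()                                         # attributes stay loaded
    s.close()
    print(build.job_name)                              # still usable: 'tox'
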
def _migrate(self):
"""Perform the alembic migrations for this connection"""
with self.engine.begin() as conn:
@@ -89,24 +172,9 @@ class SQLConnection(BaseConnection):
def _setup_models(self):
Base = declarative_base(metadata=sa.MetaData())
class BuildModel(Base):
__tablename__ = self.table_prefix + BUILD_TABLE
id = sa.Column(sa.Integer, primary_key=True)
buildset_id = sa.Column(sa.String, sa.ForeignKey(
self.table_prefix + BUILDSET_TABLE + ".id"))
uuid = sa.Column(sa.String(36))
job_name = sa.Column(sa.String(255))
result = sa.Column(sa.String(255))
start_time = sa.Column(sa.DateTime)
end_time = sa.Column(sa.DateTime)
voting = sa.Column(sa.Boolean)
log_url = sa.Column(sa.String(255))
node_name = sa.Column(sa.String(255))
class BuildSetModel(Base):
__tablename__ = self.table_prefix + BUILDSET_TABLE
id = sa.Column(sa.Integer, primary_key=True)
builds = relationship(BuildModel, lazy="subquery")
zuul_ref = sa.Column(sa.String(255))
pipeline = sa.Column(sa.String(255))
project = sa.Column(sa.String(255))
@@ -121,6 +189,30 @@ class SQLConnection(BaseConnection):
message = sa.Column(sa.TEXT())
tenant = sa.Column(sa.String(255))
def createBuild(self, *args, **kw):
session = orm.session.Session.object_session(self)
b = BuildModel(*args, **kw)
b.buildset_id = self.id
self.builds.append(b)
session.add(b)
session.flush()
return b
class BuildModel(Base):
__tablename__ = self.table_prefix + BUILD_TABLE
id = sa.Column(sa.Integer, primary_key=True)
buildset_id = sa.Column(sa.String, sa.ForeignKey(
self.table_prefix + BUILDSET_TABLE + ".id"))
uuid = sa.Column(sa.String(36))
job_name = sa.Column(sa.String(255))
result = sa.Column(sa.String(255))
start_time = sa.Column(sa.DateTime)
end_time = sa.Column(sa.DateTime)
voting = sa.Column(sa.Boolean)
log_url = sa.Column(sa.String(255))
node_name = sa.Column(sa.String(255))
buildset = orm.relationship(BuildSetModel, backref="builds")
self.buildModel = BuildModel
self.buildSetModel = BuildSetModel
return self.buildSetModel.__table__, self.buildModel.__table__
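
Taken together, the pieces above give the reporter and the web API a small ORM facade. An illustrative usage sketch follows (the connection object, tenant, project and job names are made up, not taken from the change):

    with connection.getSession() as db:
        buildset = db.createBuildSet(tenant='example-tenant',
                                     pipeline='check',
                                     project='org/project')
        buildset.createBuild(uuid='0123456789abcdef',
                             job_name='tox',
                             result='SUCCESS')
        # commit happens on clean exit, rollback if the block raises

    with connection.getSession() as db:
        builds = db.getBuilds(tenant='example-tenant', job_name='tox', limit=10)
    # thanks to expire_on_commit=False the Build objects (and their eagerly
    # loaded buildset) remain readable here, after the session has been closed
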
@@ -129,58 +221,10 @@ class SQLConnection(BaseConnection):
self.log.debug("Stopping SQL connection %s" % self.connection_name)
self.engine.dispose()
def query(self, args):
build = self.zuul_build_table
buildset = self.zuul_buildset_table
query = select([
buildset.c.project,
buildset.c.branch,
buildset.c.pipeline,
buildset.c.change,
buildset.c.patchset,
buildset.c.ref,
buildset.c.newrev,
buildset.c.ref_url,
build.c.result,
build.c.uuid,
build.c.job_name,
build.c.voting,
build.c.node_name,
build.c.start_time,
build.c.end_time,
build.c.log_url]).select_from(build.join(buildset))
for table in ('build', 'buildset'):
for key, val in args['%s_filters' % table].items():
if table == 'build':
column = build.c
else:
column = buildset.c
query = query.where(getattr(column, key).in_(val))
return query.\
limit(args['limit']).\
offset(args['skip']).\
order_by(build.c.id.desc()).\
with_hint(build, 'USE INDEX (PRIMARY)', 'mysql')
def get_builds(self, args):
"""Return a list of build"""
builds = []
with self.engine.begin() as conn:
for row in conn.execute(self.query(args)):
build = dict(row)
# Convert date to iso format
if row.start_time:
build['start_time'] = row.start_time.strftime(
'%Y-%m-%dT%H:%M:%S')
if row.end_time:
build['end_time'] = row.end_time.strftime(
'%Y-%m-%dT%H:%M:%S')
# Compute run duration
if row.start_time and row.end_time:
build['duration'] = (row.end_time -
row.start_time).total_seconds()
builds.append(build)
return builds
def getBuilds(self, *args, **kw):
"""Return a list of Build objects"""
with self.getSession() as db:
return db.getBuilds(*args, **kw)
def getSchema():

View File

@@ -32,42 +32,31 @@ class SQLReporter(BaseReporter):
self.log.warn("SQL reporter (%s) is disabled " % self)
return
with self.connection.engine.begin() as conn:
change = getattr(item.change, 'number', None)
patchset = getattr(item.change, 'patchset', None)
ref = getattr(item.change, 'ref', '')
oldrev = getattr(item.change, 'oldrev', '')
newrev = getattr(item.change, 'newrev', '')
branch = getattr(item.change, 'branch', '')
buildset_ins = self.connection.zuul_buildset_table.insert().values(
zuul_ref=item.current_build_set.ref,
with self.connection.getSession() as db:
db_buildset = db.createBuildSet(
tenant=item.pipeline.tenant.name,
pipeline=item.pipeline.name,
project=item.change.project.name,
change=change,
patchset=patchset,
ref=ref,
oldrev=oldrev,
newrev=newrev,
change=getattr(item.change, 'number', None),
patchset=getattr(item.change, 'patchset', None),
ref=getattr(item.change, 'ref', ''),
oldrev=getattr(item.change, 'oldrev', ''),
newrev=getattr(item.change, 'newrev', ''),
branch=getattr(item.change, 'branch', ''),
zuul_ref=item.current_build_set.ref,
ref_url=item.change.url,
result=item.current_build_set.result,
message=self._formatItemReport(
item, with_jobs=False),
tenant=item.pipeline.tenant.name,
branch=branch,
message=self._formatItemReport(item, with_jobs=False),
)
buildset_ins_result = conn.execute(buildset_ins)
build_inserts = []
for job in item.getJobs():
build = item.current_build_set.getBuild(job.name)
if not build:
# build hasn't began. The sql reporter can only send back
# build hasn't begun. The sql reporter can only send back
# stats about builds. It doesn't understand how to store
# information about the change.
continue
(result, url) = item.formatJobResult(job)
start = end = None
if build.start_time:
start = datetime.datetime.fromtimestamp(
@@ -78,19 +67,16 @@ class SQLReporter(BaseReporter):
build.end_time,
tz=datetime.timezone.utc)
build_inserts.append({
'buildset_id': buildset_ins_result.inserted_primary_key[0],
'uuid': build.uuid,
'job_name': build.job.name,
'result': result,
'start_time': start,
'end_time': end,
'voting': build.job.voting,
'log_url': url,
'node_name': build.node_name,
})
conn.execute(self.connection.zuul_build_table.insert(),
build_inserts)
db_buildset.createBuild(
uuid=build.uuid,
job_name=build.job.name,
result=result,
start_time=start,
end_time=end,
voting=build.job.voting,
log_url=url,
node_name=build.node_name,
)
def getSchema():

View File

@@ -374,6 +374,44 @@ class ZuulWebAPI(object):
resp.headers['Content-Type'] = 'text/plain'
return job.data[0] + '\n'
def buildToDict(self, build):
start_time = build.start_time
if build.start_time:
start_time = start_time.strftime(
'%Y-%m-%dT%H:%M:%S')
end_time = build.end_time
if build.end_time:
end_time = end_time.strftime(
'%Y-%m-%dT%H:%M:%S')
if build.start_time and build.end_time:
duration = (build.end_time -
build.start_time).total_seconds()
else:
duration = None
buildset = build.buildset
ret = {
'uuid': build.uuid,
'job_name': build.job_name,
'result': build.result,
'start_time': start_time,
'end_time': end_time,
'duration': duration,
'voting': build.voting,
'log_url': build.log_url,
'node_name': build.node_name,
'project': buildset.project,
'branch': buildset.branch,
'pipeline': buildset.pipeline,
'change': buildset.change,
'patchset': buildset.patchset,
'ref': buildset.ref,
'newrev': buildset.newrev,
'ref_url': buildset.ref_url,
}
return ret
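
The time handling in buildToDict() mirrors what get_builds() used to do inline; a tiny worked example (values invented) of the strftime format and the duration arithmetic:

    import datetime

    start = datetime.datetime(2018, 11, 28, 21, 0, 0)
    end = datetime.datetime(2018, 11, 28, 21, 5, 30)

    start.strftime('%Y-%m-%dT%H:%M:%S')   # '2018-11-28T21:00:00'
    (end - start).total_seconds()         # 330.0 -> the 'duration' field
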
@cherrypy.expose
@cherrypy.tools.save_params()
@cherrypy.tools.json_out(content_type='application/json; charset=utf-8')
@@ -391,31 +429,15 @@ class ZuulWebAPI(object):
connection = self.zuulweb.connections.connections[connection_name]
args = {
'buildset_filters': {'tenant': [tenant]},
'build_filters': {},
'limit': limit,
'skip': skip,
}
builds = connection.getBuilds(
tenant=tenant, project=project, pipeline=pipeline, change=change,
branch=branch, patchset=patchset, ref=ref, newrev=newrev,
uuid=uuid, job_name=job_name, voting=voting, node_name=node_name,
result=result, limit=limit, offset=skip)
for k in ("project", "pipeline", "change", "branch",
"patchset", "ref", "newrev"):
v = locals()[k]
if v:
if not isinstance(v, list):
v = [v]
args['buildset_filters'].setdefault(k, []).extend(v)
for k in ("uuid", "job_name", "voting", "node_name",
"result"):
v = locals()[k]
if v:
if not isinstance(v, list):
v = [v]
args['build_filters'].setdefault(k, []).extend(v)
data = connection.get_builds(args)
resp = cherrypy.response
resp.headers['Access-Control-Allow-Origin'] = '*'
return data
return [self.buildToDict(b) for b in builds]
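
For callers of the web API the change is invisible: the same JSON list comes back, now built from ORM objects. A client-side sketch (the /api/tenant/<tenant>/builds route and host are assumptions about the deployment, not part of this change):

    import requests

    # Assumed route; query parameters map onto DatabaseSession.getBuilds() kwargs.
    resp = requests.get('https://zuul.example.com/api/tenant/example-tenant/builds',
                        params={'job_name': 'tox', 'limit': 10})
    for build in resp.json():
        print(build['uuid'], build['result'], build['duration'])
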
@cherrypy.expose
@cherrypy.tools.save_params()
@@ -431,16 +453,10 @@ class ZuulWebAPI(object):
connection = self.zuulweb.connections.connections[connection_name]
args = {
'buildset_filters': {'tenant': [tenant]},
'build_filters': {'uuid': [uuid]},
'limit': 1,
'skip': 0,
}
data = connection.get_builds(args)
data = connection.getBuilds(tenant=tenant, uuid=uuid, limit=1)
if not data:
raise cherrypy.HTTPError(404, "Build not found")
data = data[0]
data = self.buildToDict(data[0])
resp = cherrypy.response
resp.headers['Access-Control-Allow-Origin'] = '*'
return data