Use the SQLAlchemy ORM
We used the ORM to declare the table structure, but just performed raw queries. As a step to adding more tables and more complex relationships, fully utilize the ORM in queries. Change-Id: I260095d908dc9bea03cf825f4cc8aae8ae43ba16
This commit is contained in:
parent
3476bb8788
commit
3b82885e5c
|
@ -19,9 +19,8 @@ import alembic.command
|
|||
import alembic.config
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy import orm
|
||||
import sqlalchemy.pool
|
||||
from sqlalchemy.sql import select
|
||||
import voluptuous
|
||||
|
||||
from zuul.connection import BaseConnection
|
||||
|
@ -30,6 +29,75 @@ BUILDSET_TABLE = 'zuul_buildset'
|
|||
BUILD_TABLE = 'zuul_build'
|
||||
|
||||
|
||||
class DatabaseSession(object):
|
||||
def __init__(self, connection):
|
||||
self.connection = connection
|
||||
self.session = connection.session
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, etype, value, tb):
|
||||
if etype:
|
||||
self.session().rollback()
|
||||
else:
|
||||
self.session().commit()
|
||||
self.session().close()
|
||||
self.session = None
|
||||
|
||||
def listFilter(self, query, column, value):
|
||||
if value is None:
|
||||
return query
|
||||
if isinstance(value, list):
|
||||
return query.filter(column.in_(value))
|
||||
return query.filter(column == value)
|
||||
|
||||
def getBuilds(self, tenant=None, project=None, pipeline=None,
|
||||
change=None, branch=None, patchset=None, ref=None,
|
||||
newrev=None, uuid=None, job_name=None, voting=None,
|
||||
node_name=None, result=None, limit=50, offset=0):
|
||||
|
||||
build_table = self.connection.zuul_build_table
|
||||
buildset_table = self.connection.zuul_buildset_table
|
||||
|
||||
# contains_eager allows us to perform eager loading on the
|
||||
# buildset *and* use that table in filters (unlike
|
||||
# joinedload).
|
||||
q = self.session().query(self.connection.buildModel).\
|
||||
join(self.connection.buildSetModel).\
|
||||
options(orm.contains_eager(self.connection.buildModel.buildset)).\
|
||||
with_hint(build_table, 'USE INDEX (PRIMARY)', 'mysql')
|
||||
|
||||
q = self.listFilter(q, buildset_table.c.tenant, tenant)
|
||||
q = self.listFilter(q, buildset_table.c.project, project)
|
||||
q = self.listFilter(q, buildset_table.c.pipeline, pipeline)
|
||||
q = self.listFilter(q, buildset_table.c.change, change)
|
||||
q = self.listFilter(q, buildset_table.c.branch, branch)
|
||||
q = self.listFilter(q, buildset_table.c.patchset, patchset)
|
||||
q = self.listFilter(q, buildset_table.c.ref, ref)
|
||||
q = self.listFilter(q, buildset_table.c.newrev, newrev)
|
||||
q = self.listFilter(q, build_table.c.uuid, uuid)
|
||||
q = self.listFilter(q, build_table.c.job_name, job_name)
|
||||
q = self.listFilter(q, build_table.c.voting, voting)
|
||||
q = self.listFilter(q, build_table.c.node_name, node_name)
|
||||
q = self.listFilter(q, build_table.c.result, result)
|
||||
|
||||
q = q.order_by(build_table.c.id.desc()).\
|
||||
limit(limit).\
|
||||
offset(offset)
|
||||
|
||||
try:
|
||||
return q.all()
|
||||
except sqlalchemy.orm.exc.NoResultFound:
|
||||
return []
|
||||
|
||||
def createBuildSet(self, *args, **kw):
|
||||
bs = self.connection.buildSetModel(*args, **kw)
|
||||
self.session().add(bs)
|
||||
self.session().flush()
|
||||
return bs
|
||||
|
||||
|
||||
class SQLConnection(BaseConnection):
|
||||
driver_name = 'sql'
|
||||
log = logging.getLogger("zuul.SQLConnection")
|
||||
|
@ -58,6 +126,18 @@ class SQLConnection(BaseConnection):
|
|||
poolclass=sqlalchemy.pool.QueuePool,
|
||||
pool_recycle=self.connection_config.get('pool_recycle', 1))
|
||||
self._migrate()
|
||||
|
||||
# If we want the objects returned from query() to be
|
||||
# usable outside of the session, we need to expunge them
|
||||
# from the session, and since the DatabaseSession always
|
||||
# calls commit() on the session when the context manager
|
||||
# exits, we need to inform the session not to expire
|
||||
# objects when it does so.
|
||||
self.session_factory = orm.sessionmaker(bind=self.engine,
|
||||
expire_on_commit=False,
|
||||
autoflush=False)
|
||||
self.session = orm.scoped_session(self.session_factory)
|
||||
|
||||
self.tables_established = True
|
||||
except sa.exc.NoSuchModuleError:
|
||||
self.log.exception(
|
||||
|
@ -68,6 +148,9 @@ class SQLConnection(BaseConnection):
|
|||
"Unable to connect to the database or establish the required "
|
||||
"tables. Reporter %s is disabled" % self)
|
||||
|
||||
def getSession(self):
|
||||
return DatabaseSession(self)
|
||||
|
||||
def _migrate(self):
|
||||
"""Perform the alembic migrations for this connection"""
|
||||
with self.engine.begin() as conn:
|
||||
|
@ -89,24 +172,9 @@ class SQLConnection(BaseConnection):
|
|||
def _setup_models(self):
|
||||
Base = declarative_base(metadata=sa.MetaData())
|
||||
|
||||
class BuildModel(Base):
|
||||
__tablename__ = self.table_prefix + BUILD_TABLE
|
||||
id = sa.Column(sa.Integer, primary_key=True)
|
||||
buildset_id = sa.Column(sa.String, sa.ForeignKey(
|
||||
self.table_prefix + BUILDSET_TABLE + ".id"))
|
||||
uuid = sa.Column(sa.String(36))
|
||||
job_name = sa.Column(sa.String(255))
|
||||
result = sa.Column(sa.String(255))
|
||||
start_time = sa.Column(sa.DateTime)
|
||||
end_time = sa.Column(sa.DateTime)
|
||||
voting = sa.Column(sa.Boolean)
|
||||
log_url = sa.Column(sa.String(255))
|
||||
node_name = sa.Column(sa.String(255))
|
||||
|
||||
class BuildSetModel(Base):
|
||||
__tablename__ = self.table_prefix + BUILDSET_TABLE
|
||||
id = sa.Column(sa.Integer, primary_key=True)
|
||||
builds = relationship(BuildModel, lazy="subquery")
|
||||
zuul_ref = sa.Column(sa.String(255))
|
||||
pipeline = sa.Column(sa.String(255))
|
||||
project = sa.Column(sa.String(255))
|
||||
|
@ -121,6 +189,30 @@ class SQLConnection(BaseConnection):
|
|||
message = sa.Column(sa.TEXT())
|
||||
tenant = sa.Column(sa.String(255))
|
||||
|
||||
def createBuild(self, *args, **kw):
|
||||
session = orm.session.Session.object_session(self)
|
||||
b = BuildModel(*args, **kw)
|
||||
b.buildset_id = self.id
|
||||
self.builds.append(b)
|
||||
session.add(b)
|
||||
session.flush()
|
||||
return b
|
||||
|
||||
class BuildModel(Base):
|
||||
__tablename__ = self.table_prefix + BUILD_TABLE
|
||||
id = sa.Column(sa.Integer, primary_key=True)
|
||||
buildset_id = sa.Column(sa.String, sa.ForeignKey(
|
||||
self.table_prefix + BUILDSET_TABLE + ".id"))
|
||||
uuid = sa.Column(sa.String(36))
|
||||
job_name = sa.Column(sa.String(255))
|
||||
result = sa.Column(sa.String(255))
|
||||
start_time = sa.Column(sa.DateTime)
|
||||
end_time = sa.Column(sa.DateTime)
|
||||
voting = sa.Column(sa.Boolean)
|
||||
log_url = sa.Column(sa.String(255))
|
||||
node_name = sa.Column(sa.String(255))
|
||||
buildset = orm.relationship(BuildSetModel, backref="builds")
|
||||
|
||||
self.buildModel = BuildModel
|
||||
self.buildSetModel = BuildSetModel
|
||||
return self.buildSetModel.__table__, self.buildModel.__table__
|
||||
|
@ -129,58 +221,10 @@ class SQLConnection(BaseConnection):
|
|||
self.log.debug("Stopping SQL connection %s" % self.connection_name)
|
||||
self.engine.dispose()
|
||||
|
||||
def query(self, args):
|
||||
build = self.zuul_build_table
|
||||
buildset = self.zuul_buildset_table
|
||||
query = select([
|
||||
buildset.c.project,
|
||||
buildset.c.branch,
|
||||
buildset.c.pipeline,
|
||||
buildset.c.change,
|
||||
buildset.c.patchset,
|
||||
buildset.c.ref,
|
||||
buildset.c.newrev,
|
||||
buildset.c.ref_url,
|
||||
build.c.result,
|
||||
build.c.uuid,
|
||||
build.c.job_name,
|
||||
build.c.voting,
|
||||
build.c.node_name,
|
||||
build.c.start_time,
|
||||
build.c.end_time,
|
||||
build.c.log_url]).select_from(build.join(buildset))
|
||||
for table in ('build', 'buildset'):
|
||||
for key, val in args['%s_filters' % table].items():
|
||||
if table == 'build':
|
||||
column = build.c
|
||||
else:
|
||||
column = buildset.c
|
||||
query = query.where(getattr(column, key).in_(val))
|
||||
return query.\
|
||||
limit(args['limit']).\
|
||||
offset(args['skip']).\
|
||||
order_by(build.c.id.desc()).\
|
||||
with_hint(build, 'USE INDEX (PRIMARY)', 'mysql')
|
||||
|
||||
def get_builds(self, args):
|
||||
"""Return a list of build"""
|
||||
builds = []
|
||||
with self.engine.begin() as conn:
|
||||
for row in conn.execute(self.query(args)):
|
||||
build = dict(row)
|
||||
# Convert date to iso format
|
||||
if row.start_time:
|
||||
build['start_time'] = row.start_time.strftime(
|
||||
'%Y-%m-%dT%H:%M:%S')
|
||||
if row.end_time:
|
||||
build['end_time'] = row.end_time.strftime(
|
||||
'%Y-%m-%dT%H:%M:%S')
|
||||
# Compute run duration
|
||||
if row.start_time and row.end_time:
|
||||
build['duration'] = (row.end_time -
|
||||
row.start_time).total_seconds()
|
||||
builds.append(build)
|
||||
return builds
|
||||
def getBuilds(self, *args, **kw):
|
||||
"""Return a list of Build objects"""
|
||||
with self.getSession() as db:
|
||||
return db.getBuilds(*args, **kw)
|
||||
|
||||
|
||||
def getSchema():
|
||||
|
|
|
@ -32,42 +32,31 @@ class SQLReporter(BaseReporter):
|
|||
self.log.warn("SQL reporter (%s) is disabled " % self)
|
||||
return
|
||||
|
||||
with self.connection.engine.begin() as conn:
|
||||
change = getattr(item.change, 'number', None)
|
||||
patchset = getattr(item.change, 'patchset', None)
|
||||
ref = getattr(item.change, 'ref', '')
|
||||
oldrev = getattr(item.change, 'oldrev', '')
|
||||
newrev = getattr(item.change, 'newrev', '')
|
||||
branch = getattr(item.change, 'branch', '')
|
||||
buildset_ins = self.connection.zuul_buildset_table.insert().values(
|
||||
zuul_ref=item.current_build_set.ref,
|
||||
with self.connection.getSession() as db:
|
||||
db_buildset = db.createBuildSet(
|
||||
tenant=item.pipeline.tenant.name,
|
||||
pipeline=item.pipeline.name,
|
||||
project=item.change.project.name,
|
||||
change=change,
|
||||
patchset=patchset,
|
||||
ref=ref,
|
||||
oldrev=oldrev,
|
||||
newrev=newrev,
|
||||
change=getattr(item.change, 'number', None),
|
||||
patchset=getattr(item.change, 'patchset', None),
|
||||
ref=getattr(item.change, 'ref', ''),
|
||||
oldrev=getattr(item.change, 'oldrev', ''),
|
||||
newrev=getattr(item.change, 'newrev', ''),
|
||||
branch=getattr(item.change, 'branch', ''),
|
||||
zuul_ref=item.current_build_set.ref,
|
||||
ref_url=item.change.url,
|
||||
result=item.current_build_set.result,
|
||||
message=self._formatItemReport(
|
||||
item, with_jobs=False),
|
||||
tenant=item.pipeline.tenant.name,
|
||||
branch=branch,
|
||||
message=self._formatItemReport(item, with_jobs=False),
|
||||
)
|
||||
buildset_ins_result = conn.execute(buildset_ins)
|
||||
build_inserts = []
|
||||
|
||||
for job in item.getJobs():
|
||||
build = item.current_build_set.getBuild(job.name)
|
||||
if not build:
|
||||
# build hasn't began. The sql reporter can only send back
|
||||
# build hasn't begun. The sql reporter can only send back
|
||||
# stats about builds. It doesn't understand how to store
|
||||
# information about the change.
|
||||
continue
|
||||
|
||||
(result, url) = item.formatJobResult(job)
|
||||
|
||||
start = end = None
|
||||
if build.start_time:
|
||||
start = datetime.datetime.fromtimestamp(
|
||||
|
@ -78,19 +67,16 @@ class SQLReporter(BaseReporter):
|
|||
build.end_time,
|
||||
tz=datetime.timezone.utc)
|
||||
|
||||
build_inserts.append({
|
||||
'buildset_id': buildset_ins_result.inserted_primary_key[0],
|
||||
'uuid': build.uuid,
|
||||
'job_name': build.job.name,
|
||||
'result': result,
|
||||
'start_time': start,
|
||||
'end_time': end,
|
||||
'voting': build.job.voting,
|
||||
'log_url': url,
|
||||
'node_name': build.node_name,
|
||||
})
|
||||
conn.execute(self.connection.zuul_build_table.insert(),
|
||||
build_inserts)
|
||||
db_buildset.createBuild(
|
||||
uuid=build.uuid,
|
||||
job_name=build.job.name,
|
||||
result=result,
|
||||
start_time=start,
|
||||
end_time=end,
|
||||
voting=build.job.voting,
|
||||
log_url=url,
|
||||
node_name=build.node_name,
|
||||
)
|
||||
|
||||
|
||||
def getSchema():
|
||||
|
|
|
@ -374,6 +374,44 @@ class ZuulWebAPI(object):
|
|||
resp.headers['Content-Type'] = 'text/plain'
|
||||
return job.data[0] + '\n'
|
||||
|
||||
def buildToDict(self, build):
|
||||
start_time = build.start_time
|
||||
if build.start_time:
|
||||
start_time = start_time.strftime(
|
||||
'%Y-%m-%dT%H:%M:%S')
|
||||
end_time = build.end_time
|
||||
if build.end_time:
|
||||
end_time = end_time.strftime(
|
||||
'%Y-%m-%dT%H:%M:%S')
|
||||
if build.start_time and build.end_time:
|
||||
duration = (build.end_time -
|
||||
build.start_time).total_seconds()
|
||||
else:
|
||||
duration = None
|
||||
|
||||
buildset = build.buildset
|
||||
ret = {
|
||||
'uuid': build.uuid,
|
||||
'job_name': build.job_name,
|
||||
'result': build.result,
|
||||
'start_time': start_time,
|
||||
'end_time': end_time,
|
||||
'duration': duration,
|
||||
'voting': build.voting,
|
||||
'log_url': build.log_url,
|
||||
'node_name': build.node_name,
|
||||
|
||||
'project': buildset.project,
|
||||
'branch': buildset.branch,
|
||||
'pipeline': buildset.pipeline,
|
||||
'change': buildset.change,
|
||||
'patchset': buildset.patchset,
|
||||
'ref': buildset.ref,
|
||||
'newrev': buildset.newrev,
|
||||
'ref_url': buildset.ref_url,
|
||||
}
|
||||
return ret
|
||||
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.save_params()
|
||||
@cherrypy.tools.json_out(content_type='application/json; charset=utf-8')
|
||||
|
@ -391,31 +429,15 @@ class ZuulWebAPI(object):
|
|||
|
||||
connection = self.zuulweb.connections.connections[connection_name]
|
||||
|
||||
args = {
|
||||
'buildset_filters': {'tenant': [tenant]},
|
||||
'build_filters': {},
|
||||
'limit': limit,
|
||||
'skip': skip,
|
||||
}
|
||||
builds = connection.getBuilds(
|
||||
tenant=tenant, project=project, pipeline=pipeline, change=change,
|
||||
branch=branch, patchset=patchset, ref=ref, newrev=newrev,
|
||||
uuid=uuid, job_name=job_name, voting=voting, node_name=node_name,
|
||||
result=result, limit=limit, offset=skip)
|
||||
|
||||
for k in ("project", "pipeline", "change", "branch",
|
||||
"patchset", "ref", "newrev"):
|
||||
v = locals()[k]
|
||||
if v:
|
||||
if not isinstance(v, list):
|
||||
v = [v]
|
||||
args['buildset_filters'].setdefault(k, []).extend(v)
|
||||
for k in ("uuid", "job_name", "voting", "node_name",
|
||||
"result"):
|
||||
v = locals()[k]
|
||||
if v:
|
||||
if not isinstance(v, list):
|
||||
v = [v]
|
||||
args['build_filters'].setdefault(k, []).extend(v)
|
||||
data = connection.get_builds(args)
|
||||
resp = cherrypy.response
|
||||
resp.headers['Access-Control-Allow-Origin'] = '*'
|
||||
return data
|
||||
return [self.buildToDict(b) for b in builds]
|
||||
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.save_params()
|
||||
|
@ -431,16 +453,10 @@ class ZuulWebAPI(object):
|
|||
|
||||
connection = self.zuulweb.connections.connections[connection_name]
|
||||
|
||||
args = {
|
||||
'buildset_filters': {'tenant': [tenant]},
|
||||
'build_filters': {'uuid': [uuid]},
|
||||
'limit': 1,
|
||||
'skip': 0,
|
||||
}
|
||||
data = connection.get_builds(args)
|
||||
data = connection.getBuilds(tenant=tenant, uuid=uuid, limit=1)
|
||||
if not data:
|
||||
raise cherrypy.HTTPError(404, "Build not found")
|
||||
data = data[0]
|
||||
data = self.buildToDict(data[0])
|
||||
resp = cherrypy.response
|
||||
resp.headers['Access-Control-Allow-Origin'] = '*'
|
||||
return data
|
||||
|
|
Loading…
Reference in New Issue