Merge "Add web-based console log streaming" into feature/zuulv3

This commit is contained in:
Jenkins 2017-07-10 20:09:21 +00:00 committed by Gerrit Code Review
commit e128b517d0
12 changed files with 548 additions and 19 deletions

View File

@ -33,6 +33,10 @@ default_username=zuul
trusted_ro_dirs=/opt/zuul-scripts:/var/cache
trusted_rw_dirs=/opt/zuul-logs
[web]
listen_address=127.0.0.1
port=9000
[webapp]
listen_address=0.0.0.0
port=8001

View File

@ -24,3 +24,5 @@ cryptography>=1.6
cachecontrol
pyjwt
iso8601
aiohttp
uvloop;python_version>='3.5'

View File

@ -26,6 +26,7 @@ console_scripts =
zuul-cloner = zuul.cmd.cloner:main
zuul-executor = zuul.cmd.executor:main
zuul-bwrap = zuul.driver.bubblewrap:main
zuul-web = zuul.cmd.web:main
[build_sphinx]
source-dir = doc/source

View File

@ -1266,7 +1266,6 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
self.build_history = []
self.fail_tests = {}
self.job_builds = {}
self.hostname = 'zl.example.com'
def failJob(self, name, change):
"""Instruct the executor to report matching builds as failures.

View File

@ -121,7 +121,8 @@ class TestSQLConnection(ZuulDBTestCase):
self.assertEqual('project-merge', buildset0_builds[0]['job_name'])
self.assertEqual("SUCCESS", buildset0_builds[0]['result'])
self.assertEqual(
'finger://zl.example.com/{uuid}'.format(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=buildset0_builds[0]['uuid']),
buildset0_builds[0]['log_url'])
self.assertEqual('check', buildset1['pipeline'])
@ -144,7 +145,8 @@ class TestSQLConnection(ZuulDBTestCase):
self.assertEqual('project-test1', buildset1_builds[-2]['job_name'])
self.assertEqual("FAILURE", buildset1_builds[-2]['result'])
self.assertEqual(
'finger://zl.example.com/{uuid}'.format(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=buildset1_builds[-2]['uuid']),
buildset1_builds[-2]['log_url'])

View File

@ -14,6 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import aiohttp
import asyncio
import logging
import json
import os
import os.path
import socket
@ -21,6 +25,7 @@ import tempfile
import threading
import time
import zuul.web
import zuul.lib.log_streamer
import tests.base
@ -57,6 +62,7 @@ class TestLogStreamer(tests.base.BaseTestCase):
class TestStreaming(tests.base.AnsibleZuulTestCase):
tenant_config_file = 'config/streamer/main.yaml'
log = logging.getLogger("zuul.test.test_log_streamer.TestStreaming")
def setUp(self):
super(TestStreaming, self).setUp()
@ -146,9 +152,116 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
# job and deleted. However, we still have a file handle to it, so we
# can make sure that we read the entire contents at this point.
# Compact the returned lines into a single string for easy comparison.
file_contents = ''.join(logfile.readlines())
file_contents = logfile.read()
logfile.close()
self.log.debug("\n\nFile contents: %s\n\n", file_contents)
self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data)
self.assertEqual(file_contents, self.streaming_data)
def runWSClient(self, build_uuid, event):
async def client(loop, build_uuid, event):
uri = 'http://127.0.0.1:9000/console-stream'
try:
session = aiohttp.ClientSession(loop=loop)
async with session.ws_connect(uri) as ws:
req = {'uuid': build_uuid, 'logfile': None}
ws.send_str(json.dumps(req))
event.set() # notify we are connected and req sent
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
self.ws_client_results += msg.data
elif msg.type == aiohttp.WSMsgType.CLOSED:
break
elif msg.type == aiohttp.WSMsgType.ERROR:
break
session.close()
except Exception as e:
self.log.exception("client exception:")
loop = asyncio.new_event_loop()
loop.set_debug(True)
loop.run_until_complete(client(loop, build_uuid, event))
loop.close()
def test_websocket_streaming(self):
# Need to set the streaming port before submitting the job
finger_port = 7902
self.executor_server.log_streaming_port = finger_port
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
# We don't have any real synchronization for the ansible jobs, so
# just wait until we get our running build.
while not len(self.builds):
time.sleep(0.1)
build = self.builds[0]
self.assertEqual(build.name, 'python27')
build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
while not os.path.exists(build_dir):
time.sleep(0.1)
# Need to wait to make sure that jobdir gets set
while build.jobdir is None:
time.sleep(0.1)
build = self.builds[0]
# Wait for the job to begin running and create the ansible log file.
# The job waits to complete until the flag file exists, so we can
# safely access the log here. We only open it (to force a file handle
# to be kept open for it after the job finishes) but wait to read the
# contents until the job is done.
ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
while not os.path.exists(ansible_log):
time.sleep(0.1)
logfile = open(ansible_log, 'r')
self.addCleanup(logfile.close)
# Start the finger streamer daemon
streamer = zuul.lib.log_streamer.LogStreamer(
None, self.host, finger_port, self.executor_server.jobdir_root)
self.addCleanup(streamer.stop)
# Start the web server
web_server = zuul.web.ZuulWeb(
listen_address='127.0.0.1', listen_port=9000,
gear_server='127.0.0.1', gear_port=self.gearman_server.port)
loop = asyncio.new_event_loop()
loop.set_debug(True)
ws_thread = threading.Thread(target=web_server.run, args=(loop,))
ws_thread.start()
self.addCleanup(loop.close)
self.addCleanup(ws_thread.join)
self.addCleanup(web_server.stop)
# Wait until web server is started
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
while s.connect_ex((self.host, 9000)):
time.sleep(0.1)
# Start a thread with the websocket client
ws_client_event = threading.Event()
self.ws_client_results = ''
ws_client_thread = threading.Thread(
target=self.runWSClient, args=(build.uuid, ws_client_event)
)
ws_client_thread.start()
ws_client_event.wait()
# Allow the job to complete
flag_file = os.path.join(build_dir, 'test_wait')
open(flag_file, 'w').close()
# Wait for the websocket client to complete, which it should when
# it's received the full log.
ws_client_thread.join()
self.waitUntilSettled()
file_contents = logfile.read()
logfile.close()
self.log.debug("\n\nFile contents: %s\n\n", file_contents)
self.log.debug("\n\nStreamed: %s\n\n", self.ws_client_results)
self.assertEqual(file_contents, self.ws_client_results)

View File

@ -2289,22 +2289,40 @@ class TestScheduler(ZuulTestCase):
status_jobs.append(job)
self.assertEqual('project-merge', status_jobs[0]['name'])
# TODO(mordred) pull uuids from self.builds
self.assertEqual('finger://zl.example.com/%s' % status_jobs[0]['uuid'],
status_jobs[0]['url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=status_jobs[0]['uuid']),
status_jobs[0]['url'])
# TOOD(mordred) configure a success-url on the base job
self.assertEqual('finger://zl.example.com/%s' % status_jobs[0]['uuid'],
status_jobs[0]['report_url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=status_jobs[0]['uuid']),
status_jobs[0]['report_url'])
self.assertEqual('project-test1', status_jobs[1]['name'])
self.assertEqual('finger://zl.example.com/%s' % status_jobs[1]['uuid'],
status_jobs[1]['url'])
self.assertEqual('finger://zl.example.com/%s' % status_jobs[1]['uuid'],
status_jobs[1]['report_url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=status_jobs[1]['uuid']),
status_jobs[1]['url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=status_jobs[1]['uuid']),
status_jobs[1]['report_url'])
self.assertEqual('project-test2', status_jobs[2]['name'])
self.assertEqual('finger://zl.example.com/%s' % status_jobs[2]['uuid'],
status_jobs[2]['url'])
self.assertEqual('finger://zl.example.com/%s' % status_jobs[2]['uuid'],
status_jobs[2]['report_url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=status_jobs[2]['uuid']),
status_jobs[2]['url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=status_jobs[2]['uuid']),
status_jobs[2]['report_url'])
def test_live_reconfiguration(self):
"Test that live reconfiguration works"
@ -3577,8 +3595,11 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual('project-merge', job['name'])
self.assertEqual('gate', job['pipeline'])
self.assertEqual(False, job['retry'])
self.assertEqual('finger://zl.example.com/%s' % job['uuid'],
job['url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=job['uuid']),
job['url'])
self.assertEqual(2, len(job['worker']))
self.assertEqual(False, job['canceled'])
self.assertEqual(True, job['voting'])
@ -4674,7 +4695,8 @@ class TestSchedulerSuccessURL(ZuulTestCase):
# NOTE: This default URL is currently hard-coded in executor/server.py
self.assertIn(
'- docs-draft-test2 finger://zl.example.com/{uuid}'.format(
'- docs-draft-test2 finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=uuid_test2),
body[3])

115
zuul/cmd/web.py Executable file
View File

@ -0,0 +1,115 @@
#!/usr/bin/env python
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import asyncio
import daemon
import extras
import logging
import signal
import sys
import threading
import zuul.cmd
import zuul.web
from zuul.lib.config import get_default
# as of python-daemon 1.6 it doesn't bundle pidlockfile anymore
# instead it depends on lockfile-0.9.1 which uses pidfile.
pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
class WebServer(zuul.cmd.ZuulApp):
def parse_arguments(self):
parser = argparse.ArgumentParser(description='Zuul Web Server.')
parser.add_argument('-c', dest='config',
help='specify the config file')
parser.add_argument('-d', dest='nodaemon', action='store_true',
help='do not run as a daemon')
parser.add_argument('--version', dest='version', action='version',
version=self._get_version(),
help='show zuul version')
self.args = parser.parse_args()
def exit_handler(self, signum, frame):
self.web.stop()
def _main(self):
params = dict()
params['listen_address'] = get_default(self.config,
'web', 'listen_address',
'127.0.0.1')
params['listen_port'] = get_default(self.config, 'web', 'port', 9000)
params['gear_server'] = get_default(self.config, 'gearman', 'server')
params['gear_port'] = get_default(self.config, 'gearman', 'port', 4730)
params['ssl_key'] = get_default(self.config, 'gearman', 'ssl_key')
params['ssl_cert'] = get_default(self.config, 'gearman', 'ssl_cert')
params['ssl_ca'] = get_default(self.config, 'gearman', 'ssl_ca')
try:
self.web = zuul.web.ZuulWeb(**params)
except Exception as e:
self.log.exception("Error creating ZuulWeb:")
sys.exit(1)
loop = asyncio.get_event_loop()
signal.signal(signal.SIGUSR1, self.exit_handler)
signal.signal(signal.SIGTERM, self.exit_handler)
self.log.info('Zuul Web Server starting')
self.thread = threading.Thread(target=self.web.run,
args=(loop,),
name='web')
self.thread.start()
try:
signal.pause()
except KeyboardInterrupt:
print("Ctrl + C: asking web server to exit nicely...\n")
self.exit_handler(signal.SIGINT, None)
self.thread.join()
loop.stop()
loop.close()
self.log.info("Zuul Web Server stopped")
def main(self):
self.setup_logging('web', 'log_config')
self.log = logging.getLogger("zuul.WebServer")
try:
self._main()
except Exception:
self.log.exception("Exception from WebServer:")
def main():
server = WebServer()
server.parse_arguments()
server.read_config()
pid_fn = get_default(server.config, 'web', 'pidfile',
'/var/run/zuul-web/zuul-web.pid', expand_user=True)
pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10)
if server.args.nodaemon:
server.main()
else:
with daemon.DaemonContext(pidfile=pid):
server.main()

View File

@ -15,6 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import os.path
import pwd
@ -212,6 +213,8 @@ class LogStreamer(object):
'''
def __init__(self, user, host, port, jobdir_root):
self.log = logging.getLogger('zuul.lib.LogStreamer')
self.log.debug("LogStreamer starting on port %s", port)
self.server = CustomForkingTCPServer((host, port),
RequestHandler,
user=user,
@ -227,3 +230,4 @@ class LogStreamer(object):
if self.thd.isAlive():
self.server.shutdown()
self.server.server_close()
self.log.debug("LogStreamer stopped")

View File

@ -86,3 +86,11 @@ class RPCClient(object):
def shutdown(self):
self.gearman.shutdown()
def get_job_log_stream_address(self, uuid, logfile='console.log'):
data = {'uuid': uuid, 'logfile': logfile}
job = self.submitJob('zuul:get_job_log_stream_address', data)
if job.failure:
return False
else:
return json.loads(job.data[0])

View File

@ -53,6 +53,7 @@ class RPCListener(object):
self.worker.registerFunction("zuul:enqueue_ref")
self.worker.registerFunction("zuul:promote")
self.worker.registerFunction("zuul:get_running_jobs")
self.worker.registerFunction("zuul:get_job_log_stream_address")
def stop(self):
self.log.debug("Stopping")
@ -173,3 +174,29 @@ class RPCListener(object):
running_items.append(item.formatJSON())
job.sendWorkComplete(json.dumps(running_items))
def handle_get_job_log_stream_address(self, job):
# TODO: map log files to ports. Currently there is only one
# log stream for a given job. But many jobs produce many
# log files, so this is forwards compatible with a future
# where there are more logs to potentially request than
# "console.log"
def find_build(uuid):
for tenant in self.sched.abide.tenants.values():
for pipeline_name, pipeline in tenant.layout.pipelines.items():
for queue in pipeline.queues:
for item in queue.queue:
for bld in item.current_build_set.getBuilds():
if bld.uuid == uuid:
return bld
return None
args = json.loads(job.arguments)
uuid = args['uuid']
# TODO: logfile = args['logfile']
job_log_stream_address = {}
build = find_build(uuid)
if build:
job_log_stream_address['server'] = build.worker.hostname
job_log_stream_address['port'] = build.worker.log_port
job.sendWorkComplete(json.dumps(job_log_stream_address))

232
zuul/web.py Normal file
View File

@ -0,0 +1,232 @@
#!/usr/bin/env python
# Copyright (c) 2017 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import json
import logging
import uvloop
import aiohttp
from aiohttp import web
import zuul.rpcclient
class LogStreamingHandler(object):
log = logging.getLogger("zuul.web.LogStreamingHandler")
def __init__(self, loop, gear_server, gear_port,
ssl_key=None, ssl_cert=None, ssl_ca=None):
self.event_loop = loop
self.gear_server = gear_server
self.gear_port = gear_port
self.ssl_key = ssl_key
self.ssl_cert = ssl_cert
self.ssl_ca = ssl_ca
def _getPortLocation(self, job_uuid):
'''
Query Gearman for the executor running the given job.
:param str job_uuid: The job UUID we want to stream.
'''
# TODO: Fetch the entire list of uuid/file/server/ports once and
# share that, and fetch a new list on cache misses perhaps?
# TODO: Avoid recreating a client for each request.
rpc = zuul.rpcclient.RPCClient(self.gear_server, self.gear_port,
self.ssl_key, self.ssl_cert,
self.ssl_ca)
ret = rpc.get_job_log_stream_address(job_uuid)
rpc.shutdown()
return ret
async def _fingerClient(self, ws, server, port, job_uuid):
'''
Create a client to connect to the finger streamer and pull results.
:param aiohttp.web.WebSocketResponse ws: The websocket response object.
:param str server: The executor server running the job.
:param str port: The executor server port.
:param str job_uuid: The job UUID to stream.
'''
self.log.debug("Connecting to finger server %s:%s", server, port)
reader, writer = await asyncio.open_connection(host=server, port=port,
loop=self.event_loop)
self.log.debug("Sending finger request for %s", job_uuid)
msg = "%s\n" % job_uuid # Must have a trailing newline!
writer.write(msg.encode('utf8'))
await writer.drain()
while True:
data = await reader.read(1024)
if data:
await ws.send_str(data.decode('utf8'))
else:
writer.close()
return
async def _streamLog(self, ws, request):
'''
Stream the log for the requested job back to the client.
:param aiohttp.web.WebSocketResponse ws: The websocket response object.
:param dict request: The client request parameters.
'''
for key in ('uuid', 'logfile'):
if key not in request:
return (4000, "'{key}' missing from request payload".format(
key=key))
# Schedule the blocking gearman work in an Executor
gear_task = self.event_loop.run_in_executor(
None, self._getPortLocation, request['uuid'])
try:
port_location = await asyncio.wait_for(gear_task, 10)
except asyncio.TimeoutError:
return (4010, "Gearman timeout")
if not port_location:
return (4011, "Error with Gearman")
await self._fingerClient(
ws, port_location['server'], port_location['port'], request['uuid']
)
return (1000, "No more data")
async def processRequest(self, request):
'''
Handle a client websocket request for log streaming.
:param aiohttp.web.Request request: The client request.
'''
try:
ws = web.WebSocketResponse()
await ws.prepare(request)
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
req = json.loads(msg.data)
self.log.debug("Websocket request: %s", req)
code, msg = await self._streamLog(ws, req)
# We expect to process only a single message. I.e., we
# can stream only a single file at a time.
await ws.close(code=code, message=msg)
break
elif msg.type == aiohttp.WSMsgType.ERROR:
self.log.error(
"Websocket connection closed with exception %s",
ws.exception()
)
break
elif msg.type == aiohttp.WSMsgType.CLOSED:
break
except Exception as e:
self.log.exception("Websocket exception:")
await ws.close(code=4009, message=str(e).encode('utf-8'))
return ws
class ZuulWeb(object):
log = logging.getLogger("zuul.web.ZuulWeb")
def __init__(self, listen_address, listen_port,
gear_server, gear_port,
ssl_key=None, ssl_cert=None, ssl_ca=None):
self.listen_address = listen_address
self.listen_port = listen_port
self.gear_server = gear_server
self.gear_port = gear_port
self.ssl_key = ssl_key
self.ssl_cert = ssl_cert
self.ssl_ca = ssl_ca
async def _handleWebsocket(self, request):
handler = LogStreamingHandler(self.event_loop,
self.gear_server, self.gear_port,
self.ssl_key, self.ssl_cert, self.ssl_ca)
return await handler.processRequest(request)
def run(self, loop=None):
'''
Run the websocket daemon.
Because this method can be the target of a new thread, we need to
set the thread event loop here, rather than in __init__().
:param loop: The event loop to use. If not supplied, the default main
thread event loop is used. This should be supplied if ZuulWeb
is run within a separate (non-main) thread.
'''
routes = [
('GET', '/console-stream', self._handleWebsocket)
]
self.log.debug("ZuulWeb starting")
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
user_supplied_loop = loop is not None
if not loop:
loop = asyncio.get_event_loop()
asyncio.set_event_loop(loop)
self.event_loop = loop
app = web.Application()
for method, path, handler in routes:
app.router.add_route(method, path, handler)
handler = app.make_handler(loop=self.event_loop)
# create the server
coro = self.event_loop.create_server(handler,
self.listen_address,
self.listen_port)
self.server = self.event_loop.run_until_complete(coro)
self.term = asyncio.Future()
# start the server
self.event_loop.run_until_complete(self.term)
# cleanup
self.log.debug("ZuulWeb stopping")
self.server.close()
self.event_loop.run_until_complete(self.server.wait_closed())
self.event_loop.run_until_complete(app.shutdown())
self.event_loop.run_until_complete(handler.shutdown(60.0))
self.event_loop.run_until_complete(app.cleanup())
self.log.debug("ZuulWeb stopped")
# Only run these if we are controlling the loop - they need to be
# run from the main thread
if not user_supplied_loop:
loop.stop()
loop.close()
def stop(self):
self.event_loop.call_soon_threadsafe(self.term.set_result, True)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
loop.set_debug(True)
z = ZuulWeb()
z.run(loop)