Add web-based console log streaming

Zuul now provides socket-based console streaming, which is super cool.
In order to have Jenkins parity with web streaming, we need to provide a
websocket (JavaScript in browsers can't really connect to random ports
on servers).

After surveying the existing python websocket options, basically all of
them are based around twisted, eventlet, gevent or asyncio. It's not
just a thing we can easily deal with from our current webob/paste
structure, because it is a change to the fundamental HTTP handling.
While we could write our own websocket server implementation that was
threaded like the rest of zuul, that's a pretty giant amount of work.

Instead, we can run an async-based server that's just for the
websockets, so that we're not all of a sudden putting async code into
the rest of zuul and winding up frankensteined. Since this is new code,
using asyncio and python3 seems like an excellent starting place.

aiohttp supports running a websocket server in a thread. It also
supports doing other HTTP/REST calls, so by going aiohttp we can set
ourselves up for a single answer for the HTTP tier.

In order to keep us from being an open socket relay, we'll expect two
parameters as the first message on the websocket: the zuul build
uuid, and the log file we want to stream. (The second thing,
multiple log files, isn't supported yet by the rest of zuul, but one can
imagine a future where we'd like to support that too, so it's in the
protocol.) The websocket server will then ask zuul over gearman for the
IP and port associated with the build and logfile and will start
streaming it to the socket.

Ultimately we'll want the status page to make links of the form:

  /console.html?uuid=<uuid>&logfile=console.log

and we'll want to have apache map the websocket server to something like
/console.

Co-Authored-By: Monty Taylor <mordred@inaugust.com>

Change-Id: Idd0d3f9259e81fa9a60d7540664ce8d5ad2c298f
This commit is contained in:
Monty Taylor 2016-05-24 11:28:10 -05:00 committed by David Shrewsbury
parent 0dbe15993a
commit 51139a0682
12 changed files with 548 additions and 19 deletions

View File

@ -29,6 +29,10 @@ default_username=zuul
trusted_ro_dirs=/opt/zuul-scripts:/var/cache
trusted_rw_dirs=/opt/zuul-logs
[web]
listen_address=127.0.0.1
port=9000
[webapp]
listen_address=0.0.0.0
port=8001

View File

@ -24,3 +24,5 @@ cryptography>=1.6
cachecontrol
pyjwt
iso8601
aiohttp
uvloop;python_version>='3.5'

View File

@ -26,6 +26,7 @@ console_scripts =
zuul-cloner = zuul.cmd.cloner:main
zuul-executor = zuul.cmd.executor:main
zuul-bwrap = zuul.driver.bubblewrap:main
zuul-web = zuul.cmd.web:main
[build_sphinx]
source-dir = doc/source

View File

@ -1226,7 +1226,6 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
self.build_history = []
self.fail_tests = {}
self.job_builds = {}
self.hostname = 'zl.example.com'
def failJob(self, name, change):
"""Instruct the executor to report matching builds as failures.

View File

@ -121,7 +121,8 @@ class TestSQLConnection(ZuulDBTestCase):
self.assertEqual('project-merge', buildset0_builds[0]['job_name'])
self.assertEqual("SUCCESS", buildset0_builds[0]['result'])
self.assertEqual(
'finger://zl.example.com/{uuid}'.format(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=buildset0_builds[0]['uuid']),
buildset0_builds[0]['log_url'])
self.assertEqual('check', buildset1['pipeline'])
@ -144,7 +145,8 @@ class TestSQLConnection(ZuulDBTestCase):
self.assertEqual('project-test1', buildset1_builds[-2]['job_name'])
self.assertEqual("FAILURE", buildset1_builds[-2]['result'])
self.assertEqual(
'finger://zl.example.com/{uuid}'.format(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=buildset1_builds[-2]['uuid']),
buildset1_builds[-2]['log_url'])

View File

@ -14,6 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import aiohttp
import asyncio
import logging
import json
import os
import os.path
import socket
@ -21,6 +25,7 @@ import tempfile
import threading
import time
import zuul.web
import zuul.lib.log_streamer
import tests.base
@ -57,6 +62,7 @@ class TestLogStreamer(tests.base.BaseTestCase):
class TestStreaming(tests.base.AnsibleZuulTestCase):
tenant_config_file = 'config/streamer/main.yaml'
log = logging.getLogger("zuul.test.test_log_streamer.TestStreaming")
def setUp(self):
super(TestStreaming, self).setUp()
@ -146,9 +152,116 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
# job and deleted. However, we still have a file handle to it, so we
# can make sure that we read the entire contents at this point.
# Compact the returned lines into a single string for easy comparison.
file_contents = ''.join(logfile.readlines())
file_contents = logfile.read()
logfile.close()
self.log.debug("\n\nFile contents: %s\n\n", file_contents)
self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data)
self.assertEqual(file_contents, self.streaming_data)
def runWSClient(self, build_uuid, event):
    '''
    Run a websocket client to completion on a private event loop.

    Intended to be used as a thread target. Every TEXT frame received is
    appended to ``self.ws_client_results``.

    :param str build_uuid: UUID of the build whose log is requested.
    :param threading.Event event: Set once the request has been sent, or
        once the client has given up, so a waiting caller never blocks
        forever.
    '''
    async def client(loop, build_uuid, event):
        uri = 'http://127.0.0.1:9000/console-stream'
        try:
            # The context managers close the websocket and the session on
            # every exit path, including errors.
            async with aiohttp.ClientSession(loop=loop) as session:
                async with session.ws_connect(uri) as ws:
                    req = {'uuid': build_uuid, 'logfile': None}
                    # Awaited, matching how the server side sends frames.
                    await ws.send_str(json.dumps(req))
                    event.set()  # notify we are connected and req sent
                    async for msg in ws:
                        if msg.type == aiohttp.WSMsgType.TEXT:
                            self.ws_client_results += msg.data
                        elif msg.type == aiohttp.WSMsgType.CLOSED:
                            break
                        elif msg.type == aiohttp.WSMsgType.ERROR:
                            break
        except Exception:
            self.log.exception("client exception:")
        finally:
            # If connecting or sending failed, still release the waiter;
            # setting an already-set event is harmless.
            event.set()

    loop = asyncio.new_event_loop()
    loop.set_debug(True)
    try:
        loop.run_until_complete(client(loop, build_uuid, event))
    finally:
        loop.close()
def test_websocket_streaming(self):
    '''
    Stream a running build's log through the websocket server and check
    that what the client received equals the log file contents.
    '''
    # The streaming port has to be configured before the job is submitted.
    finger_port = 7902
    self.executor_server.log_streaming_port = finger_port

    change = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
    self.fake_gerrit.addEvent(change.getPatchsetCreatedEvent(1))

    # There is no real synchronization for the ansible jobs, so poll
    # until a running build shows up.
    while not self.builds:
        time.sleep(0.1)
    build = self.builds[0]
    self.assertEqual(build.name, 'python27')

    build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
    while not os.path.exists(build_dir):
        time.sleep(0.1)

    # Poll until the jobdir has been assigned to the build.
    while build.jobdir is None:
        time.sleep(0.1)
        build = self.builds[0]

    # Wait for the job to start and create the ansible log file. The job
    # does not finish until the flag file exists, so the log can be
    # opened safely here. It is only opened now (keeping a file handle
    # alive after the job finishes); reading is deferred until the job is
    # done.
    ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
    while not os.path.exists(ansible_log):
        time.sleep(0.1)
    logfile = open(ansible_log, 'r')
    self.addCleanup(logfile.close)

    # Start the finger streamer daemon
    streamer = zuul.lib.log_streamer.LogStreamer(
        None, self.host, finger_port, self.executor_server.jobdir_root)
    self.addCleanup(streamer.stop)

    # Start the web server on its own loop in a background thread
    web_server = zuul.web.ZuulWeb(
        listen_address='127.0.0.1', listen_port=9000,
        gear_server='127.0.0.1', gear_port=self.gearman_server.port)
    server_loop = asyncio.new_event_loop()
    server_loop.set_debug(True)
    server_thread = threading.Thread(target=web_server.run,
                                     args=(server_loop,))
    server_thread.start()
    # Cleanups run in reverse order: stop the server, join its thread,
    # then close the loop.
    self.addCleanup(server_loop.close)
    self.addCleanup(server_thread.join)
    self.addCleanup(web_server.stop)

    # Wait until the web server accepts connections
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        while probe.connect_ex((self.host, 9000)) != 0:
            time.sleep(0.1)

    # Run the websocket client in a thread and wait for its request
    connected = threading.Event()
    self.ws_client_results = ''
    client_thread = threading.Thread(
        target=self.runWSClient, args=(build.uuid, connected))
    client_thread.start()
    connected.wait()

    # Creating the flag file allows the job to complete
    flag_file = os.path.join(build_dir, 'test_wait')
    with open(flag_file, 'w'):
        pass

    # The client finishes once it has received the full log.
    client_thread.join()
    self.waitUntilSettled()

    file_contents = logfile.read()
    logfile.close()
    self.log.debug("\n\nFile contents: %s\n\n", file_contents)
    self.log.debug("\n\nStreamed: %s\n\n", self.ws_client_results)
    self.assertEqual(file_contents, self.ws_client_results)

View File

@ -2289,22 +2289,40 @@ class TestScheduler(ZuulTestCase):
status_jobs.append(job)
self.assertEqual('project-merge', status_jobs[0]['name'])
# TODO(mordred) pull uuids from self.builds
self.assertEqual('finger://zl.example.com/%s' % status_jobs[0]['uuid'],
status_jobs[0]['url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=status_jobs[0]['uuid']),
status_jobs[0]['url'])
# TODO(mordred) configure a success-url on the base job
self.assertEqual('finger://zl.example.com/%s' % status_jobs[0]['uuid'],
status_jobs[0]['report_url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=status_jobs[0]['uuid']),
status_jobs[0]['report_url'])
self.assertEqual('project-test1', status_jobs[1]['name'])
self.assertEqual('finger://zl.example.com/%s' % status_jobs[1]['uuid'],
status_jobs[1]['url'])
self.assertEqual('finger://zl.example.com/%s' % status_jobs[1]['uuid'],
status_jobs[1]['report_url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=status_jobs[1]['uuid']),
status_jobs[1]['url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=status_jobs[1]['uuid']),
status_jobs[1]['report_url'])
self.assertEqual('project-test2', status_jobs[2]['name'])
self.assertEqual('finger://zl.example.com/%s' % status_jobs[2]['uuid'],
status_jobs[2]['url'])
self.assertEqual('finger://zl.example.com/%s' % status_jobs[2]['uuid'],
status_jobs[2]['report_url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=status_jobs[2]['uuid']),
status_jobs[2]['url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=status_jobs[2]['uuid']),
status_jobs[2]['report_url'])
def test_live_reconfiguration(self):
"Test that live reconfiguration works"
@ -3577,8 +3595,11 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual('project-merge', job['name'])
self.assertEqual('gate', job['pipeline'])
self.assertEqual(False, job['retry'])
self.assertEqual('finger://zl.example.com/%s' % job['uuid'],
job['url'])
self.assertEqual(
'finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=job['uuid']),
job['url'])
self.assertEqual(2, len(job['worker']))
self.assertEqual(False, job['canceled'])
self.assertEqual(True, job['voting'])
@ -4674,7 +4695,8 @@ class TestSchedulerSuccessURL(ZuulTestCase):
# NOTE: This default URL is currently hard-coded in executor/server.py
self.assertIn(
'- docs-draft-test2 finger://zl.example.com/{uuid}'.format(
'- docs-draft-test2 finger://{hostname}/{uuid}'.format(
hostname=self.executor_server.hostname,
uuid=uuid_test2),
body[3])

115
zuul/cmd/web.py Executable file
View File

@ -0,0 +1,115 @@
#!/usr/bin/env python
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import asyncio
import daemon
import extras
import logging
import signal
import sys
import threading
import zuul.cmd
import zuul.web
from zuul.lib.config import get_default
# as of python-daemon 1.6 it doesn't bundle pidlockfile anymore
# instead it depends on lockfile-0.9.1 which uses pidfile.
pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
class WebServer(zuul.cmd.ZuulApp):
    '''
    Command-line application that runs a ZuulWeb instance in a background
    thread while the main thread waits for a termination signal.
    '''

    def parse_arguments(self):
        '''Parse the command line and store the result in ``self.args``.'''
        parser = argparse.ArgumentParser(description='Zuul Web Server.')
        parser.add_argument(
            '-c', dest='config', help='specify the config file')
        parser.add_argument(
            '-d', dest='nodaemon', action='store_true',
            help='do not run as a daemon')
        parser.add_argument(
            '--version', dest='version', action='version',
            version=self._get_version(), help='show zuul version')
        self.args = parser.parse_args()

    def exit_handler(self, signum, frame):
        '''Signal handler: ask the web server to stop.'''
        self.web.stop()

    def _main(self):
        '''
        Build ZuulWeb from the configuration, run it in a thread named
        'web', and block until a signal requests shutdown.
        '''
        params = {
            'listen_address': get_default(self.config,
                                          'web', 'listen_address',
                                          '127.0.0.1'),
            'listen_port': get_default(self.config, 'web', 'port', 9000),
            'gear_server': get_default(self.config, 'gearman', 'server'),
            'gear_port': get_default(self.config, 'gearman', 'port', 4730),
        }
        # The gearman SSL options share their names with the ZuulWeb
        # keyword arguments.
        for option in ('ssl_key', 'ssl_cert', 'ssl_ca'):
            params[option] = get_default(self.config, 'gearman', option)

        try:
            self.web = zuul.web.ZuulWeb(**params)
        except Exception:
            self.log.exception("Error creating ZuulWeb:")
            sys.exit(1)

        # The loop is obtained here, in the main thread, and handed to the
        # server thread; it is stopped and closed below once that thread
        # has exited.
        loop = asyncio.get_event_loop()
        for signum in (signal.SIGUSR1, signal.SIGTERM):
            signal.signal(signum, self.exit_handler)

        self.log.info('Zuul Web Server starting')
        self.thread = threading.Thread(
            target=self.web.run, args=(loop,), name='web')
        self.thread.start()

        try:
            signal.pause()
        except KeyboardInterrupt:
            print("Ctrl + C: asking web server to exit nicely...\n")
            self.exit_handler(signal.SIGINT, None)

        self.thread.join()
        loop.stop()
        loop.close()
        self.log.info("Zuul Web Server stopped")

    def main(self):
        '''Set up logging, then run ``_main`` and log any exception.'''
        self.setup_logging('web', 'log_config')
        self.log = logging.getLogger("zuul.WebServer")
        try:
            self._main()
        except Exception:
            self.log.exception("Exception from WebServer:")
def main():
    '''
    Entry point for the zuul-web console script.

    Parses arguments, reads the configuration and runs the server either
    in the foreground (``-d``) or inside a daemon context guarded by a
    PID lock file.
    '''
    server = WebServer()
    server.parse_arguments()
    server.read_config()

    pid_fn = get_default(server.config, 'web', 'pidfile',
                         '/var/run/zuul-web/zuul-web.pid', expand_user=True)
    pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10)

    if not server.args.nodaemon:
        with daemon.DaemonContext(pidfile=pid):
            server.main()
        return

    server.main()

View File

@ -15,6 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import os.path
import pwd
@ -210,6 +211,8 @@ class LogStreamer(object):
'''
def __init__(self, user, host, port, jobdir_root):
self.log = logging.getLogger('zuul.lib.LogStreamer')
self.log.debug("LogStreamer starting on port %s", port)
self.server = CustomForkingTCPServer((host, port),
RequestHandler,
user=user,
@ -225,3 +228,4 @@ class LogStreamer(object):
if self.thd.isAlive():
self.server.shutdown()
self.server.server_close()
self.log.debug("LogStreamer stopped")

View File

@ -86,3 +86,11 @@ class RPCClient(object):
def shutdown(self):
self.gearman.shutdown()
def get_job_log_stream_address(self, uuid, logfile='console.log'):
    '''
    Submit a 'zuul:get_job_log_stream_address' job for a build.

    :param str uuid: The build UUID.
    :param str logfile: The log file name sent along with the request.
    :returns: The JSON-decoded first element of the job's data, or False
        when the job reports failure.
    '''
    job = self.submitJob('zuul:get_job_log_stream_address',
                         {'uuid': uuid, 'logfile': logfile})
    if not job.failure:
        return json.loads(job.data[0])
    return False

View File

@ -53,6 +53,7 @@ class RPCListener(object):
self.worker.registerFunction("zuul:enqueue_ref")
self.worker.registerFunction("zuul:promote")
self.worker.registerFunction("zuul:get_running_jobs")
self.worker.registerFunction("zuul:get_job_log_stream_address")
def stop(self):
self.log.debug("Stopping")
@ -173,3 +174,29 @@ class RPCListener(object):
running_items.append(item.formatJSON())
job.sendWorkComplete(json.dumps(running_items))
def handle_get_job_log_stream_address(self, job):
    '''
    Answer a request for the log stream address of a build.

    The job arguments are a JSON object with a 'uuid' key. The reply is a
    JSON object holding 'server' and 'port' taken from the matching
    build's worker, or an empty object when no build has that UUID.
    '''
    # TODO: map log files to ports. Currently there is only one
    #       log stream for a given job. But many jobs produce many
    #       log files, so this is forwards compatible with a future
    #       where there are more logs to potentially request than
    #       "console.log"
    def iter_builds():
        # Walk every build of every queued item in every pipeline of
        # every tenant.
        for tenant in self.sched.abide.tenants.values():
            for pipeline in tenant.layout.pipelines.values():
                for queue in pipeline.queues:
                    for item in queue.queue:
                        for bld in item.current_build_set.getBuilds():
                            yield bld

    args = json.loads(job.arguments)
    uuid = args['uuid']
    # TODO: logfile = args['logfile']

    build = next((bld for bld in iter_builds() if bld.uuid == uuid), None)

    job_log_stream_address = {}
    if build:
        job_log_stream_address['server'] = build.worker.hostname
        job_log_stream_address['port'] = build.worker.log_port
    job.sendWorkComplete(json.dumps(job_log_stream_address))

232
zuul/web.py Normal file
View File

@ -0,0 +1,232 @@
#!/usr/bin/env python
# Copyright (c) 2017 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import json
import logging
import uvloop
import aiohttp
from aiohttp import web
import zuul.rpcclient
class LogStreamingHandler(object):
    '''
    Handle one websocket log-streaming request.

    The handler asks Zuul (over Gearman) where the finger log streamer
    for a build lives, connects to it and relays everything it sends to
    the websocket client.
    '''
    log = logging.getLogger("zuul.web.LogStreamingHandler")

    def __init__(self, loop, gear_server, gear_port,
                 ssl_key=None, ssl_cert=None, ssl_ca=None):
        '''
        :param loop: The event loop the handler's coroutines run on.
        :param gear_server: Gearman server host.
        :param gear_port: Gearman server port.
        :param ssl_key: Optional SSL key passed to the RPC client.
        :param ssl_cert: Optional SSL certificate passed to the RPC client.
        :param ssl_ca: Optional SSL CA passed to the RPC client.
        '''
        self.event_loop = loop
        self.gear_server = gear_server
        self.gear_port = gear_port
        self.ssl_key = ssl_key
        self.ssl_cert = ssl_cert
        self.ssl_ca = ssl_ca

    def _getPortLocation(self, job_uuid):
        '''
        Query Gearman for the executor running the given job.

        This call blocks, so it is meant to be run in an executor rather
        than directly on the event loop.

        :param str job_uuid: The job UUID we want to stream.
        :returns: Whatever the RPC client returns for the stream address
            lookup.
        '''
        # TODO: Fetch the entire list of uuid/file/server/ports once and
        #       share that, and fetch a new list on cache misses perhaps?
        # TODO: Avoid recreating a client for each request.
        rpc = zuul.rpcclient.RPCClient(self.gear_server, self.gear_port,
                                       self.ssl_key, self.ssl_cert,
                                       self.ssl_ca)
        try:
            return rpc.get_job_log_stream_address(job_uuid)
        finally:
            # Always release the gearman client, even if the lookup raised.
            rpc.shutdown()

    async def _relayStream(self, reader, ws):
        '''
        Forward everything readable from ``reader`` to the websocket.

        Data is read in chunks of up to 1024 bytes. An incremental decoder
        is used so that a multi-byte UTF-8 character split across two
        chunks is decoded correctly instead of raising.

        :param reader: An object with an awaitable ``read(n)`` returning
            bytes (empty at end of stream).
        :param ws: An object with an awaitable ``send_str(str)``.
        :raises UnicodeDecodeError: If the stream is not valid UTF-8 or
            ends in the middle of a character.
        '''
        import codecs

        decoder = codecs.getincrementaldecoder('utf8')()
        while True:
            data = await reader.read(1024)
            if not data:
                break
            text = decoder.decode(data)
            # A chunk holding only part of a character decodes to ''.
            if text:
                await ws.send_str(text)
        # Flush the decoder; this raises if bytes of an incomplete
        # character are still pending at end of stream.
        text = decoder.decode(b'', final=True)
        if text:
            await ws.send_str(text)

    async def _fingerClient(self, ws, server, port, job_uuid):
        '''
        Create a client to connect to the finger streamer and pull results.

        :param aiohttp.web.WebSocketResponse ws: The websocket response object.
        :param str server: The executor server running the job.
        :param str port: The executor server port.
        :param str job_uuid: The job UUID to stream.
        '''
        self.log.debug("Connecting to finger server %s:%s", server, port)
        # No explicit loop argument: the connection is made on the loop
        # running this coroutine, and the parameter is rejected by
        # Python 3.10 and later.
        reader, writer = await asyncio.open_connection(host=server, port=port)
        try:
            self.log.debug("Sending finger request for %s", job_uuid)
            msg = "%s\n" % job_uuid    # Must have a trailing newline!
            writer.write(msg.encode('utf8'))
            await writer.drain()
            await self._relayStream(reader, ws)
        finally:
            # Close the finger connection on success and on error alike.
            writer.close()

    async def _streamLog(self, ws, request):
        '''
        Stream the log for the requested job back to the client.

        :param aiohttp.web.WebSocketResponse ws: The websocket response object.
        :param dict request: The client request parameters.
        :returns: A ``(close_code, message)`` tuple for the websocket close
            frame.
        '''
        for key in ('uuid', 'logfile'):
            if key not in request:
                return (4000, "'{key}' missing from request payload".format(
                    key=key))

        # Schedule the blocking gearman work in an Executor
        gear_task = self.event_loop.run_in_executor(
            None, self._getPortLocation, request['uuid'])

        try:
            port_location = await asyncio.wait_for(gear_task, 10)
        except asyncio.TimeoutError:
            return (4010, "Gearman timeout")

        if not port_location:
            return (4011, "Error with Gearman")

        await self._fingerClient(
            ws, port_location['server'], port_location['port'],
            request['uuid'])

        return (1000, "No more data")

    async def processRequest(self, request):
        '''
        Handle a client websocket request for log streaming.

        :param aiohttp.web.Request request: The client request.
        :returns: The websocket response object.
        '''
        # Created outside the try block so the name is always bound when
        # the exception handler below closes the socket.
        ws = web.WebSocketResponse()
        try:
            await ws.prepare(request)
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    req = json.loads(msg.data)
                    self.log.debug("Websocket request: %s", req)
                    code, reason = await self._streamLog(ws, req)

                    # We expect to process only a single message. I.e., we
                    # can stream only a single file at a time.
                    await ws.close(code=code, message=reason)
                    break
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    self.log.error(
                        "Websocket connection closed with exception %s",
                        ws.exception()
                    )
                    break
                elif msg.type == aiohttp.WSMsgType.CLOSED:
                    break
        except Exception as e:
            self.log.exception("Websocket exception:")
            await ws.close(code=4009, message=str(e).encode('utf-8'))
        return ws
class ZuulWeb(object):
    '''
    Websocket server exposing build log streams at ``/console-stream``.
    '''
    log = logging.getLogger("zuul.web.ZuulWeb")

    def __init__(self, listen_address, listen_port,
                 gear_server, gear_port,
                 ssl_key=None, ssl_cert=None, ssl_ca=None):
        '''
        :param listen_address: Address the server binds to.
        :param listen_port: Port the server binds to.
        :param gear_server: Gearman server host.
        :param gear_port: Gearman server port.
        :param ssl_key: Optional SSL key handed to the log handler.
        :param ssl_cert: Optional SSL certificate handed to the log handler.
        :param ssl_ca: Optional SSL CA handed to the log handler.
        '''
        self.listen_address = listen_address
        self.listen_port = listen_port
        self.gear_server = gear_server
        self.gear_port = gear_port
        self.ssl_key = ssl_key
        self.ssl_cert = ssl_cert
        self.ssl_ca = ssl_ca
        # Both are assigned by run(); they start as None so that stop()
        # can be called safely before the server is up.
        self.event_loop = None
        self.term = None

    async def _handleWebsocket(self, request):
        '''Serve one websocket request with a fresh LogStreamingHandler.'''
        handler = LogStreamingHandler(self.event_loop,
                                      self.gear_server, self.gear_port,
                                      self.ssl_key, self.ssl_cert, self.ssl_ca)
        return await handler.processRequest(request)

    def run(self, loop=None):
        '''
        Run the websocket daemon.

        Because this method can be the target of a new thread, we need to
        set the thread event loop here, rather than in __init__().

        :param loop: The event loop to use. If not supplied, the default main
            thread event loop is used. This should be supplied if ZuulWeb
            is run within a separate (non-main) thread.
        '''
        routes = [
            ('GET', '/console-stream', self._handleWebsocket)
        ]

        self.log.debug("ZuulWeb starting")
        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
        user_supplied_loop = loop is not None
        if not loop:
            loop = asyncio.get_event_loop()
        asyncio.set_event_loop(loop)

        self.event_loop = loop

        app = web.Application()
        for method, path, handler in routes:
            app.router.add_route(method, path, handler)
        handler = app.make_handler(loop=self.event_loop)

        # create the server
        coro = self.event_loop.create_server(handler,
                                             self.listen_address,
                                             self.listen_port)
        self.server = self.event_loop.run_until_complete(coro)

        # Resolved by stop(); bound explicitly to the server loop.
        self.term = asyncio.Future(loop=self.event_loop)

        # start the server
        self.event_loop.run_until_complete(self.term)

        # cleanup
        self.log.debug("ZuulWeb stopping")
        self.server.close()
        self.event_loop.run_until_complete(self.server.wait_closed())
        self.event_loop.run_until_complete(app.shutdown())
        self.event_loop.run_until_complete(handler.shutdown(60.0))
        self.event_loop.run_until_complete(app.cleanup())
        self.log.debug("ZuulWeb stopped")

        # Only run these if we are controlling the loop - they need to be
        # run from the main thread
        if not user_supplied_loop:
            loop.stop()
            loop.close()

    def stop(self):
        '''
        Ask a running server to shut down.

        Safe to call from another thread. Does nothing if run() has not
        yet created the termination future.
        '''
        if self.event_loop and self.term:
            self.event_loop.call_soon_threadsafe(self.term.set_result, True)
if __name__ == "__main__":
    # Manual debugging entry point: serve on localhost with debug logging.
    logging.basicConfig(level=logging.DEBUG)
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    # ZuulWeb requires the listen and gearman endpoints; these values
    # match the defaults used by the zuul-web command.
    z = ZuulWeb(listen_address="127.0.0.1", listen_port=9000,
                gear_server="127.0.0.1", gear_port=4730)
    z.run(loop)