Merge branch 'master' into v3_merge

Includes minor py3 fixes (for pep8 on py3).

 Conflicts:
	tests/base.py
	tests/test_model.py
	tests/test_scheduler.py
	tox.ini
	zuul/model.py
	zuul/reporter/__init__.py
	zuul/scheduler.py
	zuul/source/gerrit.py

Change-Id: I99daf9acd746767967b42396881a2dff82134a07
This commit is contained in:
Joshua Hesketh 2016-07-14 00:12:25 +10:00
commit 0aa7e8bdbf
60 changed files with 3217 additions and 206 deletions

View File

@ -1,4 +1,4 @@
[DEFAULT]
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} OS_LOG_DEFAULTS=${OS_LOG_DEFAULTS:-""} ${PYTHON:-python} -m subunit.run discover -t ./ tests $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list

View File

@ -13,6 +13,7 @@ Contents:
.. toctree::
:maxdepth: 2
quick-start
gating
connections
triggers

View File

@ -6,7 +6,7 @@
https://wiki.jenkins-ci.org/display/JENKINS/Gearman+Plugin
.. _`Turbo-Hipster`:
http://git.openstack.org/cgit/stackforge/turbo-hipster/
https://git.openstack.org/cgit/openstack/turbo-hipster/
.. _`Turbo-Hipster Documentation`:
http://turbo-hipster.rtfd.org/

View File

@ -58,3 +58,17 @@ instance, a clone will produce a repository in an unpredictable state
depending on what the state of Zuul's repository is when the clone
happens). They are, however, suitable for automated systems that
respond to Zuul triggers.
Clearing old references
~~~~~~~~~~~~~~~~~~~~~~~
The references created under refs/zuul are not garbage collected. Since
git fetch send them all to Gerrit to sync the repositories, the time
spent on merge will slightly grow overtime and start being noticeable.
To clean them you can use the ``tools/zuul-clear-refs.py`` script on
each repositories. It will delete Zuul references that point to commits
for which the commit date is older than a given amount of days (default
360)::
./tools/zuul-clear-refs.py /path/to/zuul/git/repo

162
doc/source/quick-start.rst Normal file
View File

@ -0,0 +1,162 @@
Quick Start Guide
=================
System Requirements
-------------------
For most deployments zuul only needs 1-2GB. OpenStack uses a 30GB setup.
Install Zuul
------------
You can get zuul from pypi via::
pip install zuul
Zuul Components
---------------
Zuul provides the following components:
- **zuul-server**: scheduler daemon which communicates with Gerrit and
Gearman. Handles receiving events, launching jobs, collecting results
and postingreports.
- **zuul-merger**: speculative-merger which communicates with Gearman.
Prepares Git repositories for jobs to test against. This additionally
requires a web server hosting the Git repositories which can be cloned
by the jobs.
- **zuul-cloner**: client side script used to setup job workspace. It is
used to clone the repositories prepared by the zuul-merger described
previously.
- **gearmand**: optional builtin gearman daemon provided by zuul-server
External components:
- Jenkins Gearman plugin: Used by Jenkins to connect to Gearman
Zuul Communication
------------------
All the Zuul components communicate with each other using Gearman. As well as
the following communication channels:
zuul-server:
- Gerrit
- Gearman Daemon
zuul-merger:
- Gerrit
- Gearman Daemon
zuul-cloner:
- http hosted zuul-merger git repos
Jenkins:
- Gearman Daemon via Jenkins Gearman Plugin
Zuul Setup
----------
At minimum we need to provide **zuul.conf** and **layout.yaml** and placed
in /etc/zuul/ directory. You will also need a zuul user and ssh key for the
zuul user in Gerrit. The following example uses the builtin gearmand service
in zuul.
**zuul.conf**::
[zuul]
layout_config=/etc/zuul/layout.yaml
[merger]
git_dir=/git
zuul_url=http://zuul.example.com/p
[gearman_server]
start=true
[gearman]
server=127.0.0.1
[connection gerrit]
driver=gerrit
server=git.example.com
port=29418
baseurl=https://git.example.com/gerrit/
user=zuul
sshkey=/home/zuul/.ssh/id_rsa
See :doc:`zuul` for more details.
The following sets up a basic timer triggered job using zuul.
**layout.yaml**::
pipelines:
- name: periodic
source: gerrit
manager: IndependentPipelineManager
trigger:
timer:
- time: '0 * * * *'
projects:
- name: aproject
periodic:
- aproject-periodic-build
Starting Zuul
-------------
You can run zuul-server with the **-d** option to make it not daemonize. It's
a good idea at first to confirm there's no issues with your configuration.
Simply run::
zuul-server
Once run you should have 2 zuul-server processes::
zuul 12102 1 0 Jan21 ? 00:15:45 /home/zuul/zuulvenv/bin/python /home/zuul/zuulvenv/bin/zuul-server -d
zuul 12107 12102 0 Jan21 ? 00:00:01 /home/zuul/zuulvenv/bin/python /home/zuul/zuulvenv/bin/zuul-server -d
Note: In this example zuul was installed in a virtualenv.
The 2nd zuul-server process is gearmand running if you are using the builtin
gearmand server, otherwise there will only be 1 process.
Zuul won't actually process your Job queue however unless you also have a
zuul-merger process running.
Simply run::
zuul-merger
Zuul should now be able to process your periodic job as configured above once
the Jenkins side of things is configured.
Jenkins Setup
-------------
Install the Jenkins Gearman Plugin via Jenkins Plugin management interface.
Then naviage to **Manage > Configuration > Gearman** and setup the Jenkins
server hostname/ip and port to connect to gearman.
At this point gearman should be running your Jenkins jobs.
Troubleshooting
---------------
Checking Gearman function registration (jobs). You can use telnet to connect
to gearman to check that Jenkins is registering your configured jobs in
gearman::
telnet <gearman_ip> 4730
Useful commands are **workers** and **status** which you can run by just
typing those commands once connected to gearman. Every job in your Jenkins
master must appear when you run **workers** for Zuul to be able to run jobs
against your Jenkins instance.

View File

@ -31,7 +31,7 @@ Metrics
The metrics are emitted by the Zuul scheduler (`zuul/scheduler.py`):
**gerrit.events.<type> (counters)**
**gerrit.event.<type> (counters)**
Gerrit emits different kind of message over its `stream-events` interface. As
a convenience, Zuul emits metrics to statsd which save you from having to use
a different daemon to measure Gerrit events.
@ -52,6 +52,18 @@ The metrics are emitted by the Zuul scheduler (`zuul/scheduler.py`):
Refer to your Gerrit installation documentation for an exhaustive list of
Gerrit event types.
**zuul.node_type.**
Holds metrics specifc to build nodes per label. The hierarchy is:
#. **<build node label>** each of the labels associated to a build in
Jenkins. It contains:
#. **job.<jobname>** subtree detailing per job statistics:
#. **wait_time** counter and timer of the wait time, with the
difference of the job start time and the launch time, in
milliseconds.
**zuul.pipeline.**
Holds metrics specific to jobs. The hierarchy is:
@ -75,10 +87,13 @@ The metrics are emitted by the Zuul scheduler (`zuul/scheduler.py`):
known by Zuul (which includes build time and Zuul overhead).
#. **total_changes** counter of the number of change proceeding since
Zuul started.
#. **wait_time** counter and timer of the wait time, with the difference
of the job start time and the launch time, in milliseconds.
Additionally, the `zuul.pipeline.<pipeline name>` hierarchy contains
`current_changes` and `resident_time` metrics for each projects. The slash
separator used in Gerrit name being replaced by dots.
`current_changes` (gauge), `resident_time` (timing) and `total_changes`
(counter) metrics for each projects. The slash separator used in Gerrit name
being replaced by dots.
As an example, given a job named `myjob` triggered by the `gate` pipeline
which took 40 seconds to build, the Zuul scheduler will emit the following

View File

@ -10,11 +10,11 @@ Zuul has three configuration files:
**zuul.conf**
Connection information for Gerrit and Gearman, locations of the
other config files.
other config files. (required)
**layout.yaml**
Project and pipeline configuration -- what Zuul does.
Project and pipeline configuration -- what Zuul does. (required)
**logging.conf**
Python logging config.
Python logging config. (optional)
Examples of each of the three files can be found in the etc/ directory
of the source distribution.
@ -41,17 +41,28 @@ You can also find an example zuul.conf file in the git
gearman
"""""""
Client connection information for gearman. If using Zuul's builtin gearmand
server just set **server** to 127.0.0.1.
**server**
Hostname or IP address of the Gearman server.
``server=gearman.example.com``
``server=gearman.example.com`` (required)
**port**
Port on which the Gearman server is listening.
``port=4730``
``port=4730`` (optional)
**check_job_registration**
Check to see if job is registered with Gearman or not. When True
a build result of NOT_REGISTERED will be return if job is not found.
``check_job_registration=True``
gearman_server
""""""""""""""
The builtin gearman server. Zuul can fork a gearman process from itself rather
than connecting to an external one.
**start**
Whether to start the internal Gearman server (default: False).
``start=true``
@ -64,9 +75,25 @@ gearman_server
Path to log config file for internal Gearman server.
``log_config=/etc/zuul/gearman-logging.yaml``
webapp
""""""
**listen_address**
IP address or domain name on which to listen (default: 0.0.0.0).
``listen_address=127.0.0.1``
**port**
Port on which the webapp is listening (default: 8001).
``port=8008``
zuul
""""
Zuul's main configuration section. At minimum zuul must be able to find
layout.yaml to be useful.
.. note:: Must be provided when running zuul-server
.. _layout_config:
**layout_config**
@ -118,6 +145,13 @@ zuul
merger
""""""
The zuul-merger process configuration. Detailed documentation on this process
can be found on the :doc:`merger` page.
.. note:: Must be provided when running zuul-merger. Both services may share the
same configuration (and even host) or otherwise have an individual
zuul.conf.
**git_dir**
Directory that Zuul should clone local git repositories to.
``git_dir=/var/lib/zuul/git``
@ -394,11 +428,12 @@ explanation of each of the parameters::
approval matching all specified requirements.
*username*
If present, an approval from this username is required.
If present, an approval from this username is required. It is
treated as a regular expression.
*email*
If present, an approval with this email address is required. It
is treated as a regular expression as above.
is treated as a regular expression.
*email-filter* (deprecated)
A deprecated alternate spelling of *email*. Only one of *email* or
@ -759,7 +794,10 @@ each job as it builds a list from the project specification.
expressions.
The pattern for '/COMMIT_MSG' is always matched on and does not
have to be included.
have to be included. Exception is merge commits (without modified
files), in this case '/COMMIT_MSG' is not matched, and job is not
skipped. In case of merge commits it's assumed that list of modified
files isn't predictible and CI should be run.
**voting (optional)**
Boolean value (``true`` or ``false``) that indicates whatever
@ -997,9 +1035,8 @@ normal operation, omit ``-d`` and let Zuul run as a daemon.
If you send signal 1 (SIGHUP) to the zuul-server process, Zuul will
stop executing new jobs, wait until all executing jobs are finished,
reload its configuration, and resume. Any values in any of the
configuration files may be changed, except the location of Zuul's PID
file (a change to that will be ignored until Zuul is restarted).
reload its layout.yaml, and resume. Changes to any connections or
the PID file will be ignored until Zuul is restarted.
If you send a SIGUSR1 to the zuul-server process, Zuul will stop
executing new jobs, wait until all executing jobs are finished,

View File

@ -490,10 +490,12 @@
$header_div.append($heading);
if (typeof pipeline.description === 'string') {
var descr = $('<small />')
$.each( pipeline.description.split(/\r?\n\r?\n/), function(index, descr_part){
descr.append($('<p />').text(descr_part));
});
$header_div.append(
$('<p />').append(
$('<small />').text(pipeline.description)
)
$('<p />').append(descr)
);
}
return $header_div;

View File

@ -26,6 +26,10 @@ default_container=logs
region_name=EXP
logserver_prefix=http://logs.example.org/server.app/
[webapp]
listen_address=0.0.0.0
port=8001
[connection gerrit]
driver=gerrit
server=review.example.com

4
other-requirements.txt Normal file
View File

@ -0,0 +1,4 @@
mysql-client [test]
mysql-server [test]
postgresql [test]
postgresql-client [test]

View File

@ -3,7 +3,7 @@ pbr>=1.1.0
PyYAML>=3.1.0
Paste
WebOb>=1.2.3
paramiko>=1.8.0
paramiko>=1.8.0,<2.0.0
GitPython>=0.3.3
ordereddict
python-daemon>=2.0.4,<2.1.0

View File

@ -25,6 +25,7 @@ console_scripts =
zuul-merger = zuul.cmd.merger:main
zuul = zuul.cmd.client:main
zuul-cloner = zuul.cmd.cloner:main
zuul-launcher = zuul.cmd.launcher:main
[build_sphinx]
source-dir = doc/source

View File

@ -22,10 +22,12 @@ import logging
import os
import pprint
from six.moves import queue as Queue
from six.moves import urllib
import random
import re
import select
import shutil
from six.moves import reload_module
import socket
import string
import subprocess
@ -33,12 +35,10 @@ import swiftclient
import tempfile
import threading
import time
import urllib2
import git
import gear
import fixtures
import six.moves.urllib.parse as urlparse
import statsd
import testtools
from git import GitCommandError
@ -482,7 +482,7 @@ class FakeURLOpener(object):
self.url = url
def read(self):
res = urlparse.urlparse(self.url)
res = urllib.parse.urlparse(self.url)
path = res.path
project = '/'.join(path.split('/')[2:-2])
ret = '001e# service=git-upload-pack\n'
@ -882,6 +882,28 @@ class BaseTestCase(testtools.TestCase):
format='%(asctime)s %(name)-32s '
'%(levelname)-8s %(message)s'))
# NOTE(notmorgan): Extract logging overrides for specific libraries
# from the OS_LOG_DEFAULTS env and create FakeLogger fixtures for
# each. This is used to limit the output during test runs from
# libraries that zuul depends on such as gear.
log_defaults_from_env = os.environ.get('OS_LOG_DEFAULTS')
if log_defaults_from_env:
for default in log_defaults_from_env.split(','):
try:
name, level_str = default.split('=', 1)
level = getattr(logging, level_str, logging.DEBUG)
self.useFixture(fixtures.FakeLogger(
name=name,
level=level,
format='%(asctime)s %(name)-32s '
'%(levelname)-8s %(message)s'))
except ValueError:
# NOTE(notmorgan): Invalid format of the log default,
# skip and don't try and apply a logger for the
# specified module
pass
class ZuulTestCase(BaseTestCase):
config_file = 'zuul.conf'
@ -897,11 +919,13 @@ class ZuulTestCase(BaseTestCase):
self.test_root = os.path.join(tmp_root, "zuul-test")
self.upstream_root = os.path.join(self.test_root, "upstream")
self.git_root = os.path.join(self.test_root, "git")
self.state_root = os.path.join(self.test_root, "lib")
if os.path.exists(self.test_root):
shutil.rmtree(self.test_root)
os.makedirs(self.test_root)
os.makedirs(self.upstream_root)
os.makedirs(self.state_root)
# Make per test copy of Configuration.
self.setup_config()
@ -909,6 +933,7 @@ class ZuulTestCase(BaseTestCase):
os.path.join(FIXTURE_DIR,
self.config.get('zuul', 'tenant_config')))
self.config.set('merger', 'git_dir', self.git_root)
self.config.set('zuul', 'state_dir', self.state_root)
# For each project in config:
self.init_repo("org/project")
@ -937,8 +962,8 @@ class ZuulTestCase(BaseTestCase):
os.environ['STATSD_PORT'] = str(self.statsd.port)
self.statsd.start()
# the statsd client object is configured in the statsd module import
reload(statsd)
reload(zuul.scheduler)
reload_module(statsd)
reload_module(zuul.scheduler)
self.gearman_server = FakeGearmanServer()
@ -967,12 +992,12 @@ class ZuulTestCase(BaseTestCase):
self.ansible_server.start()
def URLOpenerFactory(*args, **kw):
if isinstance(args[0], urllib2.Request):
if isinstance(args[0], urllib.request.Request):
return old_urlopen(*args, **kw)
return FakeURLOpener(self.upstream_root, *args, **kw)
old_urlopen = urllib2.urlopen
urllib2.urlopen = URLOpenerFactory
old_urlopen = urllib.request.urlopen
urllib.request.urlopen = URLOpenerFactory
self.launcher = zuul.launcher.client.LaunchClient(
self.config, self.sched, self.swift)
@ -982,7 +1007,8 @@ class ZuulTestCase(BaseTestCase):
self.sched.setLauncher(self.launcher)
self.sched.setMerger(self.merge_client)
self.webapp = zuul.webapp.WebApp(self.sched, port=0)
self.webapp = zuul.webapp.WebApp(
self.sched, port=0, listen_address='127.0.0.1')
self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
self.sched.start()
@ -1179,6 +1205,17 @@ class ZuulTestCase(BaseTestCase):
zuul.merger.merger.reset_repo_to_head(repo)
repo.git.clean('-x', '-f', '-d')
def create_commit(self, project):
path = os.path.join(self.upstream_root, project)
repo = git.Repo(path)
repo.head.reference = repo.heads['master']
file_name = os.path.join(path, 'README')
with open(file_name, 'a') as f:
f.write('creating fake commit\n')
repo.index.add([file_name])
commit = repo.index.commit('Creating a fake commit')
return commit.hexsha
def ref_has_change(self, ref, change):
path = os.path.join(self.git_root, change.project)
repo = git.Repo(path)
@ -1325,9 +1362,11 @@ class ZuulTestCase(BaseTestCase):
start = time.time()
while True:
if time.time() - start > 10:
print 'queue status:',
print ' '.join(self.eventQueuesEmpty())
print self.areAllBuildsWaiting()
self.log.debug("Queue status:")
for queue in self.event_queues:
self.log.debug(" %s: %s" % (queue, queue.empty()))
self.log.debug("All builds waiting: %s" %
(self.areAllBuildsWaiting(),))
raise Exception("Timeout waiting for Zuul to settle")
# Make sure no new events show up while we're checking
# have all build states propogated to zuul?
@ -1369,8 +1408,8 @@ class ZuulTestCase(BaseTestCase):
for pipeline in tenant.layout.pipelines.values():
for queue in pipeline.queues:
if len(queue.queue) != 0:
print 'pipeline %s queue %s contents %s' % (
pipeline.name, queue.name, queue.queue)
print('pipeline %s queue %s contents %s' % (
pipeline.name, queue.name, queue.queue))
self.assertEqual(len(queue.queue), 0,
"Pipelines queues should be empty")

View File

@ -3,7 +3,7 @@ pipelines:
manager: IndependentPipelineManager
require:
approval:
- username: jenkins
- username: ^(jenkins|zuul)$
trigger:
gerrit:
- event: comment-added

View File

@ -0,0 +1,21 @@
pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
- event: patchset-created
success:
smtp:
to: me@example.org
jobs:
- name: docs-draft-test
success-pattern: http://docs-draft.example.org/{build.parameters[LOG_PATH]}/publish-docs/
- name: docs-draft-test2
success-pattern: http://docs-draft.example.org/{NOPE}/{build.parameters[BAD]}/publish-docs/
projects:
- name: org/docs
check:
- docs-draft-test:
- docs-draft-test2

View File

@ -123,13 +123,13 @@ class TestMatchAllFiles(BaseTestMatcher):
self._test_matches(False)
def test_matches_returns_false_when_not_all_files_match(self):
self._test_matches(False, files=['docs/foo', 'foo/bar'])
self._test_matches(False, files=['/COMMIT_MSG', 'docs/foo', 'foo/bar'])
def test_matches_returns_true_when_commit_message_matches(self):
self._test_matches(True, files=['/COMMIT_MSG'])
def test_matches_returns_false_when_commit_message_matches(self):
self._test_matches(False, files=['/COMMIT_MSG'])
def test_matches_returns_true_when_all_files_match(self):
self._test_matches(True, files=['docs/foo'])
self._test_matches(True, files=['/COMMIT_MSG', 'docs/foo'])
class TestMatchAll(BaseTestMatcher):

View File

@ -568,3 +568,57 @@ class TestCloner(ZuulTestCase):
self.worker.hold_jobs_in_build = False
self.worker.release()
self.waitUntilSettled()
def test_post_checkout(self):
project = "org/project"
path = os.path.join(self.upstream_root, project)
repo = git.Repo(path)
repo.head.reference = repo.heads['master']
commits = []
for i in range(0, 3):
commits.append(self.create_commit(project))
newRev = commits[1]
cloner = zuul.lib.cloner.Cloner(
git_base_url=self.upstream_root,
projects=[project],
workspace=self.workspace_root,
zuul_branch=None,
zuul_ref='master',
zuul_url=self.git_root,
zuul_project=project,
zuul_newrev=newRev,
)
cloner.execute()
repos = self.getWorkspaceRepos([project])
cloned_sha = repos[project].rev_parse('HEAD').hexsha
self.assertEqual(newRev, cloned_sha)
def test_post_and_master_checkout(self):
project = "org/project1"
master_project = "org/project2"
path = os.path.join(self.upstream_root, project)
repo = git.Repo(path)
repo.head.reference = repo.heads['master']
commits = []
for i in range(0, 3):
commits.append(self.create_commit(project))
newRev = commits[1]
cloner = zuul.lib.cloner.Cloner(
git_base_url=self.upstream_root,
projects=[project, master_project],
workspace=self.workspace_root,
zuul_branch=None,
zuul_ref='master',
zuul_url=self.git_root,
zuul_project=project,
zuul_newrev=newRev
)
cloner.execute()
repos = self.getWorkspaceRepos([project, master_project])
cloned_sha = repos[project].rev_parse('HEAD').hexsha
self.assertEqual(newRev, cloned_sha)
self.assertEqual(
repos[master_project].rev_parse('HEAD').hexsha,
repos[master_project].rev_parse('master').hexsha)

View File

@ -14,7 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import ConfigParser
from six.moves import configparser as ConfigParser
import os
import re
@ -36,13 +36,13 @@ class TestLayoutValidator(testtools.TestCase):
def test_layouts(self):
"""Test layout file validation"""
print
print()
errors = []
for fn in os.listdir(os.path.join(FIXTURE_DIR, 'layouts')):
m = LAYOUT_RE.match(fn)
if not m:
continue
print fn
print(fn)
# Load any .conf file by the same name but .conf extension.
config_file = ("%s.conf" %
@ -72,7 +72,7 @@ class TestLayoutValidator(testtools.TestCase):
fn)
except voluptuous.Invalid as e:
error = str(e)
print ' ', error
print(' ', error)
if error in errors:
raise Exception("Error has already been tested: %s" %
error)

View File

@ -12,6 +12,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import random
import fixtures
from zuul import model
from zuul import configloader
@ -32,12 +38,12 @@ class TestJob(BaseTestCase):
def test_change_matches_returns_false_for_matched_skip_if(self):
change = model.Change('project')
change.files = ['docs/foo']
change.files = ['/COMMIT_MSG', 'docs/foo']
self.assertFalse(self.job.changeMatches(change))
def test_change_matches_returns_true_for_unmatched_skip_if(self):
change = model.Change('project')
change.files = ['foo']
change.files = ['/COMMIT_MSG', 'foo']
self.assertTrue(self.job.changeMatches(change))
def test_job_sets_defaults_for_boolean_attributes(self):
@ -98,3 +104,76 @@ class TestJob(BaseTestCase):
job = item.getJobs()[0]
self.assertEqual(job.name, 'python27')
self.assertEqual(job.timeout, 50)
class TestJobTimeData(BaseTestCase):
def setUp(self):
super(TestJobTimeData, self).setUp()
self.tmp_root = self.useFixture(fixtures.TempDir(
rootdir=os.environ.get("ZUUL_TEST_ROOT"))
).path
def test_empty_timedata(self):
path = os.path.join(self.tmp_root, 'job-name')
self.assertFalse(os.path.exists(path))
self.assertFalse(os.path.exists(path + '.tmp'))
td = model.JobTimeData(path)
self.assertEqual(td.success_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
self.assertEqual(td.failure_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
self.assertEqual(td.results, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
def test_save_reload(self):
path = os.path.join(self.tmp_root, 'job-name')
self.assertFalse(os.path.exists(path))
self.assertFalse(os.path.exists(path + '.tmp'))
td = model.JobTimeData(path)
self.assertEqual(td.success_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
self.assertEqual(td.failure_times, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
self.assertEqual(td.results, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
success_times = []
failure_times = []
results = []
for x in range(10):
success_times.append(int(random.random() * 1000))
failure_times.append(int(random.random() * 1000))
results.append(0)
results.append(1)
random.shuffle(results)
s = f = 0
for result in results:
if result:
td.add(failure_times[f], 'FAILURE')
f += 1
else:
td.add(success_times[s], 'SUCCESS')
s += 1
self.assertEqual(td.success_times, success_times)
self.assertEqual(td.failure_times, failure_times)
self.assertEqual(td.results, results[10:])
td.save()
self.assertTrue(os.path.exists(path))
self.assertFalse(os.path.exists(path + '.tmp'))
td = model.JobTimeData(path)
td.load()
self.assertEqual(td.success_times, success_times)
self.assertEqual(td.failure_times, failure_times)
self.assertEqual(td.results, results[10:])
class TestTimeDataBase(BaseTestCase):
def setUp(self):
super(TestTimeDataBase, self).setUp()
self.tmp_root = self.useFixture(fixtures.TempDir(
rootdir=os.environ.get("ZUUL_TEST_ROOT"))
).path
self.db = model.TimeDataBase(self.tmp_root)
def test_timedatabase(self):
self.assertEqual(self.db.getEstimatedTime('job-name'), 0)
self.db.update('job-name', 50, 'SUCCESS')
self.assertEqual(self.db.getEstimatedTime('job-name'), 50)
self.db.update('job-name', 100, 'SUCCESS')
self.assertEqual(self.db.getEstimatedTime('job-name'), 75)
for x in range(10):
self.db.update('job-name', 100, 'SUCCESS')
self.assertEqual(self.db.getEstimatedTime('job-name'), 100)

View File

@ -20,11 +20,10 @@ import os
import re
import shutil
import time
import urllib
import urllib2
import yaml
import git
from six.moves import urllib
import testtools
import zuul.change_matcher
@ -501,6 +500,46 @@ class TestScheduler(ZuulTestCase):
self.assertEqual(B.reported, 2)
self.assertEqual(C.reported, 2)
def _test_time_database(self, iteration):
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('CRVW', 2)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.waitUntilSettled()
time.sleep(2)
data = json.loads(self.sched.formatStatusJSON())
found_job = None
for pipeline in data['pipelines']:
if pipeline['name'] != 'gate':
continue
for queue in pipeline['change_queues']:
for head in queue['heads']:
for item in head:
for job in item['jobs']:
if job['name'] == 'project-merge':
found_job = job
break
self.assertIsNotNone(found_job)
if iteration == 1:
self.assertIsNotNone(found_job['estimated_time'])
self.assertIsNone(found_job['remaining_time'])
else:
self.assertIsNotNone(found_job['estimated_time'])
self.assertTrue(found_job['estimated_time'] >= 2)
self.assertIsNotNone(found_job['remaining_time'])
self.worker.hold_jobs_in_build = False
self.worker.release()
self.waitUntilSettled()
def test_time_database(self):
"Test the time database"
self._test_time_database(1)
self._test_time_database(2)
def test_two_failed_changes_at_head(self):
"Test that changes are reparented correctly if 2 fail at head"
@ -606,6 +645,36 @@ class TestScheduler(ZuulTestCase):
self.assertEqual(B.reported, 2)
self.assertEqual(C.reported, 2)
def test_parse_skip_if(self):
job_yaml = """
jobs:
- name: job_name
skip-if:
- project: ^project_name$
branch: ^stable/icehouse$
all-files-match-any:
- ^filename$
- project: ^project2_name$
all-files-match-any:
- ^filename2$
""".strip()
data = yaml.load(job_yaml)
config_job = data.get('jobs')[0]
cm = zuul.change_matcher
expected = cm.MatchAny([
cm.MatchAll([
cm.ProjectMatcher('^project_name$'),
cm.BranchMatcher('^stable/icehouse$'),
cm.MatchAllFiles([cm.FileMatcher('^filename$')]),
]),
cm.MatchAll([
cm.ProjectMatcher('^project2_name$'),
cm.MatchAllFiles([cm.FileMatcher('^filename2$')]),
]),
])
matcher = self.sched._parseSkipIf(config_job)
self.assertEqual(expected, matcher)
def test_patch_order(self):
"Test that dependent patches are tested in the right order"
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@ -1455,7 +1524,7 @@ class TestScheduler(ZuulTestCase):
self.worker.build_history = []
path = os.path.join(self.git_root, "org/project")
print repack_repo(path)
print(repack_repo(path))
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('CRVW', 2)
@ -1480,9 +1549,9 @@ class TestScheduler(ZuulTestCase):
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
A.addPatchset(large=True)
path = os.path.join(self.upstream_root, "org/project1")
print repack_repo(path)
print(repack_repo(path))
path = os.path.join(self.git_root, "org/project1")
print repack_repo(path)
print(repack_repo(path))
A.addApproval('CRVW', 2)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
@ -2241,15 +2310,18 @@ class TestScheduler(ZuulTestCase):
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.waitUntilSettled()
self.worker.release('project-merge')
self.waitUntilSettled()
port = self.webapp.server.socket.getsockname()[1]
req = urllib2.Request("http://localhost:%s/status.json" % port)
f = urllib2.urlopen(req)
req = urllib.request.Request("http://localhost:%s/status.json" % port)
f = urllib.request.urlopen(req)
headers = f.info()
self.assertIn('Content-Length', headers)
self.assertIn('Content-Type', headers)
self.assertEqual(headers['Content-Type'],
'application/json; charset=UTF-8')
self.assertIsNotNone(re.match('^application/json(; charset=UTF-8)?$',
headers['Content-Type']))
self.assertIn('Access-Control-Allow-Origin', headers)
self.assertIn('Cache-Control', headers)
self.assertIn('Last-Modified', headers)
@ -2261,7 +2333,7 @@ class TestScheduler(ZuulTestCase):
self.waitUntilSettled()
data = json.loads(data)
status_jobs = set()
status_jobs = []
for p in data['pipelines']:
for q in p['change_queues']:
if p['name'] in ['gate', 'conflict']:
@ -2273,10 +2345,24 @@ class TestScheduler(ZuulTestCase):
self.assertTrue(change['active'])
self.assertEqual(change['id'], '1,1')
for job in change['jobs']:
status_jobs.add(job['name'])
self.assertIn('project-merge', status_jobs)
self.assertIn('project-test1', status_jobs)
self.assertIn('project-test2', status_jobs)
status_jobs.append(job)
self.assertEqual('project-merge', status_jobs[0]['name'])
self.assertEqual('https://server/job/project-merge/0/',
status_jobs[0]['url'])
self.assertEqual('http://logs.example.com/1/1/gate/project-merge/0',
status_jobs[0]['report_url'])
self.assertEqual('project-test1', status_jobs[1]['name'])
self.assertEqual('https://server/job/project-test1/1/',
status_jobs[1]['url'])
self.assertEqual('http://logs.example.com/1/1/gate/project-test1/1',
status_jobs[1]['report_url'])
self.assertEqual('project-test2', status_jobs[2]['name'])
self.assertEqual('https://server/job/project-test2/2/',
status_jobs[2]['url'])
self.assertEqual('http://logs.example.com/1/1/gate/project-test2/2',
status_jobs[2]['report_url'])
def test_merging_queues(self):
"Test that transitively-connected change queues are merged"
@ -2829,7 +2915,8 @@ class TestScheduler(ZuulTestCase):
port = self.webapp.server.socket.getsockname()[1]
f = urllib.urlopen("http://localhost:%s/status.json" % port)
req = urllib.request.Request("http://localhost:%s/status.json" % port)
f = urllib.request.urlopen(req)
data = f.read()
self.worker.hold_jobs_in_build = False
@ -4215,6 +4302,45 @@ For CI problems and help debugging, contact ci@example.org"""
self.waitUntilSettled()
self.assertEqual(self.history[-1].changes, '3,2 2,1 1,2')
def test_crd_cycle_join(self):
"Test an updated change creates a cycle"
A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
# Create B->A
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
B.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
B.subject, A.data['id'])
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
# Update A to add A->B (a cycle).
A.addPatchset()
A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
A.subject, B.data['id'])
# Normally we would submit the patchset-created event for
# processing here, however, we have no way of noting whether
# the dependency cycle detection correctly raised an
# exception, so instead, we reach into the source driver and
# call the method that would ultimately be called by the event
# processing.
source = self.sched.layout.pipelines['gate'].source
with testtools.ExpectedException(
Exception, "Dependency cycle detected"):
source._getChange(u'1', u'2', True)
self.log.debug("Got expected dependency cycle exception")
# Now if we update B to remove the depends-on, everything
# should be okay. B; A->B
B.addPatchset()
B.data['commitMessage'] = '%s\n' % (B.subject,)
source._getChange(u'1', u'2', True)
source._getChange(u'2', u'2', True)
def test_disable_at(self):
"Test a pipeline will only report to the disabled trigger when failing"
@ -4336,3 +4462,38 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertIn('Build failed.', K.messages[0])
# No more messages reported via smtp
self.assertEqual(3, len(self.smtp_messages))
def test_success_pattern(self):
"Ensure bad build params are ignored"
# Use SMTP reporter to grab the result message easier
self.init_repo("org/docs")
self.config.set('zuul', 'layout_config',
'tests/fixtures/layout-success-pattern.yaml')
self.sched.reconfigure(self.config)
self.worker.hold_jobs_in_build = True
self.registerJobs()
A = self.fake_gerrit.addFakeChange('org/docs', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
# Grab build id
self.assertEqual(len(self.builds), 1)
uuid = self.builds[0].unique[:7]
self.worker.hold_jobs_in_build = False
self.worker.release()
self.waitUntilSettled()
self.assertEqual(len(self.smtp_messages), 1)
body = self.smtp_messages[0]['body'].splitlines()
self.assertEqual('Build succeeded.', body[0])
self.assertIn(
'- docs-draft-test http://docs-draft.example.org/1/1/1/check/'
'docs-draft-test/%s/publish-docs/' % uuid,
body[2])
self.assertIn(
'- docs-draft-test2 https://server/job/docs-draft-test2/1/',
body[3])

View File

@ -16,7 +16,8 @@
# under the License.
import json
import urllib2
from six.moves import urllib
from tests.base import ZuulTestCase
@ -46,41 +47,41 @@ class TestWebapp(ZuulTestCase):
def test_webapp_status(self):
"Test that we can filter to only certain changes in the webapp."
req = urllib2.Request(
req = urllib.request.Request(
"http://localhost:%s/status" % self.port)
f = urllib2.urlopen(req)
f = urllib.request.urlopen(req)
data = json.loads(f.read())
self.assertIn('pipelines', data)
def test_webapp_status_compat(self):
# testing compat with status.json
req = urllib2.Request(
req = urllib.request.Request(
"http://localhost:%s/status.json" % self.port)
f = urllib2.urlopen(req)
f = urllib.request.urlopen(req)
data = json.loads(f.read())
self.assertIn('pipelines', data)
def test_webapp_bad_url(self):
# do we 404 correctly
req = urllib2.Request(
req = urllib.request.Request(
"http://localhost:%s/status/foo" % self.port)
self.assertRaises(urllib2.HTTPError, urllib2.urlopen, req)
self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, req)
def test_webapp_find_change(self):
# can we filter by change id
req = urllib2.Request(
req = urllib.request.Request(
"http://localhost:%s/status/change/1,1" % self.port)
f = urllib2.urlopen(req)
f = urllib.request.urlopen(req)
data = json.loads(f.read())
self.assertEqual(1, len(data), data)
self.assertEqual("org/project", data[0]['project'])
req = urllib2.Request(
req = urllib.request.Request(
"http://localhost:%s/status/change/2,1" % self.port)
f = urllib2.urlopen(req)
f = urllib.request.urlopen(req)
data = json.loads(f.read())
self.assertEqual(1, len(data), data)

View File

@ -68,7 +68,7 @@ def main():
job = gear.Job("build:%s" % args.job,
json.dumps(data),
unique=data['ZUUL_UUID'])
c.submitJob(job)
c.submitJob(job, precedence=gear.PRECEDENCE_HIGH)
while not job.complete:
time.sleep(1)

View File

@ -35,7 +35,7 @@ for pipeline in data['pipelines']:
if not change['live']:
continue
cid, cps = change['id'].split(',')
print (
print(
"zuul enqueue --trigger gerrit --pipeline %s "
"--project %s --change %s,%s" % (
options.pipeline_name,

94
tools/zuul-clear-refs.py Executable file
View File

@ -0,0 +1,94 @@
#!/usr/bin/env python
# Copyright 2014-2015 Antoine "hashar" Musso
# Copyright 2014-2015 Wikimedia Foundation Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# pylint: disable=locally-disabled, invalid-name
"""
Zuul references cleaner.
Clear up references under /refs/zuul/ by inspecting the age of the commit the
reference points to. If the commit date is older than a number of days
specificed by --until, the reference is deleted from the git repository.
Use --dry-run --verbose to finely inspect the script behavior.
"""
import argparse
import git
import logging
import time
import sys
NOW = int(time.time())
DEFAULT_DAYS = 360
ZUUL_REF_PREFIX = 'refs/zuul/'
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument('--until', dest='days_ago', default=DEFAULT_DAYS, type=int,
help='references older than this number of day will '
'be deleted. Default: %s' % DEFAULT_DAYS)
parser.add_argument('-n', '--dry-run', dest='dryrun', action='store_true',
help='do not delete references')
parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
help='set log level from info to debug')
parser.add_argument('gitrepo', help='path to a Zuul git repository')
args = parser.parse_args()
logging.basicConfig()
log = logging.getLogger('zuul-clear-refs')
if args.verbose:
log.setLevel(logging.DEBUG)
else:
log.setLevel(logging.INFO)
try:
repo = git.Repo(args.gitrepo)
except git.exc.InvalidGitRepositoryError:
log.error("Invalid git repo: %s" % args.gitrepo)
sys.exit(1)
for ref in repo.references:
if not ref.path.startswith(ZUUL_REF_PREFIX):
continue
if type(ref) is not git.refs.reference.Reference:
# Paranoia: ignore heads/tags/remotes ..
continue
try:
commit_ts = ref.commit.committed_date
except LookupError:
# GitPython does not properly handle PGP signed tags
log.exception("Error in commit: %s, ref: %s. Type: %s",
ref.commit, ref.path, type(ref))
continue
commit_age = int((NOW - commit_ts) / 86400) # days
log.debug(
"%s at %s is %3s days old",
ref.commit,
ref.path,
commit_age,
)
if commit_age > args.days_ago:
if args.dryrun:
log.info("Would delete old ref: %s (%s)", ref.path, ref.commit)
else:
log.info("Deleting old ref: %s (%s)", ref.path, ref.commit)
ref.delete(repo, ref.path)

11
tox.ini
View File

@ -9,6 +9,7 @@ setenv = STATSD_HOST=127.0.0.1
STATSD_PORT=8125
VIRTUAL_ENV={envdir}
OS_TEST_TIMEOUT=30
OS_LOG_DEFAULTS={env:OS_LOG_DEFAULTS:gear.Server=INFO,gear.Client=INFO}
passenv = ZUUL_TEST_ROOT OS_STDOUT_CAPTURE OS_STDERR_CAPTURE OS_LOG_CAPTURE
usedevelop = True
install_command = pip install {opts} {packages}
@ -17,7 +18,17 @@ deps = -r{toxinidir}/requirements.txt
commands =
python setup.py testr --slowest --testr-args='{posargs}'
[testenv:bindep]
# Do not install any requirements. We want this to be fast and work even if
# system dependencies are missing, since it's used to tell you what system
# dependencies are missing! This also means that bindep must be installed
# separately, outside of the requirements files.
deps = bindep
commands = bindep test
[testenv:pep8]
# streamer is python3 only, so we need to run flake8 in python3
basepython = python3
commands = flake8 {posargs}
[testenv:cover]

0
zuul/ansible/__init__.py Normal file
View File

View File

View File

@ -0,0 +1,199 @@
#!/usr/bin/python
# Copyright (c) 2016 IBM Corp.
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import socket
import threading
def daemonize():
# A really basic daemonize method that should work well enough for
# now in this circumstance. Based on the public domain code at:
# http://web.archive.org/web/20131017130434/http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/
pid = os.fork()
if pid > 0:
return True
os.chdir('/')
os.setsid()
os.umask(0)
pid = os.fork()
if pid > 0:
sys.exit(0)
sys.stdout.flush()
sys.stderr.flush()
i = open('/dev/null', 'r')
o = open('/dev/null', 'a+')
e = open('/dev/null', 'a+', 0)
os.dup2(i.fileno(), sys.stdin.fileno())
os.dup2(o.fileno(), sys.stdout.fileno())
os.dup2(e.fileno(), sys.stderr.fileno())
return False
class Console(object):
def __init__(self, path):
self.path = path
self.file = open(path)
self.stat = os.stat(path)
self.size = self.stat.st_size
class Server(object):
def __init__(self, path, port):
self.path = path
s = None
for res in socket.getaddrinfo(None, port, socket.AF_UNSPEC,
socket.SOCK_STREAM, 0,
socket.AI_PASSIVE):
af, socktype, proto, canonname, sa = res
try:
s = socket.socket(af, socktype, proto)
s.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR, 1)
except socket.error:
s = None
continue
try:
s.bind(sa)
s.listen(1)
except socket.error:
s.close()
s = None
continue
break
if s is None:
sys.exit(1)
self.socket = s
def accept(self):
conn, addr = self.socket.accept()
return conn
def run(self):
while True:
conn = self.accept()
t = threading.Thread(target=self.handleOneConnection, args=(conn,))
t.daemon = True
t.start()
def chunkConsole(self, conn):
try:
console = Console(self.path)
except Exception:
return
while True:
chunk = console.file.read(4096)
if not chunk:
break
conn.send(chunk)
return console
def followConsole(self, console, conn):
while True:
# As long as we have unread data, keep reading/sending
while True:
chunk = console.file.read(4096)
if chunk:
conn.send(chunk)
else:
break
# At this point, we are waiting for more data to be written
time.sleep(0.5)
# Check to see if the remote end has sent any data, if so,
# discard
r, w, e = select.select([conn], [], [conn], 0)
if conn in e:
return False
if conn in r:
ret = conn.recv(1024)
# Discard anything read, if input is eof, it has
# disconnected.
if not ret:
return False
# See if the file has been truncated
try:
st = os.stat(console.path)
if (st.st_ino != console.stat.st_ino or
st.st_size < console.size):
return True
except Exception:
return True
console.size = st.st_size
def handleOneConnection(self, conn):
# FIXME: this won't notice disconnects until it tries to send
console = None
try:
while True:
if console is not None:
try:
console.file.close()
except:
pass
while True:
console = self.chunkConsole(conn)
if console:
break
time.sleep(0.5)
while True:
if self.followConsole(console, conn):
break
else:
return
finally:
try:
conn.close()
except Exception:
pass
def test():
s = Server('/tmp/console.html', 8088)
s.run()
def main():
module = AnsibleModule(
argument_spec=dict(
path=dict(default='/tmp/console.html'),
port=dict(default=8088, type='int'),
)
)
p = module.params
path = p['path']
port = p['port']
if daemonize():
module.exit_json()
s = Server(path, port)
s.run()
from ansible.module_utils.basic import * # noqa
if __name__ == '__main__':
main()
# test()

View File

@ -0,0 +1,58 @@
#!/usr/bin/python
# Copyright (c) 2016 IBM Corp.
# Copyright (c) 2016 Red Hat
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.
import datetime
class Console(object):
def __enter__(self):
self.logfile = open('/tmp/console.html', 'a', 0)
return self
def __exit__(self, etype, value, tb):
self.logfile.close()
def addLine(self, ln):
ts = datetime.datetime.now()
outln = '%s | %s' % (str(ts), ln)
self.logfile.write(outln)
def log(msg):
if not isinstance(msg, list):
msg = [msg]
with Console() as console:
for line in msg:
console.addLine("[Zuul] %s\n" % line)
def main():
module = AnsibleModule(
argument_spec=dict(
msg=dict(required=True, type='raw'),
)
)
p = module.params
log(p['msg'])
module.exit_json(changed=True)
from ansible.module_utils.basic import * # noqa
if __name__ == '__main__':
main()

View File

@ -0,0 +1,131 @@
#!/usr/bin/python
# Copyright (c) 2016 IBM Corp.
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.
import datetime
import getpass
import os
import subprocess
import threading
class Console(object):
def __enter__(self):
self.logfile = open('/tmp/console.html', 'a', 0)
return self
def __exit__(self, etype, value, tb):
self.logfile.close()
def addLine(self, ln):
# Note this format with deliminator is "inspired" by the old
# Jenkins format but with microsecond resolution instead of
# millisecond. It is kept so log parsing/formatting remains
# consistent.
ts = datetime.datetime.now()
outln = '%s | %s' % (ts, ln)
self.logfile.write(outln)
def get_env():
env = {}
env['HOME'] = os.path.expanduser('~')
env['USER'] = getpass.getuser()
# Known locations for PAM mod_env sources
for fn in ['/etc/environment', '/etc/default/locale']:
if os.path.exists(fn):
with open(fn) as f:
for line in f:
if not line:
continue
if line[0] == '#':
continue
if '=' not in line:
continue
k, v = line.strip().split('=')
for q in ["'", '"']:
if v[0] == q:
v = v.strip(q)
env[k] = v
return env
def follow(fd):
newline_warning = False
with Console() as console:
while True:
line = fd.readline()
if not line:
break
if not line.endswith('\n'):
line += '\n'
newline_warning = True
console.addLine(line)
if newline_warning:
console.addLine('[Zuul] No trailing newline\n')
def run(cwd, cmd, args):
env = get_env()
env.update(args)
proc = subprocess.Popen(
['/bin/bash', '-l', '-c', cmd],
cwd=cwd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=env,
)
t = threading.Thread(target=follow, args=(proc.stdout,))
t.daemon = True
t.start()
ret = proc.wait()
# Give the thread that is writing the console log up to 10 seconds
# to catch up and exit. If it hasn't done so by then, it is very
# likely stuck in readline() because it spawed a child that is
# holding stdout or stderr open.
t.join(10)
with Console() as console:
if t.isAlive():
console.addLine("[Zuul] standard output/error still open "
"after child exited")
console.addLine("[Zuul] Task exit code: %s\n" % ret)
return ret
def main():
module = AnsibleModule(
argument_spec=dict(
command=dict(required=True, default=None),
cwd=dict(required=True, default=None),
parameters=dict(default={}, type='dict')
)
)
p = module.params
env = p['parameters'].copy()
ret = run(p['cwd'], p['command'], env)
if ret == 0:
module.exit_json(changed=True, rc=ret)
else:
module.fail_json(msg="Exit code %s" % ret, rc=ret)
from ansible.module_utils.basic import * # noqa
if __name__ == '__main__':
main()

View File

View File

@ -0,0 +1,52 @@
# Copyright 2016 IBM Corp.
#
# This file is part of Zuul
#
# This file is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This file is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this file. If not, see <http://www.gnu.org/licenses/>.
import time
from ansible.executor.task_result import TaskResult
from ansible.plugins.callback import CallbackBase
class CallbackModule(CallbackBase):
def __init__(self, *args, **kw):
super(CallbackModule, self).__init__(*args, **kw)
self._elapsed_time = 0.0
self._task_start_time = None
self._play = None
def v2_playbook_on_play_start(self, play):
self._play = play
def playbook_on_task_start(self, name, is_conditional):
self._task_start_time = time.time()
def v2_on_any(self, *args, **kw):
result = None
if args and isinstance(args[0], TaskResult):
result = args[0]
if not result:
return
if self._task_start_time is not None:
task_time = time.time() - self._task_start_time
self._elapsed_time += task_time
if self._play and result._host:
manager = self._play.get_variable_manager()
facts = dict(elapsed_time=int(self._elapsed_time))
manager.set_nonpersistent_facts(result._host, facts)
self._task_start_time = None

View File

@ -101,7 +101,7 @@ class MatchAllFiles(AbstractMatcherCollection):
yield self.commit_regex
def matches(self, change):
if not (hasattr(change, 'files') and change.files):
if not (hasattr(change, 'files') and len(change.files) > 1):
return False
for file_ in change.files:
matched_file = False

View File

@ -14,8 +14,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import six
from six.moves import configparser as ConfigParser
import cStringIO
import extras
import logging
import logging.config
@ -47,7 +47,7 @@ def stack_dump_handler(signum, frame):
yappi.start()
else:
yappi.stop()
yappi_out = cStringIO.StringIO()
yappi_out = six.BytesIO()
yappi.get_func_stats().print_all(out=yappi_out)
yappi.get_thread_stats().print_all(out=yappi_out)
log.debug(yappi_out.getvalue())

View File

@ -154,7 +154,7 @@ class Client(zuul.cmd.ZuulApp):
running_items = client.get_running_jobs()
if len(running_items) == 0:
print "No jobs currently running"
print("No jobs currently running")
return True
all_fields = self._show_running_jobs_columns()
@ -181,7 +181,7 @@ class Client(zuul.cmd.ZuulApp):
v += all_fields[f]['append']
values.append(v)
table.add_row(values)
print table
print(table)
return True
def _epoch_to_relative_time(self, epoch):

View File

@ -27,6 +27,8 @@ ZUUL_ENV_SUFFIXES = (
'branch',
'ref',
'url',
'project',
'newrev',
)
@ -98,6 +100,10 @@ class Cloner(zuul.cmd.ZuulApp):
parser.error("Specifying a Zuul ref requires a Zuul url. "
"Define Zuul arguments either via environment "
"variables or using options above.")
if 'zuul_newrev' in zuul_args and 'zuul_project' not in zuul_args:
parser.error("ZUUL_NEWREV has been specified without "
"ZUUL_PROJECT. Please define a ZUUL_PROJECT or do "
"not set ZUUL_NEWREV.")
self.args = args
@ -145,6 +151,8 @@ class Cloner(zuul.cmd.ZuulApp):
clone_map_file=self.args.clone_map_file,
project_branches=project_branches,
cache_dir=self.args.cache_dir,
zuul_newrev=self.args.zuul_newrev,
zuul_project=self.args.zuul_project,
)
cloner.execute()

126
zuul/cmd/launcher.py Normal file
View File

@ -0,0 +1,126 @@
#!/usr/bin/env python
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2013-2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import daemon
import extras
# as of python-daemon 1.6 it doesn't bundle pidlockfile anymore
# instead it depends on lockfile-0.9.1 which uses pidfile.
pid_file_module = extras.try_imports(['daemon.pidlockfile', 'daemon.pidfile'])
import logging
import os
import socket
import sys
import signal
import zuul.cmd
import zuul.launcher.ansiblelaunchserver
# No zuul imports that pull in paramiko here; it must not be
# imported until after the daemonization.
# https://github.com/paramiko/paramiko/issues/59
# Similar situation with gear and statsd.
class Launcher(zuul.cmd.ZuulApp):
def parse_arguments(self):
parser = argparse.ArgumentParser(description='Zuul launch worker.')
parser.add_argument('-c', dest='config',
help='specify the config file')
parser.add_argument('-d', dest='nodaemon', action='store_true',
help='do not run as a daemon')
parser.add_argument('--version', dest='version', action='version',
version=self._get_version(),
help='show zuul version')
parser.add_argument('--keep-jobdir', dest='keep_jobdir',
action='store_true',
help='keep local jobdirs after run completes')
parser.add_argument('command',
choices=zuul.launcher.ansiblelaunchserver.COMMANDS,
nargs='?')
self.args = parser.parse_args()
def send_command(self, cmd):
if self.config.has_option('zuul', 'state_dir'):
state_dir = os.path.expanduser(
self.config.get('zuul', 'state_dir'))
else:
state_dir = '/var/lib/zuul'
path = os.path.join(state_dir, 'launcher.socket')
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect(path)
s.sendall('%s\n' % cmd)
def exit_handler(self):
self.launcher.stop()
self.launcher.join()
def main(self, daemon=True):
# See comment at top of file about zuul imports
self.setup_logging('launcher', 'log_config')
self.log = logging.getLogger("zuul.Launcher")
LaunchServer = zuul.launcher.ansiblelaunchserver.LaunchServer
self.launcher = LaunchServer(self.config,
keep_jobdir=self.args.keep_jobdir)
self.launcher.start()
signal.signal(signal.SIGUSR2, zuul.cmd.stack_dump_handler)
if daemon:
self.launcher.join()
else:
while True:
try:
signal.pause()
except KeyboardInterrupt:
print("Ctrl + C: asking launcher to exit nicely...\n")
self.exit_handler()
sys.exit(0)
def main():
server = Launcher()
server.parse_arguments()
server.read_config()
if server.args.command in zuul.launcher.ansiblelaunchserver.COMMANDS:
server.send_command(server.args.command)
sys.exit(0)
server.configure_connections()
if server.config.has_option('launcher', 'pidfile'):
pid_fn = os.path.expanduser(server.config.get('launcher', 'pidfile'))
else:
pid_fn = '/var/run/zuul-launcher/zuul-launcher.pid'
pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10)
if server.args.nodaemon:
server.main(False)
else:
with daemon.DaemonContext(pidfile=pid):
server.main(True)
if __name__ == "__main__":
sys.path.insert(0, '.')
main()

View File

@ -68,7 +68,7 @@ class Merger(zuul.cmd.ZuulApp):
try:
signal.pause()
except KeyboardInterrupt:
print "Ctrl + C: asking merger to exit nicely...\n"
print("Ctrl + C: asking merger to exit nicely...\n")
self.exit_handler(signal.SIGINT, None)
@ -89,9 +89,7 @@ def main():
f.close()
os.unlink(test_fn)
except Exception:
print
print "Unable to write to state directory: %s" % state_dir
print
print("\nUnable to write to state directory: %s\n" % state_dir)
raise
if server.config.has_option('merger', 'pidfile'):

View File

@ -61,12 +61,9 @@ class Server(zuul.cmd.ZuulApp):
def reconfigure_handler(self, signum, frame):
signal.signal(signal.SIGHUP, signal.SIG_IGN)
self.log.debug("Reconfiguration triggered")
self.sched.stopConnections()
self.read_config()
self.setup_logging('zuul', 'log_config')
try:
self.configure_connections()
self.sched.registerConnections(self.connections)
self.sched.reconfigure(self.config)
except Exception:
self.log.exception("Reconfiguration failed:")
@ -89,8 +86,10 @@ class Server(zuul.cmd.ZuulApp):
import zuul.trigger.gerrit
logging.basicConfig(level=logging.DEBUG)
self.sched = zuul.scheduler.Scheduler(self.config)
self.sched = zuul.scheduler.Scheduler(self.config,
testonly=True)
self.configure_connections()
self.sched.registerConnections(self.connections, load=False)
layout = self.sched.testConfig(self.config.get('zuul',
'layout_config'),
self.connections)
@ -109,7 +108,7 @@ class Server(zuul.cmd.ZuulApp):
jobs.add(v)
for job in sorted(layout.jobs):
if job not in jobs:
print "Job %s not defined" % job
print("FAILURE: Job %s not defined" % job)
failure = True
return failure
@ -119,18 +118,18 @@ class Server(zuul.cmd.ZuulApp):
if child_pid == 0:
os.close(pipe_write)
self.setup_logging('gearman_server', 'log_config')
import gear
import zuul.lib.gearserver
statsd_host = os.environ.get('STATSD_HOST')
statsd_port = int(os.environ.get('STATSD_PORT', 8125))
if self.config.has_option('gearman_server', 'listen_address'):
host = self.config.get('gearman_server', 'listen_address')
else:
host = None
gear.Server(4730,
host=host,
statsd_host=statsd_host,
statsd_port=statsd_port,
statsd_prefix='zuul.geard')
zuul.lib.gearserver.GearServer(4730,
host=host,
statsd_host=statsd_host,
statsd_port=statsd_port,
statsd_prefix='zuul.geard')
# Keep running until the parent dies:
pipe_read = os.fdopen(pipe_read)
@ -174,7 +173,20 @@ class Server(zuul.cmd.ZuulApp):
cache_expiry = self.config.getint('zuul', 'status_expiry')
else:
cache_expiry = 1
webapp = zuul.webapp.WebApp(self.sched, cache_expiry=cache_expiry)
if self.config.has_option('webapp', 'listen_address'):
listen_address = self.config.get('webapp', 'listen_address')
else:
listen_address = '0.0.0.0'
if self.config.has_option('webapp', 'port'):
port = self.config.getint('webapp', 'port')
else:
port = 8001
webapp = zuul.webapp.WebApp(
self.sched, port=port, cache_expiry=cache_expiry,
listen_address=listen_address)
rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
self.configure_connections()
@ -198,7 +210,7 @@ class Server(zuul.cmd.ZuulApp):
try:
signal.pause()
except KeyboardInterrupt:
print "Ctrl + C: asking scheduler to exit nicely...\n"
print("Ctrl + C: asking scheduler to exit nicely...\n")
self.exit_handler(signal.SIGINT, None)

View File

@ -12,6 +12,7 @@
import os
import logging
import six
import yaml
import voluptuous as vs
@ -154,7 +155,7 @@ class ProjectTemplateParser(object):
if not tree:
tree = model.JobTree(None)
for conf_job in conf:
if isinstance(conf_job, basestring):
if isinstance(conf_job, six.string_types):
tree.addJob(layout.getJob(conf_job))
elif isinstance(conf_job, dict):
# A dictionary in a job tree may override params, or

View File

@ -19,22 +19,36 @@ import select
import threading
import time
from six.moves import queue as Queue
from six.moves import urllib
import paramiko
import logging
import pprint
import voluptuous as v
import urllib2
from zuul.connection import BaseConnection
from zuul.model import TriggerEvent, Project, Change, Ref, NullChange
from zuul import exceptions
# Walk the change dependency tree to find a cycle
def detect_cycle(change, history=None):
if history is None:
history = []
else:
history = history[:]
history.append(change.number)
for dep in change.needs_changes:
if dep.number in history:
raise Exception("Dependency cycle detected: %s in %s" % (
dep.number, history))
detect_cycle(dep, history)
class GerritEventConnector(threading.Thread):
"""Move events from Gerrit to the scheduler."""
log = logging.getLogger("zuul.GerritEventConnector")
delay = 5.0
delay = 10.0
def __init__(self, connection):
super(GerritEventConnector, self).__init__()
@ -96,7 +110,7 @@ class GerritEventConnector(threading.Thread):
try:
event.account = data.get(accountfield_from_type[event.type])
except KeyError:
self.log.error("Received unrecognized event type '%s' from Gerrit.\
self.log.warning("Received unrecognized event type '%s' from Gerrit.\
Can not get account information." % event.type)
event.account = None
@ -132,6 +146,7 @@ class GerritEventConnector(threading.Thread):
class GerritWatcher(threading.Thread):
log = logging.getLogger("gerrit.GerritWatcher")
poll_timeout = 500
def __init__(self, gerrit_connection, username, hostname, port=29418,
keyfile=None):
@ -154,7 +169,7 @@ class GerritWatcher(threading.Thread):
poll = select.poll()
poll.register(stdout.channel)
while not self._stopped:
ret = poll.poll()
ret = poll.poll(self.poll_timeout)
for (fd, event) in ret:
if fd == stdout.channel.fileno():
if event == select.POLLIN:
@ -290,7 +305,7 @@ class GerritConnection(BaseConnection):
raise
return change
def _getDependsOnFromCommit(self, message):
def _getDependsOnFromCommit(self, message, change):
records = []
seen = set()
for match in self.depends_on_re.findall(message):
@ -300,17 +315,19 @@ class GerritConnection(BaseConnection):
continue
seen.add(match)
query = "change:%s" % (match,)
self.log.debug("Running query %s to find needed changes" %
(query,))
self.log.debug("Updating %s: Running query %s "
"to find needed changes" %
(change, query,))
records.extend(self.simpleQuery(query))
return records
def _getNeededByFromCommit(self, change_id):
def _getNeededByFromCommit(self, change_id, change):
records = []
seen = set()
query = 'message:%s' % change_id
self.log.debug("Running query %s to find changes needed-by" %
(query,))
self.log.debug("Updating %s: Running query %s "
"to find changes needed-by" %
(change, query,))
results = self.simpleQuery(query)
for result in results:
for match in self.depends_on_re.findall(
@ -320,15 +337,15 @@ class GerritConnection(BaseConnection):
key = (result['number'], result['currentPatchSet']['number'])
if key in seen:
continue
self.log.debug("Found change %s,%s needs %s from commit" %
(key[0], key[1], change_id))
self.log.debug("Updating %s: Found change %s,%s "
"needs %s from commit" %
(change, key[0], key[1], change_id))
seen.add(key)
records.append(result)
return records
def _updateChange(self, change, history=None):
self.log.info("Updating information for %s,%s" %
(change.number, change.patchset))
self.log.info("Updating %s" % (change,))
data = self.query(change.number)
change._data = data
@ -364,6 +381,7 @@ class GerritConnection(BaseConnection):
if change.is_merged:
# This change is merged, so we don't need to look any further
# for dependencies.
self.log.debug("Updating %s: change is merged" % (change,))
return change
if history is None:
@ -379,21 +397,35 @@ class GerritConnection(BaseConnection):
if dep_num in history:
raise Exception("Dependency cycle detected: %s in %s" % (
dep_num, history))
self.log.debug("Getting git-dependent change %s,%s" %
(dep_num, dep_ps))
self.log.debug("Updating %s: Getting git-dependent change %s,%s" %
(change, dep_num, dep_ps))
dep = self._getChange(dep_num, dep_ps, history=history)
# Because we are not forcing a refresh in _getChange, it
# may return without executing this code, so if we are
# updating our change to add ourselves to a dependency
# cycle, we won't detect it. By explicitly performing a
# walk of the dependency tree, we will.
detect_cycle(dep, history)
if (not dep.is_merged) and dep not in needs_changes:
needs_changes.append(dep)
for record in self._getDependsOnFromCommit(data['commitMessage']):
for record in self._getDependsOnFromCommit(data['commitMessage'],
change):
dep_num = record['number']
dep_ps = record['currentPatchSet']['number']
if dep_num in history:
raise Exception("Dependency cycle detected: %s in %s" % (
dep_num, history))
self.log.debug("Getting commit-dependent change %s,%s" %
(dep_num, dep_ps))
self.log.debug("Updating %s: Getting commit-dependent "
"change %s,%s" %
(change, dep_num, dep_ps))
dep = self._getChange(dep_num, dep_ps, history=history)
# Because we are not forcing a refresh in _getChange, it
# may return without executing this code, so if we are
# updating our change to add ourselves to a dependency
# cycle, we won't detect it. By explicitly performing a
# walk of the dependency tree, we will.
detect_cycle(dep, history)
if (not dep.is_merged) and dep not in needs_changes:
needs_changes.append(dep)
change.needs_changes = needs_changes
@ -403,15 +435,17 @@ class GerritConnection(BaseConnection):
for needed in data['neededBy']:
parts = needed['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
self.log.debug("Updating %s: Getting git-needed change %s,%s" %
(change, dep_num, dep_ps))
dep = self._getChange(dep_num, dep_ps)
if (not dep.is_merged) and dep.is_current_patchset:
needed_by_changes.append(dep)
for record in self._getNeededByFromCommit(data['id']):
for record in self._getNeededByFromCommit(data['id'], change):
dep_num = record['number']
dep_ps = record['currentPatchSet']['number']
self.log.debug("Getting commit-needed change %s,%s" %
(dep_num, dep_ps))
self.log.debug("Updating %s: Getting commit-needed change %s,%s" %
(change, dep_num, dep_ps))
# Because a commit needed-by may be a cross-repo
# dependency, cause that change to refresh so that it will
# reference the latest patchset of its Depends-On (this
@ -434,6 +468,10 @@ class GerritConnection(BaseConnection):
data = self.query(change.number)
change._data = data
change.is_merged = self._isMerged(change)
if change.is_merged:
self.log.debug("Change %s is merged" % (change,))
else:
self.log.debug("Change %s is not merged" % (change,))
if not head:
return change.is_merged
if not change.is_merged:
@ -456,7 +494,6 @@ class GerritConnection(BaseConnection):
status = data.get('status')
if not status:
return False
self.log.debug("Change %s status: %s" % (change, status))
if status == 'MERGED':
return True
return False
@ -666,10 +703,10 @@ class GerritConnection(BaseConnection):
url = "%s/p/%s/info/refs?service=git-upload-pack" % (
self.baseurl, project)
try:
data = urllib2.urlopen(url).read()
data = urllib.request.urlopen(url).read()
except:
self.log.error("Cannot get references from %s" % url)
raise # keeps urllib2 error informations
raise # keeps urllib error informations
ret = {}
read_headers = False
read_advertisement = False

View File

@ -22,5 +22,14 @@ class ChangeNotFound(Exception):
super(ChangeNotFound, self).__init__(message)
class RevNotFound(Exception):
def __init__(self, project, rev):
self.project = project
self.revision = rev
message = ("Failed to checkout project '%s' at revision '%s'"
% (self.project, self.revision))
super(RevNotFound, self).__init__(message)
class MergeFailure(Exception):
pass

File diff suppressed because it is too large Load Diff

View File

@ -17,6 +17,7 @@ import inspect
import json
import logging
import os
import six
import time
import threading
from uuid import uuid4
@ -193,6 +194,11 @@ class LaunchClient(object):
port = config.get('gearman', 'port')
else:
port = 4730
if config.has_option('gearman', 'check_job_registration'):
self.job_registration = config.getboolean(
'gearman', 'check_job_registration')
else:
self.job_registration = True
self.gearman = ZuulGearmanClient(self)
self.gearman.addServer(server, port)
@ -260,7 +266,7 @@ class LaunchClient(object):
s_config = {}
s_config.update((k, v.format(item=item, job=job,
change=item.change))
if isinstance(v, basestring)
if isinstance(v, six.string_types)
else (k, v)
for k, v in s.items())
@ -391,7 +397,8 @@ class LaunchClient(object):
build.__gearman_job = gearman_job
self.builds[uuid] = build
if not self.isJobRegistered(gearman_job.name):
if self.job_registration and not self.isJobRegistered(
gearman_job.name):
self.log.error("Job %s is not registered with Gearman" %
gearman_job)
self.onBuildCompleted(gearman_job, 'NOT_REGISTERED')
@ -496,9 +503,6 @@ class LaunchClient(object):
build.number = data.get('number')
build.__gearman_manager = data.get('manager')
self.sched.onBuildStarted(build)
if job.denominator:
build.estimated_time = float(job.denominator) / 1000
else:
self.log.error("Unable to find build %s" % job.unique)
@ -545,7 +549,7 @@ class LaunchClient(object):
# us where the job is running.
return False
if not self.isJobRegistered(name):
if self.job_registration and not self.isJobRegistered(name):
return False
desc_uuid = str(uuid4().hex)

View File

@ -277,8 +277,8 @@ class LaunchServer(object):
)
(out, err) = proc.communicate()
ret = proc.wait()
print out
print err
print(out)
print(err)
if ret == 0:
return 'SUCCESS'
else:

View File

@ -19,6 +19,9 @@ import logging
import os
import re
import six
OrderedDict = extras.try_imports(['collections.OrderedDict',
'ordereddict.OrderedDict'])
@ -59,17 +62,17 @@ class CloneMapper(object):
raise Exception("Expansion error. Check error messages above")
self.log.info("Mapping projects to workspace...")
for project, dest in ret.iteritems():
for project, dest in six.iteritems(ret):
dest = os.path.normpath(os.path.join(workspace, dest[0]))
ret[project] = dest
self.log.info(" %s -> %s", project, dest)
self.log.debug("Checking overlap in destination directories...")
check = defaultdict(list)
for project, dest in ret.iteritems():
for project, dest in six.iteritems(ret):
check[dest].append(project)
dupes = dict((d, p) for (d, p) in check.iteritems() if len(p) > 1)
dupes = dict((d, p) for (d, p) in six.iteritems(check) if len(p) > 1)
if dupes:
raise Exception("Some projects share the same destination: %s",
dupes)

View File

@ -19,7 +19,10 @@ import os
import re
import yaml
import six
from git import GitCommandError
from zuul import exceptions
from zuul.lib.clonemapper import CloneMapper
from zuul.merger.merger import Repo
@ -29,7 +32,8 @@ class Cloner(object):
def __init__(self, git_base_url, projects, workspace, zuul_branch,
zuul_ref, zuul_url, branch=None, clone_map_file=None,
project_branches=None, cache_dir=None):
project_branches=None, cache_dir=None, zuul_newrev=None,
zuul_project=None):
self.clone_map = []
self.dests = None
@ -43,6 +47,10 @@ class Cloner(object):
self.zuul_ref = zuul_ref or ''
self.zuul_url = zuul_url
self.project_branches = project_branches or {}
self.project_revisions = {}
if zuul_newrev and zuul_project:
self.project_revisions[zuul_project] = zuul_newrev
if clone_map_file:
self.readCloneMap(clone_map_file)
@ -62,7 +70,7 @@ class Cloner(object):
dests = mapper.expand(workspace=self.workspace)
self.log.info("Preparing %s repositories", len(dests))
for project, dest in dests.iteritems():
for project, dest in six.iteritems(dests):
self.prepareRepo(project, dest)
self.log.info("Prepared all repositories")
@ -103,7 +111,14 @@ class Cloner(object):
repo.fetchFrom(zuul_remote, ref)
self.log.debug("Fetched ref %s from %s", ref, project)
return True
except (ValueError, GitCommandError):
except ValueError:
self.log.debug("Project %s in Zuul does not have ref %s",
project, ref)
return False
except GitCommandError as error:
# Bail out if fetch fails due to infrastructure reasons
if error.stderr.startswith('fatal: unable to access'):
raise
self.log.debug("Project %s in Zuul does not have ref %s",
project, ref)
return False
@ -112,10 +127,15 @@ class Cloner(object):
"""Clone a repository for project at dest and apply a reference
suitable for testing. The reference lookup is attempted in this order:
1) Zuul reference for the indicated branch
2) Zuul reference for the master branch
3) The tip of the indicated branch
4) The tip of the master branch
1) The indicated revision for specific project
2) Zuul reference for the indicated branch
3) Zuul reference for the master branch
4) The tip of the indicated branch
5) The tip of the master branch
If an "indicated revision" is specified for this project, and we are
unable to meet this requirement, we stop attempting to check this
repo out and raise a zuul.exceptions.RevNotFound exception.
The "indicated branch" is one of the following:
@ -135,6 +155,10 @@ class Cloner(object):
# `git branch` is happy with.
repo.reset()
indicated_revision = None
if project in self.project_revisions:
indicated_revision = self.project_revisions[project]
indicated_branch = self.branch or self.zuul_branch
if project in self.project_branches:
indicated_branch = self.project_branches[project]
@ -149,8 +173,9 @@ class Cloner(object):
self.log.info("upstream repo has branch %s", indicated_branch)
fallback_branch = indicated_branch
else:
self.log.info("upstream repo is missing branch %s",
self.branch)
if indicated_branch:
self.log.info("upstream repo is missing branch %s",
indicated_branch)
# FIXME should be origin HEAD branch which might not be 'master'
fallback_branch = 'master'
@ -160,13 +185,26 @@ class Cloner(object):
else:
fallback_zuul_ref = None
# If the user has requested an explicit revision to be checked out,
# we use it above all else, and if we cannot satisfy this requirement
# we raise an error and do not attempt to continue.
if indicated_revision:
self.log.info("Attempting to check out revision %s for "
"project %s", indicated_revision, project)
try:
self.fetchFromZuul(repo, project, self.zuul_ref)
commit = repo.checkout(indicated_revision)
except (ValueError, GitCommandError):
raise exceptions.RevNotFound(project, indicated_revision)
self.log.info("Prepared '%s' repo at revision '%s'", project,
indicated_revision)
# If we have a non empty zuul_ref to use, use it. Otherwise we fall
# back to checking out the branch.
if ((override_zuul_ref and
self.fetchFromZuul(repo, project, override_zuul_ref)) or
(fallback_zuul_ref and
fallback_zuul_ref != override_zuul_ref and
self.fetchFromZuul(repo, project, fallback_zuul_ref))):
elif ((override_zuul_ref and
self.fetchFromZuul(repo, project, override_zuul_ref)) or
(fallback_zuul_ref and
fallback_zuul_ref != override_zuul_ref and
self.fetchFromZuul(repo, project, fallback_zuul_ref))):
# Work around a bug in GitPython which can not parse FETCH_HEAD
gitcmd = git.Git(dest)
fetch_head = gitcmd.rev_parse('FETCH_HEAD')

83
zuul/lib/commandsocket.py Normal file
View File

@ -0,0 +1,83 @@
# Copyright 2014 OpenStack Foundation
# Copyright 2014 Hewlett-Packard Development Company, L.P.
# Copyright 2016 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import socket
import threading
import Queue
class CommandSocket(object):
log = logging.getLogger("zuul.CommandSocket")
def __init__(self, path):
self.running = False
self.path = path
self.queue = Queue.Queue()
def start(self):
self.running = True
if os.path.exists(self.path):
os.unlink(self.path)
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.socket.bind(self.path)
self.socket.listen(1)
self.socket_thread = threading.Thread(target=self._socketListener)
self.socket_thread.daemon = True
self.socket_thread.start()
def stop(self):
# First, wake up our listener thread with a connection and
# tell it to stop running.
self.running = False
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect(self.path)
s.sendall('_stop\n')
# The command '_stop' will be ignored by our listener, so
# directly inject it into the queue so that consumers of this
# class which are waiting in .get() are awakened. They can
# either handle '_stop' or just ignore the unknown command and
# then check to see if they should continue to run before
# re-entering their loop.
self.queue.put('_stop')
self.socket_thread.join()
def _socketListener(self):
while self.running:
try:
s, addr = self.socket.accept()
self.log.debug("Accepted socket connection %s" % (s,))
buf = ''
while True:
buf += s.recv(1)
if buf[-1] == '\n':
break
buf = buf.strip()
self.log.debug("Received %s from socket" % (buf,))
s.close()
# Because we use '_stop' internally to wake up a
# waiting thread, don't allow it to actually be
# injected externally.
if buf != '_stop':
self.queue.put(buf)
except Exception:
self.log.exception("Exception in socket handler")
def get(self):
if not self.running:
raise Exception("CommandSocket.get called while stopped")
return self.queue.get()

View File

@ -24,10 +24,11 @@ class ConnectionRegistry(object):
def __init__(self):
self.connections = {}
def registerScheduler(self, sched):
def registerScheduler(self, sched, load=True):
for connection_name, connection in self.connections.items():
connection.registerScheduler(sched)
connection.onLoad()
if load:
connection.onLoad()
def stop(self):
for connection_name, connection in self.connections.items():

35
zuul/lib/gearserver.py Normal file
View File

@ -0,0 +1,35 @@
# Copyright 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import gear
MASS_DO = 101
class GearServer(gear.Server):
def handlePacket(self, packet):
if packet.ptype == MASS_DO:
self.log.info("Received packet from %s: %s" % (packet.connection,
packet))
self.handleMassDo(packet)
else:
return super(GearServer, self).handlePacket(packet)
def handleMassDo(self, packet):
packet.connection.functions = set()
for name in packet.data.split(b'\x00'):
self.log.debug("Adding function %s to %s" % (
name, packet.connection))
packet.connection.functions.add(name)
self.functions.add(name)

View File

@ -19,8 +19,8 @@ from time import time
import os
import random
import six
from six.moves import urllib
import string
import urlparse
class Swift(object):
@ -156,7 +156,7 @@ class Swift(object):
url = os.path.join(self.storage_url, settings['container'],
settings['file_path_prefix'],
destination_prefix)
u = urlparse.urlparse(url)
u = urllib.parse.urlparse(url)
hmac_body = '%s\n%s\n%s\n%s\n%s' % (u.path, redirect,
settings['max_file_size'],

View File

@ -217,7 +217,7 @@ class Merger(object):
fd.write('#!/bin/bash\n')
fd.write('ssh -i %s $@\n' % key)
fd.close()
os.chmod(name, 0755)
os.chmod(name, 0o755)
def addProject(self, project, url):
repo = None

View File

@ -19,7 +19,7 @@ import traceback
import gear
import merger
from zuul.merger import merger
class MergeServer(object):

View File

@ -13,7 +13,9 @@
# under the License.
import copy
import os
import re
import struct
import time
from uuid import uuid4
import extras
@ -121,7 +123,11 @@ class Pipeline(object):
return job_tree
def getProjects(self):
return sorted(self.job_trees.keys(), lambda a, b: cmp(a.name, b.name))
# cmp is not in python3, applied idiom from
# http://python-future.org/compatible_idioms.html#cmp
return sorted(
self.job_trees.keys(),
key=lambda p: p.name)
def addQueue(self, queue):
self.queues.append(queue)
@ -273,7 +279,7 @@ class Pipeline(object):
items.extend(shared_queue.queue)
return items
def formatStatusJSON(self):
def formatStatusJSON(self, url_pattern=None):
j_pipeline = dict(name=self.name,
description=self.description)
j_queues = []
@ -290,7 +296,7 @@ class Pipeline(object):
if j_changes:
j_queue['heads'].append(j_changes)
j_changes = []
j_changes.append(e.formatJSON())
j_changes.append(e.formatJSON(url_pattern))
if (len(j_changes) > 1 and
(j_changes[-2]['remaining_time'] is not None) and
(j_changes[-1]['remaining_time'] is not None)):
@ -413,7 +419,7 @@ class ChangeQueue(object):
elif self.window_decrease_type == 'exponential':
self.window = max(
self.window_floor,
self.window / self.window_decrease_factor)
int(self.window / self.window_decrease_factor))
class Project(object):
@ -766,7 +772,34 @@ class QueueItem(object):
return []
return self.job_tree.getJobs()
def formatJSON(self):
def formatJobResult(self, job, url_pattern=None):
build = self.current_build_set.getBuild(job.name)
result = build.result
pattern = url_pattern
if result == 'SUCCESS':
if job.success_message:
result = job.success_message
if job.success_pattern:
pattern = job.success_pattern
elif result == 'FAILURE':
if job.failure_message:
result = job.failure_message
if job.failure_pattern:
pattern = job.failure_pattern
url = None
if pattern:
try:
url = pattern.format(change=self.change,
pipeline=self.pipeline,
job=job,
build=build)
except Exception:
pass # FIXME: log this or something?
if not url:
url = build.url or job.name
return (result, url)
def formatJSON(self, url_pattern=None):
changeish = self.change
ret = {}
ret['active'] = self.active
@ -803,11 +836,13 @@ class QueueItem(object):
elapsed = None
remaining = None
result = None
url = None
build_url = None
report_url = None
worker = None
if build:
result = build.result
url = build.url
build_url = build.url
(unused, report_url) = self.formatJobResult(job, url_pattern)
if build.start_time:
if build.end_time:
elapsed = int((build.end_time -
@ -835,7 +870,8 @@ class QueueItem(object):
'name': job.name,
'elapsed_time': elapsed,
'remaining_time': remaining,
'url': url,
'url': build_url,
'report_url': report_url,
'result': result,
'voting': job.voting,
'uuid': build.uuid if build else None,
@ -1085,7 +1121,7 @@ class BaseFilter(object):
for a in approvals:
for k, v in a.items():
if k == 'username':
pass
a['username'] = re.compile(v)
elif k in ['email', 'email-filter']:
a['email'] = re.compile(v)
elif k == 'newer-than':
@ -1104,7 +1140,7 @@ class BaseFilter(object):
by = approval.get('by', {})
for k, v in rapproval.items():
if k == 'username':
if (by.get('username', '') != v):
if (not v.search(by.get('username', ''))):
return False
elif k == 'email':
if (not v.search(by.get('email', ''))):
@ -1517,3 +1553,78 @@ class Tenant(object):
class Abide(object):
def __init__(self):
self.tenants = OrderedDict()
class JobTimeData(object):
format = 'B10H10H10B'
version = 0
def __init__(self, path):
self.path = path
self.success_times = [0 for x in range(10)]
self.failure_times = [0 for x in range(10)]
self.results = [0 for x in range(10)]
def load(self):
if not os.path.exists(self.path):
return
with open(self.path) as f:
data = struct.unpack(self.format, f.read())
version = data[0]
if version != self.version:
raise Exception("Unkown data version")
self.success_times = list(data[1:11])
self.failure_times = list(data[11:21])
self.results = list(data[21:32])
def save(self):
tmpfile = self.path + '.tmp'
data = [self.version]
data.extend(self.success_times)
data.extend(self.failure_times)
data.extend(self.results)
data = struct.pack(self.format, *data)
with open(tmpfile, 'w') as f:
f.write(data)
os.rename(tmpfile, self.path)
def add(self, elapsed, result):
elapsed = int(elapsed)
if result == 'SUCCESS':
self.success_times.append(elapsed)
self.success_times.pop(0)
result = 0
else:
self.failure_times.append(elapsed)
self.failure_times.pop(0)
result = 1
self.results.append(result)
self.results.pop(0)
def getEstimatedTime(self):
times = [x for x in self.success_times if x]
if times:
return float(sum(times)) / len(times)
return 0.0
class TimeDataBase(object):
def __init__(self, root):
self.root = root
self.jobs = {}
def _getTD(self, name):
td = self.jobs.get(name)
if not td:
td = JobTimeData(os.path.join(self.root, name))
self.jobs[name] = td
td.load()
return td
def getEstimatedTime(self, name):
return self._getTD(name).getEstimatedTime()
def update(self, name, elapsed, result):
td = self._getTD(name)
td.add(elapsed, result)
td.save()

View File

@ -13,6 +13,7 @@
# under the License.
import abc
import logging
import six
@ -24,6 +25,8 @@ class BaseReporter(object):
Defines the exact public methods that must be supplied.
"""
log = logging.getLogger("zuul.reporter.BaseReporter")
def __init__(self, reporter_config={}, connection=None):
self.reporter_config = reporter_config
self.connection = connection
@ -107,25 +110,7 @@ class BaseReporter(object):
for job in pipeline.getJobs(item):
build = item.current_build_set.getBuild(job.name)
result = build.result
pattern = url_pattern
if result == 'SUCCESS':
if job.success_message:
result = job.success_message
if job.success_pattern:
pattern = job.success_pattern
elif result == 'FAILURE':
if job.failure_message:
result = job.failure_message
if job.failure_pattern:
pattern = job.failure_pattern
if pattern:
url = pattern.format(change=item.change,
pipeline=pipeline,
job=job,
build=build)
else:
url = build.url or job.name
(result, url) = item.formatJobResult(job, url_pattern)
if not job.voting:
voting = ' (non-voting)'
else:

View File

@ -21,7 +21,7 @@ import traceback
import gear
import six
import model
from zuul import model
class RPCListener(object):
@ -40,11 +40,11 @@ class RPCListener(object):
port = 4730
self.worker = gear.Worker('Zuul RPC Listener')
self.worker.addServer(server, port)
self.worker.waitForServer()
self.register()
self.thread = threading.Thread(target=self.run)
self.thread.daemon = True
self.thread.start()
self.worker.waitForServer()
self.register()
def register(self):
self.worker.registerFunction("zuul:enqueue")
@ -66,8 +66,8 @@ class RPCListener(object):
while self._running:
try:
job = self.worker.getJob()
z, jobname = job.name.split(':')
self.log.debug("Received job %s" % job.name)
z, jobname = job.name.split(':')
attrname = 'handle_' + jobname
if hasattr(self, attrname):
f = getattr(self, attrname)

View File

@ -20,14 +20,15 @@ import json
import logging
import os
import pickle
import six
from six.moves import queue as Queue
import sys
import threading
import time
import configloader
import model
from model import Project
from zuul import configloader
from zuul import model
from zuul.model import Project
from zuul import exceptions
from zuul import version as zuul_version
@ -100,12 +101,10 @@ class ManagementEvent(object):
"""An event that should be processed within the main queue run loop"""
def __init__(self):
self._wait_event = threading.Event()
self._exception = None
self._traceback = None
self._exc_info = None
def exception(self, e, tb):
self._exception = e
self._traceback = tb
def exception(self, exc_info):
self._exc_info = exc_info
self._wait_event.set()
def done(self):
@ -113,8 +112,8 @@ class ManagementEvent(object):
def wait(self, timeout=None):
self._wait_event.wait(timeout)
if self._exception:
raise self._exception, None, self._traceback
if self._exc_info:
six.reraise(*self._exc_info)
return self._wait_event.is_set()
@ -211,7 +210,7 @@ def toList(item):
class Scheduler(threading.Thread):
log = logging.getLogger("zuul.Scheduler")
def __init__(self, config):
def __init__(self, config, testonly=False):
threading.Thread.__init__(self)
self.daemon = True
self.wake_event = threading.Event()
@ -238,6 +237,10 @@ class Scheduler(threading.Thread):
self.management_event_queue = Queue.Queue()
self.abide = model.Abide()
if not testonly:
time_dir = self._get_time_database_dir()
self.time_database = model.TimeDataBase(time_dir)
self.zuul_version = zuul_version.version_info.release_string()
self.last_reconfigured = None
@ -251,9 +254,11 @@ class Scheduler(threading.Thread):
# registerConnections as we don't want to do the onLoad event yet.
return self._parseConfig(config_path, connections)
def registerConnections(self, connections):
def registerConnections(self, connections, load=True):
# load: whether or not to trigger the onLoad for the connection. This
# is useful for not doing a full load during layout validation.
self.connections = connections
self.connections.registerScheduler(self)
self.connections.registerScheduler(self, load)
def stopConnections(self):
self.connections.stop()
@ -388,6 +393,17 @@ class Scheduler(threading.Thread):
state_dir = '/var/lib/zuul'
return os.path.join(state_dir, 'queue.pickle')
def _get_time_database_dir(self):
if self.config.has_option('zuul', 'state_dir'):
state_dir = os.path.expanduser(self.config.get('zuul',
'state_dir'))
else:
state_dir = '/var/lib/zuul'
d = os.path.join(state_dir, 'times')
if not os.path.exists(d):
os.mkdir(d)
return d
def _save_queue(self):
pickle_file = self._get_queue_pickle_file()
events = []
@ -687,8 +703,8 @@ class Scheduler(threading.Thread):
else:
self.log.error("Unable to handle event %s" % event)
event.done()
except Exception as e:
event.exception(e, sys.exc_info()[2])
except Exception:
event.exception(sys.exc_info())
self.management_event_queue.task_done()
def process_result_queue(self):
@ -718,6 +734,11 @@ class Scheduler(threading.Thread):
self.log.warning("Build %s is not associated with a pipeline" %
(build,))
return
try:
build.estimated_time = float(self.time_database.getEstimatedTime(
build.job.name))
except Exception:
self.log.exception("Exception estimating build time:")
pipeline.manager.onBuildStarted(event.build)
def _doBuildCompletedEvent(self, event):
@ -731,6 +752,13 @@ class Scheduler(threading.Thread):
self.log.warning("Build %s is not associated with a pipeline" %
(build,))
return
if build.end_time and build.start_time and build.result:
duration = build.end_time - build.start_time
try:
self.time_database.update(
build.job.name, duration, build.result)
except Exception:
self.log.exception("Exception recording build time:")
pipeline.manager.onBuildCompleted(event.build)
def _doMergeCompletedEvent(self, event):
@ -747,6 +775,11 @@ class Scheduler(threading.Thread):
def formatStatusJSON(self):
# TODOv3(jeblair): use tenants
if self.config.has_option('zuul', 'url_pattern'):
url_pattern = self.config.get('zuul', 'url_pattern')
else:
url_pattern = None
data = {}
data['zuul_version'] = self.zuul_version
@ -772,5 +805,5 @@ class Scheduler(threading.Thread):
pipelines = []
data['pipelines'] = pipelines
for pipeline in self.layout.pipelines.values():
pipelines.append(pipeline.formatStatusJSON())
pipelines.append(pipeline.formatStatusJSON(url_pattern))
return json.dumps(data)

View File

@ -40,8 +40,8 @@ class TimerTrigger(BaseTrigger):
self.log.debug("Adding event %s" % event)
self.connection.sched.addEvent(event)
def _shutdown(self):
self.apsched.stop()
def stop(self):
self.apsched.shutdown()
def getEventFilters(self, trigger_conf):
def toList(item):

View File

@ -43,16 +43,19 @@ array of changes, they will not include the queue structure.
class WebApp(threading.Thread):
log = logging.getLogger("zuul.WebApp")
def __init__(self, scheduler, port=8001, cache_expiry=1):
def __init__(self, scheduler, port=8001, cache_expiry=1,
listen_address='0.0.0.0'):
threading.Thread.__init__(self)
self.scheduler = scheduler
self.listen_address = listen_address
self.port = port
self.cache_expiry = cache_expiry
self.cache_time = 0
self.cache = None
self.daemon = True
self.server = httpserver.serve(dec.wsgify(self.app), host='0.0.0.0',
port=self.port, start_loop=False)
self.server = httpserver.serve(
dec.wsgify(self.app), host=self.listen_address, port=self.port,
start_loop=False)
def run(self):
self.server.serve_forever()