mqtt: add basic reporter

This change adds a MQTT reporter to publish build results message.

Change-Id: I5a9937a7952beac5c77d83ab791d48ff000b447b
This commit is contained in:
Tristan Cacqueray 2017-11-07 09:05:20 +00:00
parent 35c2059e19
commit 531a880a5d
17 changed files with 636 additions and 0 deletions

View File

@ -65,6 +65,7 @@ Zuul includes the following drivers:
drivers/gerrit
drivers/github
drivers/git
drivers/mqtt
drivers/smtp
drivers/sql
drivers/timer

View File

@ -0,0 +1,251 @@
:title: MQTT Driver
MQTT
====
The MQTT driver supports reporters only. It is used to send MQTT
message when items report.
Message Schema
--------------
An MQTT report uses this schema:
.. attr:: <mqtt schema>
.. attr:: action
The reporter action name, e.g.: 'start', 'success', 'failure',
'merge-failure', ...
.. attr:: tenant
The tenant name.
.. attr:: pipeline
The pipeline name.
.. attr:: project
The project name.
.. attr:: branch
The branch name.
.. attr:: change_url
The change url.
.. attr:: message
The report message.
.. attr:: change
The change number.
.. attr:: patchset
The patchset number.
.. attr:: ref
The change reference.
.. attr:: zuul_ref
The internal zuul change reference.
.. attr:: buildset
The buildset information.
.. value:: uuid
The buildset global uuid.
.. attr:: builds
The list of builds.
.. attr:: job_name
The job name.
.. attr:: voting
The job voting status.
.. attr:: uuid
The build uuid (not present in start report).
.. attr:: start_time
The build start time (not present in start report).
.. attr:: end_time
The build end time (not present in start report).
.. attr:: log_url
The build log url (not present in start report).
.. attr:: result
The build results (not present in start report).
Here is an example of a start message:
.. code-block:: javascript
{
'action': 'start',
'tenant': 'openstack.org',
'pipeline': 'check',
'project': 'sf-jobs',
'branch': 'master',
'change_url': 'https://gerrit.example.com/r/3',
'message': 'Starting check jobs.',
'change': '3',
'patchset': '1',
"ref": "refs/changes/03/3/1",
'zuul_ref': 'Zf8b3d7cd34f54cb396b488226589db8f'
'buildset': {
'uuid': 'f8b3d7cd34f54cb396b488226589db8f'
'builds': [{
'job_name': 'linters',
'voting': True
}],
},
}
Here is an example of a success message:
.. code-block:: javascript
{
'action': 'success',
'tenant': 'openstack.org',
'pipeline': 'check',
'project': 'sf-jobs',
'branch': 'master',
'change_url': 'https://gerrit.example.com/r/3',
'message': 'Build succeeded.',
'change': '3',
'patchset': '1',
"ref": "refs/changes/03/3/1",
'zuul_ref': 'Zf8b3d7cd34f54cb396b488226589db8f'
'buildset': {
'uuid': 'f8b3d7cd34f54cb396b488226589db8f'
'builds': [{
'job_name': 'linters',
'voting': True
'uuid': '16e3e55aca984c6c9a50cc3c5b21bb83',
'start_time': 1524801179.8557224,
'end_time': 1524801208.928095,
'log_url': 'https://logs.example.com/logs/3/3/1/check/linters/16e3e55/',
'result': 'SUCCESS',
}],
},
}
Connection Configuration
------------------------
.. attr:: <mqtt connection>
.. attr:: driver
:required:
.. value:: mqtt
The connection must set ``driver=mqtt`` for MQTT connections.
.. attr:: server
:default: localhost
MQTT server hostname or address to use.
.. attr:: port
:default: 1883
MQTT server port.
.. attr:: keepalive
:default: 60
Maximum period in seconds allowed between communications with the broker.
.. attr:: user
Set a username for optional broker authentication.
.. attr:: password
Set a password for optional broker authentication.
.. attr:: ca_certs
A string path to the Certificate Authority certificate files to enable
TLS connection.
.. attr:: certfile
A strings pointing to the PEM encoded client certificate to
enable client TLS based authentication. This option requires keyfile to
be set too.
.. attr:: keyfile
A strings pointing to the PEM encoded client private keys to
enable client TLS based authentication. This option requires certfile to
be set too.
.. attr:: ciphers
A string specifying which encryption ciphers are allowable for this
connection. More information in this
`openssl doc <https://www.openssl.org/docs/manmaster/man1/ciphers.html>`_.
Reporter Configuration
----------------------
A :ref:`connection<connections>` that uses the mqtt driver must be supplied to the
reporter. Each pipeline must provide a topic name. For example:
.. code-block:: yaml
- pipeline:
name: check
success:
mqtt:
topic: "{tenant}/zuul/{pipeline}/{project}/{branch}/{change}"
qos: 2
.. attr:: pipeline.<reporter>.<mqtt>
To report via MQTT message, the dictionaries passed to any of the pipeline
:ref:`reporter<reporters>` support the following attributes:
.. attr:: topic
The MQTT topic to publish messages. The topic can be a format string that
can use the following parameters: ``tenant``, ``pipeline``, ``project``,
``branch``, ``change``, ``patchset`` and ``ref``.
MQTT topic can have hierarchy separated by ``/``, more details in this
`doc <https://mosquitto.org/man/mqtt-7.html>`_
.. attr:: qos
:default: 0
The quality of service level to use, it can be 0, 1 or 2. Read more in this
`guide <https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels>`_

View File

@ -56,6 +56,13 @@ port=25
default_from=zuul@example.com
default_to=you@example.com
[connection mqtt]
driver=mqtt
server=localhost
user=zuul
password=zuul
;keepalive=60
[connection mydatabase]
driver=sql
dburi=mysql+pymysql://user@localhost/zuul

View File

@ -0,0 +1,3 @@
---
features:
- A :attr:<mqtt connection> driver is added to feature build report over MQTT message.

View File

@ -25,3 +25,4 @@ aiohttp<3.0.0
uvloop;python_version>='3.5'
psutil
fb-re2>=1.0.6
paho-mqtt

View File

@ -2318,6 +2318,17 @@ class ZuulTestCase(BaseTestCase):
self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))
# Set up mqtt related fakes
self.mqtt_messages = []
def fakeMQTTPublish(_, topic, msg, qos):
log = logging.getLogger('zuul.FakeMQTTPubish')
log.info('Publishing message via mqtt')
self.mqtt_messages.append({'topic': topic, 'msg': msg, 'qos': qos})
self.useFixture(fixtures.MonkeyPatch(
'zuul.driver.mqtt.mqttconnection.MQTTConnection.publish',
fakeMQTTPublish))
# Register connections from the config using fakes
self.connections = zuul.lib.connections.ConnectionRegistry()
self.connections.configure(self.config, source_only=source_only)

View File

@ -0,0 +1,2 @@
- hosts: all
tasks: []

View File

@ -0,0 +1,38 @@
- pipeline:
name: check
manager: independent
trigger:
gerrit:
- event: patchset-created
start:
mqtt:
topic: "{tenant}/zuul_start/{pipeline}/{project}/{branch}"
success:
gerrit:
Verified: 1
mqtt:
topic: "{tenant}/zuul_buildset/{pipeline}/{project}/{branch}"
failure:
gerrit:
Verified: -1
mqtt:
topic: "{tenant}/zuul_buildset/{pipeline}/{project}/{branch}"
- job:
name: base
parent: null
- job:
name: test
run: playbooks/test.yaml
- project:
name: org/project
check:
jobs:
- test
- project:
name: common-config
check:
jobs: []

View File

@ -0,0 +1 @@
test

View File

@ -0,0 +1,8 @@
- tenant:
name: tenant-one
source:
gerrit:
config-projects:
- common-config
untrusted-projects:
- org/project

22
tests/fixtures/zuul-mqtt-driver.conf vendored Normal file
View File

@ -0,0 +1,22 @@
[gearman]
server=127.0.0.1
[scheduler]
tenant_config=main.yaml
[merger]
git_dir=/tmp/zuul-test/merger-git
git_user_email=zuul@example.com
git_user_name=zuul
[executor]
git_dir=/tmp/zuul-test/executor-git
[connection gerrit]
driver=gerrit
server=review.example.com
user=jenkins
sshkey=fake_id_rsa1
[connection mqtt]
driver=mqtt

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import textwrap
import sqlalchemy as sa
from tests.base import ZuulTestCase, ZuulDBTestCase
@ -433,3 +435,59 @@ class TestConnectionsGitweb(ZuulTestCase):
url_should_be = 'https://review.example.com/' \
'gitweb?p=foo/bar.git;a=commitdiff;h=1'
self.assertEqual(url, url_should_be)
class TestMQTTConnection(ZuulTestCase):
config_file = 'zuul-mqtt-driver.conf'
tenant_config_file = 'config/mqtt-driver/main.yaml'
def test_mqtt_reporter(self):
"Test the MQTT reporter"
# Add a success result
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
success_event = self.mqtt_messages.pop()
start_event = self.mqtt_messages.pop()
self.assertEquals(start_event.get('topic'),
'tenant-one/zuul_start/check/org/project/master')
mqtt_payload = start_event['msg']
self.assertEquals(mqtt_payload['project'], 'org/project')
self.assertEquals(mqtt_payload['branch'], 'master')
self.assertEquals(mqtt_payload['buildset']['builds'][0]['job_name'],
'test')
self.assertNotIn('result', mqtt_payload['buildset']['builds'][0])
self.assertEquals(success_event.get('topic'),
'tenant-one/zuul_buildset/check/org/project/master')
mqtt_payload = success_event['msg']
self.assertEquals(mqtt_payload['project'], 'org/project')
self.assertEquals(mqtt_payload['branch'], 'master')
self.assertEquals(mqtt_payload['buildset']['builds'][0]['job_name'],
'test')
self.assertEquals(mqtt_payload['buildset']['builds'][0]['result'],
'SUCCESS')
def test_mqtt_invalid_topic(self):
in_repo_conf = textwrap.dedent(
"""
- pipeline:
name: test-pipeline
manager: independent
trigger:
gerrit:
- event: comment-added
start:
mqtt:
topic: "{bad}/{topic}"
""")
file_dict = {'zuul.d/test.yaml': in_repo_conf}
A = self.fake_gerrit.addFakeChange('common-config', 'master', 'A',
files=file_dict)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertIn("topic component 'bad' is invalid", A.messages[0],
"A should report a syntax error")

View File

@ -0,0 +1,30 @@
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from zuul.driver import Driver, ConnectionInterface, ReporterInterface
from zuul.driver.mqtt import mqttconnection
from zuul.driver.mqtt import mqttreporter
class MQTTDriver(Driver, ConnectionInterface, ReporterInterface):
name = 'mqtt'
def getConnection(self, name, config):
return mqttconnection.MQTTConnection(self, name, config)
def getReporter(self, connection, config=None):
return mqttreporter.MQTTReporter(self, connection, config)
def getReporterSchema(self):
return mqttreporter.getSchema()

View File

@ -0,0 +1,88 @@
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import json
import voluptuous as v
import paho.mqtt.client as mqtt
from zuul.connection import BaseConnection
from zuul.exceptions import ConfigurationError
class MQTTConnection(BaseConnection):
driver_name = 'mqtt'
log = logging.getLogger("zuul.MQTTConnection")
def __init__(self, driver, connection_name, connection_config):
super(MQTTConnection, self).__init__(driver, connection_name,
connection_config)
self.client = mqtt.Client(
client_id=self.connection_config.get('client_id'))
if self.connection_config.get('user'):
self.client.username_pw_set(
self.connection_config.get('user'),
self.connection_config.get('password'))
ca_certs = self.connection_config.get('ca_certs')
certfile = self.connection_config.get('certfile')
keyfile = self.connection_config.get('keyfile')
ciphers = self.connection_config.get('ciphers')
if (ciphers or certfile or keyfile) and not ca_certs:
raise ConfigurationError(
"MQTT TLS configuration requires the ca_certs option")
if ca_certs:
if bool(certfile) != bool(keyfile):
raise ConfigurationError(
"MQTT configuration keyfile and certfile "
"options must both be set.")
self.client.tls_set(
ca_certs,
certfile=certfile,
keyfile=keyfile,
ciphers=ciphers)
self.connected = False
def onLoad(self):
self.log.debug("Starting MQTT Connection")
try:
self.client.connect(
self.connection_config.get('server', 'localhost'),
port=int(self.connection_config.get('port', 1883)),
keepalive=int(self.connection_config.get('keepalive', 60))
)
self.connected = True
except Exception:
self.log.exception("MQTT reporter (%s) couldn't connect" % self)
self.client.loop_start()
def onStop(self):
self.log.debug("Stopping MQTT Connection")
self.client.loop_stop()
self.client.disconnect()
self.connected = False
def publish(self, topic, message, qos):
if not self.connected:
self.log.warn("MQTT reporter (%s) is disabled" % self)
return
try:
self.client.publish(topic, payload=json.dumps(message), qos=qos)
except Exception:
self.log.exception(
"Could not publish message to topic '%s' via mqtt", topic)
def getSchema():
return v.Any(str, v.Schema(dict))

View File

@ -0,0 +1,109 @@
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import voluptuous as v
from zuul.reporter import BaseReporter
class MQTTReporter(BaseReporter):
"""Publish messages to a topic via mqtt"""
name = 'mqtt'
log = logging.getLogger("zuul.MQTTReporter")
def report(self, item):
self.log.debug("Report change %s, params %s" %
(item.change, self.config))
message = {
'action': self._action,
'tenant': item.pipeline.layout.tenant.name,
'zuul_ref': item.current_build_set.ref,
'pipeline': item.pipeline.name,
'project': item.change.project.name,
'branch': getattr(item.change, 'branch', ''),
'change_url': item.change.url,
'change': getattr(item.change, 'number', ''),
'patchset': getattr(item.change, 'patchset', ''),
'ref': getattr(item.change, 'ref', ''),
'message': self._formatItemReport(
item, with_jobs=False),
'buildset': {
'uuid': item.current_build_set.uuid,
'builds': []
},
}
for job in item.getJobs():
job_informations = {
'job_name': job.name,
'voting': job.voting,
}
build = item.current_build_set.getBuild(job.name)
if build:
# Report build data if available
(result, url) = item.formatJobResult(job)
job_informations.update({
'uuid': build.uuid,
'start_time': build.start_time,
'end_time': build.end_time,
'log_url': url,
'result': result,
})
message['buildset']['builds'].append(job_informations)
topic = None
try:
topic = self.config['topic'].format(
tenant=item.pipeline.layout.tenant.name,
pipeline=item.pipeline.name,
project=item.change.project.name,
branch=getattr(item.change, 'branch', None),
change=getattr(item.change, 'number', None),
patchset=getattr(item.change, 'patchset', None),
ref=getattr(item.change, 'ref', None))
except Exception:
self.log.exception("Error while formatting MQTT topic %s:"
% self.config['topic'])
if topic is not None:
self.connection.publish(
topic, message, qos=self.config.get('qos', 0))
def topicValue(value):
if not isinstance(value, str):
raise v.Invalid("topic is not a string")
try:
value.format(
tenant='test',
pipeline='test',
project='test',
branch='test',
change='test',
patchset='test',
ref='test')
except KeyError as e:
raise v.Invalid("topic component %s is invalid" % str(e))
return value
def qosValue(value):
if not isinstance(value, int):
raise v.Invalid("qos is not a integer")
if value not in (0, 1, 2):
raise v.Invalid("qos can only be 0, 1 or 2")
return value
def getSchema():
return v.Schema({v.Required('topic'): topicValue, 'qos': qosValue})

View File

@ -33,3 +33,7 @@ class RevNotFound(Exception):
class MergeFailure(Exception):
pass
class ConfigurationError(Exception):
pass

View File

@ -25,6 +25,7 @@ import zuul.driver.timer
import zuul.driver.sql
import zuul.driver.bubblewrap
import zuul.driver.nullwrap
import zuul.driver.mqtt
from zuul.connection import BaseConnection
from zuul.driver import SourceInterface
@ -51,6 +52,7 @@ class ConnectionRegistry(object):
self.registerDriver(zuul.driver.sql.SQLDriver())
self.registerDriver(zuul.driver.bubblewrap.BubblewrapDriver())
self.registerDriver(zuul.driver.nullwrap.NullwrapDriver())
self.registerDriver(zuul.driver.mqtt.MQTTDriver())
def registerDriver(self, driver):
if driver.name in self.drivers: