Improve event logging in githubconnection

This moves event processing to its own class, so that it's
easier to bundle all of the data related to an event along with
an event-specific logger.

This logs the delivery ID on every log line emitted while we're
preparing the event.  It also logs the start time and queue length,
as well as the end time, even on error.

Change-Id: I941a74ecbdb418cf94537ca9f8f1917a5e38dd33
This commit is contained in:
James E. Blair 2019-02-12 11:32:11 -08:00 committed by Tobias Henkel
parent 6d6c69f93e
commit d540ebfe32
No known key found for this signature in database
GPG Key ID: 03750DEC158E5FA2
1 changed file with 104 additions and 68 deletions

View File

@ -171,30 +171,37 @@ class GithubGearmanWorker(object):
self.gearman.shutdown()
class GithubEventConnector(threading.Thread):
"""Move events from GitHub into the scheduler"""
class GithubEventLogAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
msg, kwargs = super(GithubEventLogAdapter, self).process(msg, kwargs)
msg = '[delivery: %s] %s' % (kwargs['extra']['delivery'], msg)
return msg, kwargs
log = logging.getLogger("zuul.GithubEventConnector")
def __init__(self, connection):
super(GithubEventConnector, self).__init__()
self.daemon = True
self.connection = connection
self._stopped = False
class GithubEventProcessor(object):
def __init__(self, connector, event_tuple):
self.connector = connector
self.connection = connector.connection
self.ts, self.body, self.event_type, self.delivery = event_tuple
logger = logging.getLogger("zuul.GithubEventConnector")
self.log = GithubEventLogAdapter(logger, {'delivery': self.delivery})
def stop(self):
self._stopped = True
self.connection.addEvent(None)
def run(self):
self.log.debug("Starting event processing, queue length %s",
self.connection.getEventQueueSize())
try:
self._handle_event()
finally:
self.log.debug("Finished event processing")
def _handleEvent(self):
ts, json_body, event_type, delivery = self.connection.getEvent()
if self._stopped:
def _handle_event(self):
if self.connector._stopped:
return
# If there's any installation mapping information in the body then
# update the project mapping before any requests are made.
installation_id = json_body.get('installation', {}).get('id')
project_name = json_body.get('repository', {}).get('full_name')
installation_id = self.body.get('installation', {}).get('id')
project_name = self.body.get('repository', {}).get('full_name')
if installation_id and project_name:
old_id = self.connection.installation_map.get(project_name)
@ -206,29 +213,33 @@ class GithubEventConnector(threading.Thread):
self.connection.installation_map[project_name] = installation_id
try:
method = getattr(self, '_event_' + event_type)
method = getattr(self, '_event_' + self.event_type)
except AttributeError:
# TODO(jlk): Gracefully handle event types we don't care about
# instead of logging an exception.
message = "Unhandled X-Github-Event: {0}".format(event_type)
message = "Unhandled X-Github-Event: {0}".format(self.event_type)
self.log.debug(message)
# Returns empty on unhandled events
return
self.log.debug("Handling %s event", self.event_type)
try:
event = method(json_body)
event = method()
except Exception:
self.log.exception('Exception when handling event:')
event = None
if event:
event.delivery = delivery
event.delivery = self.delivery
project = self.connection.source.getProject(event.project_name)
if event.change_number:
self.connection._getChange(project,
event.change_number,
event.patch_number,
refresh=True)
self.log.debug("Refreshed change %s,%s",
event.change_number, event.patch_number)
# If this event references a branch and we're excluding unprotected
# branches, we might need to check whether the branch is now
@ -237,8 +248,8 @@ class GithubEventConnector(threading.Thread):
b = self.connection.getBranch(project.name, event.branch)
if b is not None:
branch_protected = b.get('protected')
self.connection.checkBranchCache(project, event.branch,
branch_protected)
self.connection.checkBranchCache(
project, event.branch, branch_protected, self.log)
event.branch_protected = branch_protected
else:
# This can happen if the branch was deleted in GitHub. In
@ -251,8 +262,8 @@ class GithubEventConnector(threading.Thread):
self.connection.logEvent(event)
self.connection.sched.addEvent(event)
def _event_push(self, body):
base_repo = body.get('repository')
def _event_push(self):
base_repo = self.body.get('repository')
event = GithubTriggerEvent()
event.trigger_name = 'github'
@ -260,10 +271,10 @@ class GithubEventConnector(threading.Thread):
event.type = 'push'
event.branch_updated = True
event.ref = body.get('ref')
event.oldrev = body.get('before')
event.newrev = body.get('after')
event.commits = body.get('commits')
event.ref = self.body.get('ref')
event.oldrev = self.body.get('before')
event.newrev = self.body.get('after')
event.commits = self.body.get('commits')
ref_parts = event.ref.split('/', 2) # ie, ['refs', 'heads', 'foo/bar']
@ -289,20 +300,20 @@ class GithubEventConnector(threading.Thread):
# know if branch protection has been disabled before deletion
# of the branch.
# FIXME(tobiash): Find a way to handle that case
self.connection._clearBranchCache(project)
self.connection._clearBranchCache(project, self.log)
elif event.branch_created:
# A new branch never can be protected because that needs to be
# configured after it has been created.
self.connection._clearBranchCache(project)
self.connection._clearBranchCache(project, self.log)
return event
def _event_pull_request(self, body):
action = body.get('action')
pr_body = body.get('pull_request')
def _event_pull_request(self):
action = self.body.get('action')
pr_body = self.body.get('pull_request')
event = self._pull_request_to_event(pr_body)
event.account = self._get_sender(body)
event.account = self._get_sender(self.body)
event.type = 'pull_request'
if action == 'opened':
@ -315,10 +326,10 @@ class GithubEventConnector(threading.Thread):
event.action = 'reopened'
elif action == 'labeled':
event.action = 'labeled'
event.label = body['label']['name']
event.label = self.body['label']['name']
elif action == 'unlabeled':
event.action = 'unlabeled'
event.label = body['label']['name']
event.label = self.body['label']['name']
elif action == 'edited':
event.action = 'edited'
else:
@ -326,66 +337,67 @@ class GithubEventConnector(threading.Thread):
return event
def _event_issue_comment(self, body):
def _event_issue_comment(self):
"""Handles pull request comments"""
action = body.get('action')
action = self.body.get('action')
if action != 'created':
return
if not body.get('issue', {}).get('pull_request'):
if not self.body.get('issue', {}).get('pull_request'):
# Do not process non-PR issue comment
return
pr_body = self._issue_to_pull_request(body)
pr_body = self._issue_to_pull_request(self.body)
if pr_body is None:
return
event = self._pull_request_to_event(pr_body)
event.account = self._get_sender(body)
event.comment = body.get('comment').get('body')
event.account = self._get_sender(self.body)
event.comment = self.body.get('comment').get('body')
event.type = 'pull_request'
event.action = 'comment'
return event
def _event_pull_request_review(self, body):
def _event_pull_request_review(self):
"""Handles pull request reviews"""
pr_body = body.get('pull_request')
pr_body = self.body.get('pull_request')
if pr_body is None:
return
review = body.get('review')
review = self.body.get('review')
if review is None:
return
event = self._pull_request_to_event(pr_body)
event.state = review.get('state')
event.account = self._get_sender(body)
event.account = self._get_sender(self.body)
event.type = 'pull_request_review'
event.action = body.get('action')
event.action = self.body.get('action')
return event
def _event_status(self, body):
action = body.get('action')
def _event_status(self):
action = self.body.get('action')
if action == 'pending':
return
project = body.get('name')
pr_body = self.connection.getPullBySha(body['sha'], project)
project = self.body.get('name')
pr_body = self.connection.getPullBySha(
self.body['sha'], project, self.log)
if pr_body is None:
return
event = self._pull_request_to_event(pr_body)
event.account = self._get_sender(body)
event.account = self._get_sender(self.body)
event.type = 'pull_request'
event.action = 'status'
# Github API is silly. Webhook blob sets author data in
# 'sender', but API call to get status puts it in 'creator'.
# Duplicate the data so our code can look in one place
body['creator'] = body['sender']
event.status = "%s:%s:%s" % _status_as_tuple(body)
self.body['creator'] = self.body['sender']
event.status = "%s:%s:%s" % _status_as_tuple(self.body)
return event
def _issue_to_pull_request(self, body):
number = body.get('issue').get('number')
project_name = body.get('repository').get('full_name')
pr_body = self.connection.getPull(project_name, number)
pr_body = self.connection.getPull(project_name, number, self.log)
if pr_body is None:
self.log.debug('Pull request #%s not found in project %s' %
(number, project_name))
@ -417,14 +429,33 @@ class GithubEventConnector(threading.Thread):
if login:
# TODO(tobiash): it might be better to plumb in the installation id
project = body.get('repository', {}).get('full_name')
return self.connection.getUser(login, project)
user = self.connection.getUser(login, project)
self.log.debug("Got user %s", user)
return user
class GithubEventConnector(threading.Thread):
"""Move events from GitHub into the scheduler"""
log = logging.getLogger("zuul.GithubEventConnector")
def __init__(self, connection):
super(GithubEventConnector, self).__init__()
self.daemon = True
self.connection = connection
self._stopped = False
def stop(self):
self._stopped = True
self.connection.addEvent(None)
def run(self):
while True:
if self._stopped:
return
try:
self._handleEvent()
data = self.connection.getEvent()
GithubEventProcessor(self, data).run()
except Exception:
self.log.exception("Exception moving GitHub event:")
finally:
@ -754,6 +785,9 @@ class GithubConnection(BaseConnection):
def getEvent(self):
return self.event_queue.get()
def getEventQueueSize(self):
return self.event_queue.qsize()
def eventDone(self):
self.event_queue.task_done()
@ -1065,7 +1099,9 @@ class GithubConnection(BaseConnection):
def getPullUrl(self, project, number):
return '%s/pull/%s' % (self.getGitwebUrl(project), number)
def getPull(self, project_name, number):
def getPull(self, project_name, number, log=None):
if log is None:
log = self.log
github = self.getGithubClient(project_name)
owner, proj = project_name.split('/')
for retry in range(5):
@ -1089,7 +1125,7 @@ class GithubConnection(BaseConnection):
pr['files'] = []
pr['labels'] = [l.name for l in issueobj.labels()]
self.log.debug('Got PR %s#%s', project_name, number)
log.debug('Got PR %s#%s', project_name, number)
self.log_rate_limit(self.log, github)
return pr
@ -1125,7 +1161,7 @@ class GithubConnection(BaseConnection):
return True
def getPullBySha(self, sha, project):
def getPullBySha(self, sha, project, log):
pulls = []
owner, project = project.split('/')
github = self.getGithubClient("%s/%s" % (owner, project))
@ -1137,7 +1173,7 @@ class GithubConnection(BaseConnection):
continue
pulls.append(pr.as_dict())
self.log.debug('Got PR on project %s for sha %s', project, sha)
log.debug('Got PR on project %s for sha %s', project, sha)
self.log_rate_limit(self.log, github)
if len(pulls) > 1:
raise Exception('Multiple pulls found with head sha %s' % sha)
@ -1433,8 +1469,8 @@ class GithubConnection(BaseConnection):
log.debug('GitHub API rate limit remaining: %s reset: %s',
remaining, reset)
def _clearBranchCache(self, project):
self.log.debug("Clearing branch cache for %s", project.name)
def _clearBranchCache(self, project, log):
log.debug("Clearing branch cache for %s", project.name)
for cache in [
self._project_branch_cache_exclude_unprotected,
self._project_branch_cache_include_unprotected,
@ -1444,7 +1480,7 @@ class GithubConnection(BaseConnection):
except KeyError:
pass
def checkBranchCache(self, project, branch, protected):
def checkBranchCache(self, project, branch, protected, log):
# If the branch appears in the exclude_unprotected cache but
# is unprotected, clear the exclude cache.
@ -1457,16 +1493,16 @@ class GithubConnection(BaseConnection):
cache = self._project_branch_cache_exclude_unprotected
branches = cache.get(project.name, [])
if (branch in branches) and (not protected):
self.log.debug("Clearing protected branch cache for %s",
project.name)
log.debug("Clearing protected branch cache for %s",
project.name)
try:
del cache[project.name]
except KeyError:
pass
return
if (branch not in branches) and (protected):
self.log.debug("Clearing protected branch cache for %s",
project.name)
log.debug("Clearing protected branch cache for %s",
project.name)
try:
del cache[project.name]
except KeyError: