Delay Github fileschanges workaround to pipeline processing

Github pull requests files API only returns at max the first 300
changed files of a PR in alphabetical order. Change
I10a593e26ac85b8c12ca9c82051cad809382f50a introduced a workaround that
queries the file list from the mergers within the github event
loop. While this was a minimally invasive approach, it can cause
multi-minute delays in the github event queue.

This can be fixed by making this query asynchronous and delaying it to
the pipeline processing. This query is now handled the same way as
merge requests.

Change-Id: I9c77b35f0da4d892efc420370c04bcc7070c7676
Depends-On: https://review.openstack.org/625596
This commit is contained in:
Tobias Henkel 2018-12-17 13:41:47 +01:00
parent c933eac2ed
commit 8bfc0cd409
No known key found for this signature in database
GPG Key ID: 03750DEC158E5FA2
7 changed files with 126 additions and 32 deletions

View File

@ -1001,7 +1001,6 @@ class FakeGithubPullRequest(object):
msg = self.subject + '-' + str(self.number_of_commits)
for fn, content in self.files.items():
fn = os.path.join(repo.working_dir, fn)
f = open(fn, 'w')
with open(fn, 'w') as f:
f.write(content)
repo.index.add([fn])

View File

@ -119,6 +119,42 @@ class TestGithubDriver(ZuulTestCase):
self.waitUntilSettled()
self.assertEqual(1, len(self.history))
@simple_layout('layouts/files-github.yaml', driver='github')
def test_pull_changed_files_length_mismatch_reenqueue(self):
    """Verify an item whose file list was fetched asynchronously survives
    a re-enqueue triggered by a reconfiguration."""
    # Hold jobs so we can trigger a reconfiguration while the item is in
    # the pipeline.
    self.executor_server.hold_jobs_in_build = True

    files = {'{:03d}.txt'.format(n): 'test' for n in range(300)}
    # File 301 which is not included in the list of files of the PR,
    # since Github only returns max. 300 files in alphabetical order
    files["foobar-requires"] = "test"
    A = self.fake_github.openFakePullRequest(
        'org/project', 'master', 'A', files=files)

    self.fake_github.emitEvent(A.getPullRequestOpenedEvent())
    self.waitUntilSettled()

    # Comment on the pull request to trigger updateChange
    self.fake_github.emitEvent(A.getCommentAddedEvent('casual comment'))
    self.waitUntilSettled()

    # Trigger reconfig to enforce a reenqueue of the item
    self.sched.reconfigure(self.config)
    self.waitUntilSettled()

    # Now we can release all jobs.  The hold flag must be cleared here
    # (it was erroneously re-set to True), otherwise builds started after
    # the release would be held again and waitUntilSettled could hang.
    self.executor_server.hold_jobs_in_build = False
    self.executor_server.release()
    self.waitUntilSettled()

    # There must be exactly one successful job in the history. If there is
    # an aborted job in the history the reenqueue failed.
    self.assertHistory([
        dict(name='project-test1', result='SUCCESS',
             changes="%s,%s" % (A.number, A.head_sha)),
    ])
@simple_layout('layouts/basic-github.yaml', driver='github')
def test_pull_github_files_error(self):
A = self.fake_github.openFakePullRequest(

View File

@ -904,24 +904,17 @@ class GithubConnection(BaseConnection):
return changes
def getFilesChanges(self, project_name, head, base):
    """Return the list of files changed between two refs.

    Submits a fileschanges job to a merger and blocks until it
    finishes; raises if the merger could not compute the diff.
    """
    merger = self.sched.merger
    job = merger.getFilesChanges(
        self.connection_name, project_name, head, base)
    self.log.debug("Waiting for fileschanges job %s", job)
    job.wait()
    # job.updated is only set by a successful merger run.
    if not job.updated:
        raise Exception("Fileschanges job {} failed".format(job))
    self.log.debug("Fileschanges job %s got changes on files %s",
                   job, job.files)
    return job.files
def _updateChange(self, change):
self.log.info("Updating %s" % (change,))
change.pr = self.getPull(change.project.name, change.number)
change.ref = "refs/pull/%s/head" % change.number
change.branch = change.pr.get('base').get('ref')
change.files = change.pr.get('files')
# Don't overwrite the files list. The change object is bound to a
# specific revision and thus the changed files won't change. This is
# important if we got the files later because of the 300 files limit.
if not change.files:
change.files = change.pr.get('files')
# Github's pull requests files API only returns at max
# the first 300 changed files of a PR in alphabetical order.
# https://developer.github.com/v3/pulls/#list-pull-requests-files
@ -929,10 +922,11 @@ class GithubConnection(BaseConnection):
self.log.warning("Got only %s files but PR has %s files.",
len(change.files),
change.pr.get('changed_files', 0))
change.files = self.getFilesChanges(
change.project.name,
change.ref,
change.branch)
# In this case explicitly set change.files to None to signalize
# that we need to ask the mergers later in pipeline processing.
# We cannot query the files here using the mergers because this
# can slow down the github event queue considerably.
change.files = None
change.title = change.pr.get('title')
change.open = change.pr.get('state') == 'open'
change.is_merged = change.pr.get('merged')

View File

@ -578,8 +578,6 @@ class PipelineManager(object):
return self._loadDynamicLayout(item)
def scheduleMerge(self, item, files=None, dirs=None):
build_set = item.current_build_set
self.log.debug("Scheduling merge for item %s (files: %s, dirs: %s)" %
(item, files, dirs))
build_set = item.current_build_set
@ -594,23 +592,38 @@ class PipelineManager(object):
precedence=self.pipeline.precedence)
return False
def scheduleFilesChanges(self, item):
    """Ask a merger for the changed files of the item's change.

    The answer arrives asynchronously via onFilesChangesCompleted, so
    the item is never ready at this point and False is returned.
    """
    self.log.debug("Scheduling fileschanged for item %s", item)
    build_set = item.current_build_set
    build_set.files_state = build_set.PENDING
    change = item.change
    self.sched.merger.getFilesChanges(
        change.project.connection_name, change.project.name,
        change.ref, change.branch, build_set=build_set)
    return False
def prepareItem(self, item):
    # This runs on every iteration of _processOneItem.
    # Returns True if the item is ready, False otherwise.
    #
    # NOTE: this span contained diff residue — both the pre-change
    # (early "return") and post-change ("ready = ..." accumulation)
    # lines were present.  Reconstructed to the post-change version,
    # which accumulates readiness so a files-changes query can be
    # scheduled in the same pass as the config merge.
    ready = True
    build_set = item.current_build_set
    if not build_set.ref:
        build_set.setConfiguration()
    # Schedule a merge so the change's config files become available.
    if build_set.merge_state == build_set.NEW:
        ready = self.scheduleMerge(item,
                                   files=['zuul.yaml', '.zuul.yaml'],
                                   dirs=['zuul.d', '.zuul.d'])
    # The changed-files query may have been deferred by the driver
    # (e.g. Github's 300-file API limit); ask a merger for it now.
    if build_set.files_state == build_set.NEW:
        ready = self.scheduleFilesChanges(item)
    if build_set.files_state == build_set.PENDING:
        ready = False
    if build_set.merge_state == build_set.PENDING:
        ready = False
    if build_set.unable_to_merge:
        ready = False
    if build_set.config_errors:
        ready = False
    return ready
def prepareJobs(self, item):
# This only runs once the item is in the pipeline's action window
@ -820,6 +833,12 @@ class PipelineManager(object):
self._resumeBuilds(build.build_set)
return True
def onFilesChangesCompleted(self, event):
    """Store the merger-reported file list and mark the query complete."""
    build_set = event.build_set
    build_set.item.change.files = event.files
    build_set.files_state = build_set.COMPLETE
def onMergeCompleted(self, event):
build_set = event.build_set
item = build_set.item

View File

@ -132,12 +132,14 @@ class MergeClient(object):
return job
def getFilesChanges(self, connection_name, project_name, branch,
                    tosha=None, precedence=zuul.model.PRECEDENCE_HIGH,
                    build_set=None):
    """Submit a fileschanges job to a merger.

    NOTE: this span contained diff residue — both the old two-line
    signature/call and the new build_set-aware versions were present;
    deduplicated to the build_set-aware post-change version.

    :arg str connection_name: Connection to fetch from.
    :arg str project_name: Project to fetch from.
    :arg str branch: Ref whose changed files are wanted.
    :arg str tosha: Optional base sha to diff against.
    :arg precedence: Gearman job precedence.
    :arg BuildSet build_set: Optional build set; when supplied the
        result is reported back asynchronously to the scheduler instead
        of the caller waiting on the job.
    :returns: The submitted merge job.
    """
    data = dict(connection=connection_name,
                project=project_name,
                branch=branch,
                tosha=tosha)
    job = self.submitJob('merger:fileschanges', data, build_set,
                         precedence)
    return job
def onBuildCompleted(self, job):
@ -153,9 +155,13 @@ class MergeClient(object):
(job, merged, job.updated, commit))
job.setComplete()
if job.build_set:
self.sched.onMergeCompleted(job.build_set,
merged, job.updated, commit, files,
repo_state)
if job.name == 'merger:fileschanges':
self.sched.onFilesChangesCompleted(job.build_set, files)
else:
self.sched.onMergeCompleted(job.build_set,
merged, job.updated, commit, files,
repo_state)
# The test suite expects the job to be removed from the
# internal account after the wake flag is set.
self.jobs.remove(job)

View File

@ -1784,6 +1784,10 @@ class BuildSet(object):
self.files = RepoFiles()
self.repo_state = {}
self.tries = {}
if item.change.files is not None:
self.files_state = self.COMPLETE
else:
self.files_state = self.NEW
@property
def ref(self):
@ -2582,6 +2586,11 @@ class Ref(object):
return set()
def updatesConfig(self):
if self.files is None:
# If self.files is None we don't know if this change updates the
# config so assume it does as this is a safe default if we don't
# know.
return True
if 'zuul.yaml' in self.files or '.zuul.yaml' in self.files or \
[True for fn in self.files if fn.startswith("zuul.d/") or
fn.startswith(".zuul.d/")]:

View File

@ -215,6 +215,18 @@ class MergeCompletedEvent(ResultEvent):
self.repo_state = repo_state
class FilesChangesCompletedEvent(ResultEvent):
    """A remote fileschanges operation has completed.

    :arg BuildSet build_set: The build_set which is ready.
    :arg list files: List of files changed.
    """

    def __init__(self, build_set, files):
        # Build set the fileschanges job was run for.
        self.build_set = build_set
        # List of changed file paths reported by the merger.
        self.files = files
class NodesProvisionedEvent(ResultEvent):
"""Nodes have been provisioned for a build_set
@ -475,6 +487,11 @@ class Scheduler(threading.Thread):
self.result_event_queue.put(event)
self.wake_event.set()
def onFilesChangesCompleted(self, build_set, files):
    """Queue a files-changes result event and wake the main loop."""
    self.result_event_queue.put(
        FilesChangesCompletedEvent(build_set, files))
    self.wake_event.set()
def onNodesProvisioned(self, req):
event = NodesProvisionedEvent(req)
self.result_event_queue.put(event)
@ -1107,6 +1124,8 @@ class Scheduler(threading.Thread):
self._doBuildCompletedEvent(event)
elif isinstance(event, MergeCompletedEvent):
self._doMergeCompletedEvent(event)
elif isinstance(event, FilesChangesCompletedEvent):
self._doFilesChangesCompletedEvent(event)
elif isinstance(event, NodesProvisionedEvent):
self._doNodesProvisionedEvent(event)
else:
@ -1264,6 +1283,18 @@ class Scheduler(threading.Thread):
return
pipeline.manager.onMergeCompleted(event)
def _doFilesChangesCompletedEvent(self, event):
build_set = event.build_set
if build_set is not build_set.item.current_build_set:
self.log.warning("Build set %s is not current", build_set)
return
pipeline = build_set.item.pipeline
if not pipeline:
self.log.warning("Build set %s is not associated with a pipeline",
build_set)
return
pipeline.manager.onFilesChangesCompleted(event)
def _doNodesProvisionedEvent(self, event):
request = event.request
request_id = event.request_id