Merge "Delay Github fileschanges workaround to pipeline processing"

This commit is contained in:
Zuul 2018-12-29 14:49:32 +00:00 committed by Gerrit Code Review
commit 5ae6d549a5
7 changed files with 126 additions and 32 deletions

View File

@ -1001,7 +1001,6 @@ class FakeGithubPullRequest(object):
msg = self.subject + '-' + str(self.number_of_commits)
for fn, content in self.files.items():
fn = os.path.join(repo.working_dir, fn)
f = open(fn, 'w')
with open(fn, 'w') as f:
f.write(content)
repo.index.add([fn])

View File

@ -119,6 +119,42 @@ class TestGithubDriver(ZuulTestCase):
self.waitUntilSettled()
self.assertEqual(1, len(self.history))
@simple_layout('layouts/files-github.yaml', driver='github')
def test_pull_changed_files_length_mismatch_reenqueue(self):
    """Verify an item whose file list exceeded Github's 300-file limit
    survives a re-enqueue triggered by a reconfiguration."""
    # Hold jobs so we can trigger a reconfiguration while the item is in
    # the pipeline
    self.executor_server.hold_jobs_in_build = True

    files = {'{:03d}.txt'.format(n): 'test' for n in range(300)}
    # File 301 which is not included in the list of files of the PR,
    # since Github only returns max. 300 files in alphabetical order
    files["foobar-requires"] = "test"
    A = self.fake_github.openFakePullRequest(
        'org/project', 'master', 'A', files=files)

    self.fake_github.emitEvent(A.getPullRequestOpenedEvent())
    self.waitUntilSettled()

    # Comment on the pull request to trigger updateChange
    self.fake_github.emitEvent(A.getCommentAddedEvent('casual comment'))
    self.waitUntilSettled()

    # Trigger reconfig to enforce a reenqueue of the item
    self.sched.reconfigure(self.config)
    self.waitUntilSettled()

    # Now we can release all jobs
    # BUG FIX: this previously re-set hold_jobs_in_build to True, which
    # would keep the builds held forever; it must be False before release.
    self.executor_server.hold_jobs_in_build = False
    self.executor_server.release()
    self.waitUntilSettled()

    # There must be exactly one successful job in the history. If there is
    # an aborted job in the history the reenqueue failed.
    self.assertHistory([
        dict(name='project-test1', result='SUCCESS',
             changes="%s,%s" % (A.number, A.head_sha)),
    ])
@simple_layout('layouts/basic-github.yaml', driver='github')
def test_pull_github_files_error(self):
A = self.fake_github.openFakePullRequest(

View File

@ -904,24 +904,17 @@ class GithubConnection(BaseConnection):
return changes
def getFilesChanges(self, project_name, head, base):
    """Synchronously fetch the list of files changed between two refs.

    Submits a fileschanges job to the mergers, blocks until it finishes,
    and returns the resulting file list.  Raises if the job did not
    complete successfully.
    """
    merger = self.sched.merger
    job = merger.getFilesChanges(
        self.connection_name, project_name, head, base)
    self.log.debug("Waiting for fileschanges job %s", job)
    job.wait()
    if not job.updated:
        raise Exception("Fileschanges job {} failed".format(job))
    self.log.debug("Fileschanges job %s got changes on files %s",
                   job, job.files)
    return job.files
def _updateChange(self, change):
self.log.info("Updating %s" % (change,))
change.pr = self.getPull(change.project.name, change.number)
change.ref = "refs/pull/%s/head" % change.number
change.branch = change.pr.get('base').get('ref')
change.files = change.pr.get('files')
# Don't overwrite the files list. The change object is bound to a
# specific revision and thus the changed files won't change. This is
# important if we got the files later because of the 300 files limit.
if not change.files:
change.files = change.pr.get('files')
# Github's pull requests files API only returns at max
# the first 300 changed files of a PR in alphabetical order.
# https://developer.github.com/v3/pulls/#list-pull-requests-files
@ -929,10 +922,11 @@ class GithubConnection(BaseConnection):
self.log.warning("Got only %s files but PR has %s files.",
len(change.files),
change.pr.get('changed_files', 0))
change.files = self.getFilesChanges(
change.project.name,
change.ref,
change.branch)
# In this case explicitly set change.files to None to signal
# that we need to ask the mergers later in pipeline processing.
# We cannot query the files here using the mergers because this
# can slow down the github event queue considerably.
change.files = None
change.title = change.pr.get('title')
change.open = change.pr.get('state') == 'open'
change.is_merged = change.pr.get('merged')

View File

@ -578,8 +578,6 @@ class PipelineManager(object):
return self._loadDynamicLayout(item)
def scheduleMerge(self, item, files=None, dirs=None):
build_set = item.current_build_set
self.log.debug("Scheduling merge for item %s (files: %s, dirs: %s)" %
(item, files, dirs))
build_set = item.current_build_set
@ -594,23 +592,38 @@ class PipelineManager(object):
precedence=self.pipeline.precedence)
return False
def scheduleFilesChanges(self, item):
    """Submit an async merger request for the item's changed files.

    Marks the build set's file state as PENDING and hands the request to
    the mergers; always returns False because the item is not yet ready.
    """
    self.log.debug("Scheduling fileschanged for item %s", item)
    change = item.change
    project = change.project
    build_set = item.current_build_set
    build_set.files_state = build_set.PENDING
    self.sched.merger.getFilesChanges(
        project.connection_name, project.name,
        change.ref, change.branch, build_set=build_set)
    return False
def prepareItem(self, item):
    """Drive an item toward readiness.

    This runs on every iteration of _processOneItem.  Schedules any
    outstanding merge or fileschanges work and returns True only once
    everything the item needs has completed; False otherwise.

    NOTE(review): the rendered diff interleaved the removed ``return``
    statements with the added ``ready = ...`` lines; this is the
    post-change version with the duplicates removed.
    """
    ready = True
    build_set = item.current_build_set
    if not build_set.ref:
        build_set.setConfiguration()
    # Kick off the config merge once per build set.
    if build_set.merge_state == build_set.NEW:
        ready = self.scheduleMerge(item,
                                   files=['zuul.yaml', '.zuul.yaml'],
                                   dirs=['zuul.d', '.zuul.d'])
    # Ask the mergers for the changed files if the event source could not
    # supply them (e.g. Github's 300 changed-files limit).
    if build_set.files_state == build_set.NEW:
        ready = self.scheduleFilesChanges(item)
    if build_set.files_state == build_set.PENDING:
        ready = False
    if build_set.merge_state == build_set.PENDING:
        ready = False
    if build_set.unable_to_merge:
        ready = False
    if build_set.config_errors:
        ready = False
    return ready
def prepareJobs(self, item):
# This only runs once the item is in the pipeline's action window
@ -820,6 +833,12 @@ class PipelineManager(object):
self._resumeBuilds(build.build_set)
return True
def onFilesChangesCompleted(self, event):
    """Record a merger fileschanges result on the item's change."""
    result_build_set = event.build_set
    result_build_set.item.change.files = event.files
    result_build_set.files_state = result_build_set.COMPLETE
def onMergeCompleted(self, event):
build_set = event.build_set
item = build_set.item

View File

@ -132,12 +132,14 @@ class MergeClient(object):
return job
def getFilesChanges(self, connection_name, project_name, branch,
                    tosha=None, precedence=zuul.model.PRECEDENCE_HIGH,
                    build_set=None):
    """Submit a merger:fileschanges job.

    :arg str connection_name: Connection hosting the project.
    :arg str project_name: Project to inspect.
    :arg str branch: Ref whose changed files are wanted.
    :arg str tosha: Optional base sha to diff against.
    :arg precedence: Gearman job precedence.
    :arg build_set: Optional build set to attach to the job so the
        scheduler is notified asynchronously on completion (backward
        compatible: defaults to None for the old synchronous use).

    NOTE(review): the rendered diff interleaved the old and new
    signature/submitJob lines; this is the post-change version.
    """
    data = dict(connection=connection_name,
                project=project_name,
                branch=branch,
                tosha=tosha)
    job = self.submitJob('merger:fileschanges', data, build_set,
                         precedence)
    return job
def onBuildCompleted(self, job):
@ -153,9 +155,13 @@ class MergeClient(object):
(job, merged, job.updated, commit))
job.setComplete()
if job.build_set:
self.sched.onMergeCompleted(job.build_set,
merged, job.updated, commit, files,
repo_state)
if job.name == 'merger:fileschanges':
self.sched.onFilesChangesCompleted(job.build_set, files)
else:
self.sched.onMergeCompleted(job.build_set,
merged, job.updated, commit, files,
repo_state)
# The test suite expects the job to be removed from the
# internal account after the wake flag is set.
self.jobs.remove(job)

View File

@ -1784,6 +1784,10 @@ class BuildSet(object):
self.files = RepoFiles()
self.repo_state = {}
self.tries = {}
if item.change.files is not None:
self.files_state = self.COMPLETE
else:
self.files_state = self.NEW
@property
def ref(self):
@ -2584,6 +2588,11 @@ class Ref(object):
return set()
def updatesConfig(self):
if self.files is None:
# If self.files is None we don't know if this change updates the
# config so assume it does as this is a safe default if we don't
# know.
return True
if 'zuul.yaml' in self.files or '.zuul.yaml' in self.files or \
[True for fn in self.files if fn.startswith("zuul.d/") or
fn.startswith(".zuul.d/")]:

View File

@ -215,6 +215,18 @@ class MergeCompletedEvent(ResultEvent):
self.repo_state = repo_state
class FilesChangesCompletedEvent(ResultEvent):
    """A remote fileschanges operation has completed

    :arg BuildSet build_set: The build_set which is ready.
    :arg list files: List of files changed.
    """

    def __init__(self, build_set, files):
        self.files = files
        self.build_set = build_set
class NodesProvisionedEvent(ResultEvent):
"""Nodes have been provisioned for a build_set
@ -475,6 +487,11 @@ class Scheduler(threading.Thread):
self.result_event_queue.put(event)
self.wake_event.set()
def onFilesChangesCompleted(self, build_set, files):
    """Enqueue a fileschanges result and wake the scheduler loop."""
    self.result_event_queue.put(
        FilesChangesCompletedEvent(build_set, files))
    self.wake_event.set()
def onNodesProvisioned(self, req):
event = NodesProvisionedEvent(req)
self.result_event_queue.put(event)
@ -1107,6 +1124,8 @@ class Scheduler(threading.Thread):
self._doBuildCompletedEvent(event)
elif isinstance(event, MergeCompletedEvent):
self._doMergeCompletedEvent(event)
elif isinstance(event, FilesChangesCompletedEvent):
self._doFilesChangesCompletedEvent(event)
elif isinstance(event, NodesProvisionedEvent):
self._doNodesProvisionedEvent(event)
else:
@ -1264,6 +1283,18 @@ class Scheduler(threading.Thread):
return
pipeline.manager.onMergeCompleted(event)
def _doFilesChangesCompletedEvent(self, event):
build_set = event.build_set
if build_set is not build_set.item.current_build_set:
self.log.warning("Build set %s is not current", build_set)
return
pipeline = build_set.item.pipeline
if not pipeline:
self.log.warning("Build set %s is not associated with a pipeline",
build_set)
return
pipeline.manager.onFilesChangesCompleted(event)
def _doNodesProvisionedEvent(self, event):
request = event.request
request_id = event.request_id