Cache branches in connections/sources

The current attempt to cache branches is ineffective -- we still
query the list of branches during every tenant reconfiguration.

The list of branches for a project is really global information;
we might cache it on the Abide.  However, drivers may need to filter
that list based on tenant configuration (e.g., GitHub protected
branches).  To accommodate that, allow (and expect) the drivers to
perform their own caching of branches, and to generally keep the
list up to date (or at least invalidate their caches) by observing
branch create/delete events.

A full reconfiguration instructs the connections to clear their
caches so that we perform a full query.  That way, an operator
can recover from a situation where the cache is invalid.

Change-Id: I3bd0cda5875dd21368e384e3704a61ebb5dcedfa
James E. Blair 2018-08-08 10:26:27 -07:00
parent 6479892b9c
commit a48c9101c6
10 changed files with 188 additions and 27 deletions
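
The following is a minimal, illustrative sketch (not part of this change; the class and method names are hypothetical) of the per-connection caching pattern the commit message describes: branch lists are cached per project, invalidated when branch create/delete events arrive, and dropped entirely on a full reconfiguration.

# Illustrative sketch only; names are hypothetical, not Zuul's actual classes.
class ExampleConnection:
    def __init__(self):
        # project name -> list of branch names
        self._project_branch_cache = {}

    def getProjectBranches(self, project_name):
        branches = self._project_branch_cache.get(project_name)
        if branches is not None:
            # Cache hit: tenant reconfiguration does not re-query the remote.
            return branches
        branches = self._queryRemoteBranches(project_name)
        self._project_branch_cache[project_name] = branches
        return branches

    def onBranchCreatedOrDeleted(self, project_name):
        # Branch create/delete events invalidate the cached list so the
        # next lookup performs a fresh query.
        self._project_branch_cache.pop(project_name, None)

    def clearBranchCache(self):
        # Called before a full reconfiguration so an operator can recover
        # from stale or invalid cached data.
        self._project_branch_cache = {}

    def _queryRemoteBranches(self, project_name):
        # Stand-in for the driver-specific query (e.g. git ls-remote or a
        # REST call); returns a list of branch names.
        return []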

@@ -311,6 +311,10 @@ class TestExecutorRepos(ZuulTestCase):
p1 = 'review.example.com/org/project1'
projects = [p1]
self.create_branch('org/project1', 'stable/havana')
self.fake_gerrit.addEvent(
self.fake_gerrit.getFakeBranchCreatedEvent(
'org/project1', 'stable/havana'))
self.waitUntilSettled()
# The pipeline triggers every second, so we should have seen
# several by now.

@@ -806,6 +806,12 @@ class TestGithubDriver(ZuulTestCase):
repo = github.repo_from_project(project)
repo._create_branch(branch)
self.fake_github.emitEvent(
self.fake_github.getPushEvent(
project,
ref='refs/heads/%s' % branch))
self.waitUntilSettled()
A = self.fake_github.openFakePullRequest(project, branch, 'A')
old_sha = A.head_sha
A.setMerged("merging A")

@@ -155,6 +155,11 @@ class TestScheduler(ZuulTestCase):
def test_job_branch(self):
"Test the correct variant of a job runs on a branch"
self.create_branch('org/project', 'stable')
self.fake_gerrit.addEvent(
self.fake_gerrit.getFakeBranchCreatedEvent(
'org/project', 'stable'))
self.waitUntilSettled()
A = self.fake_gerrit.addFakeChange('org/project', 'stable', 'A')
A.addApproval('Code-Review', 2)
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
@@ -181,6 +186,10 @@ class TestScheduler(ZuulTestCase):
self.executor_server.merger_worker.unRegisterFunction(f)
self.create_branch('org/project', 'stable')
self.fake_gerrit.addEvent(
self.fake_gerrit.getFakeBranchCreatedEvent(
'org/project', 'stable'))
self.waitUntilSettled()
A = self.fake_gerrit.addFakeChange('org/project', 'stable', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
@@ -2707,7 +2716,15 @@ class TestScheduler(ZuulTestCase):
@simple_layout('layouts/job-variants.yaml')
def test_job_branch_variants(self):
self.create_branch('org/project', 'stable/diablo')
self.fake_gerrit.addEvent(
self.fake_gerrit.getFakeBranchCreatedEvent(
'org/project', 'stable/diablo'))
self.create_branch('org/project', 'stable/essex')
self.fake_gerrit.addEvent(
self.fake_gerrit.getFakeBranchCreatedEvent(
'org/project', 'stable/essex'))
self.waitUntilSettled()
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
@@ -3876,6 +3893,10 @@ class TestScheduler(ZuulTestCase):
self.addCleanup(client.shutdown)
self.create_branch('org/project', 'stable')
self.fake_gerrit.addEvent(
self.fake_gerrit.getFakeBranchCreatedEvent(
'org/project', 'stable'))
self.waitUntilSettled()
self.executor_server.hold_jobs_in_build = True
self.commitConfigUpdate('common-config', 'layouts/timer.yaml')
self.sched.reconfigure(self.config)
@@ -5495,6 +5516,10 @@ class TestSchedulerTemplatedProject(ZuulTestCase):
# This tests that there are no implied branch matchers added
# to project templates in unbranched projects.
self.create_branch('org/layered-project', 'stable')
self.fake_gerrit.addEvent(
self.fake_gerrit.getFakeBranchCreatedEvent(
'org/layered-project', 'stable'))
self.waitUntilSettled()
A = self.fake_gerrit.addFakeChange(
'org/layered-project', 'stable', 'A')
@@ -5634,6 +5659,10 @@ class TestSchedulerMerges(ZuulTestCase):
def test_merge_branch(self):
"Test that the right commits are on alternate branches"
self.create_branch('org/project-merge-branches', 'mp')
self.fake_gerrit.addEvent(
self.fake_gerrit.getFakeBranchCreatedEvent(
'org/project-merge-branches', 'mp'))
self.waitUntilSettled()
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange(
@@ -5679,6 +5708,10 @@ class TestSchedulerMerges(ZuulTestCase):
def test_merge_multi_branch(self):
"Test that dependent changes on multiple branches are merged"
self.create_branch('org/project-merge-branches', 'mp')
self.fake_gerrit.addEvent(
self.fake_gerrit.getFakeBranchCreatedEvent(
'org/project-merge-branches', 'mp'))
self.waitUntilSettled()
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange(
@@ -6370,6 +6403,10 @@ class TestSchedulerBranchMatcher(ZuulTestCase):
not be run on a change to that branch.
'''
self.create_branch('org/project', 'featureA')
self.fake_gerrit.addEvent(
self.fake_gerrit.getFakeBranchCreatedEvent(
'org/project', 'featureA'))
self.waitUntilSettled()
A = self.fake_gerrit.addFakeChange('org/project', 'featureA', 'A')
A.addApproval('Code-Review', 2)
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))

@@ -2791,6 +2791,10 @@ class TestRoleBranches(RoleTestCase):
# dependency chain.
# First we create some branch-specific content in project1:
self.create_branch('project1', 'stable')
self.fake_gerrit.addEvent(
self.fake_gerrit.getFakeBranchCreatedEvent(
'project1', 'stable'))
self.waitUntilSettled()
# A pre-playbook with unique stable branch content.
p = self._addPlaybook('project1', 'stable',
@@ -3143,6 +3147,10 @@ class TestPragma(ZuulTestCase):
def test_no_pragma(self):
self.create_branch('org/project', 'stable')
self.fake_gerrit.addEvent(
self.fake_gerrit.getFakeBranchCreatedEvent(
'org/project', 'stable'))
self.waitUntilSettled()
with open(os.path.join(FIXTURE_DIR,
'config/pragma/git/',
'org_project/nopragma.yaml')) as f:
@@ -3166,6 +3174,10 @@ class TestPragma(ZuulTestCase):
def test_pragma(self):
self.create_branch('org/project', 'stable')
self.fake_gerrit.addEvent(
self.fake_gerrit.getFakeBranchCreatedEvent(
'org/project', 'stable'))
self.waitUntilSettled()
with open(os.path.join(FIXTURE_DIR,
'config/pragma/git/',
'org_project/pragma.yaml')) as f:

@@ -1267,7 +1267,7 @@ class TenantParser(object):
}
return vs.Schema(tenant)
def fromYaml(self, abide, project_key_dir, conf, old_tenant):
def fromYaml(self, abide, project_key_dir, conf):
self.getSchema()(conf)
tenant = model.Tenant(conf['name'])
if conf.get('max-nodes-per-job') is not None:
@@ -1289,7 +1289,7 @@ class TenantParser(object):
tenant.addUntrustedProject(tpc)
for tpc in config_tpcs + untrusted_tpcs:
self._getProjectBranches(tenant, tpc, old_tenant)
self._getProjectBranches(tenant, tpc)
self._resolveShadowProjects(tenant, tpc)
# We prepare a stack to store config loading issues
@@ -1335,16 +1335,9 @@ class TenantParser(object):
shadow_projects.append(project)
tpc.shadow_projects = frozenset(shadow_projects)
def _getProjectBranches(self, tenant, tpc, old_tenant):
# If we're performing a tenant reconfiguration, we will have
# an old_tenant object, however, we may be doing so because of
# a branch creation event, so if we don't have any cached
# data, query the branches again as well.
if old_tenant and tpc.parsed_branch_config:
branches = old_tenant.getProjectBranches(tpc.project)[:]
else:
branches = sorted(tpc.project.source.getProjectBranches(
tpc.project, tenant))
def _getProjectBranches(self, tenant, tpc):
branches = sorted(tpc.project.source.getProjectBranches(
tpc.project, tenant))
if 'master' in branches:
branches.remove('master')
branches = ['master'] + branches
@@ -1901,7 +1894,7 @@ class ConfigLoader(object):
for conf_tenant in unparsed_abide.tenants:
# When performing a full reload, do not use cached data.
tenant = self.tenant_parser.fromYaml(abide, project_key_dir,
conf_tenant, old_tenant=None)
conf_tenant)
abide.tenants[tenant.name] = tenant
if len(tenant.layout.loading_errors):
self.log.warning(
@@ -1923,7 +1916,7 @@ class ConfigLoader(object):
new_tenant = self.tenant_parser.fromYaml(
new_abide,
project_key_dir,
tenant.unparsed_config, old_tenant=tenant)
tenant.unparsed_config)
new_abide.tenants[tenant.name] = new_tenant
if len(new_tenant.layout.loading_errors):
self.log.warning(

@@ -68,12 +68,25 @@ class BaseConnection(object, metaclass=abc.ABCMeta):
def registerScheduler(self, sched):
self.sched = sched
def clearBranchCache(self):
"""Clear the branch cache for this connection.
This is called immediately prior to performing a full
reconfiguration. The branch cache should be cleared so that a
full reconfiguration can be used to correct any errors in
cached data.
"""
pass
def maintainCache(self, relevant):
"""Make cache contain relevant changes.
This lets the user supply a list of change objects that are
still in use. Anything in our cache that isn't in the supplied
list should be safe to remove from the cache."""
pass
def getWebController(self, zuul_web):
"""Return a cherrypy web controller to register with zuul-web.

@@ -142,9 +142,13 @@ class GerritEventConnector(threading.Thread):
if event.oldrev == '0' * 40:
event.branch_created = True
event.branch = event.ref
project = self.connection.source.getProject(event.project_name)
self.connection._clearBranchCache(project)
if event.newrev == '0' * 40:
event.branch_deleted = True
event.branch = event.ref
project = self.connection.source.getProject(event.project_name)
self.connection._clearBranchCache(project)
self._getChange(event)
self.connection.logEvent(event)
@@ -292,6 +296,7 @@ class GerritConnection(BaseConnection):
def __init__(self, driver, connection_name, connection_config):
super(GerritConnection, self).__init__(driver, connection_name,
connection_config)
self._project_branch_cache = {}
if 'server' not in self.connection_config:
raise Exception('server is required for gerrit connections in '
'%s' % self.connection_name)
@@ -377,6 +382,15 @@ class GerritConnection(BaseConnection):
def addProject(self, project: Project) -> None:
self.projects[project.name] = project
def clearBranchCache(self):
self._project_branch_cache = {}
def _clearBranchCache(self, project):
try:
del self._project_branch_cache[project.name]
except KeyError:
pass
def maintainCache(self, relevant):
# This lets the user supply a list of change objects that are
# still in use. Anything in our cache that isn't in the supplied
@@ -763,9 +777,14 @@ class GerritConnection(BaseConnection):
return changes
def getProjectBranches(self, project: Project, tenant) -> List[str]:
branches = self._project_branch_cache.get(project.name)
if branches is not None:
return branches
refs = self.getInfoRefs(project)
heads = [str(k[len('refs/heads/'):]) for k in refs.keys()
if k.startswith('refs/heads/')]
self._project_branch_cache[project.name] = heads
return heads
def addEvent(self, data):

@@ -259,6 +259,7 @@ class GithubEventConnector(threading.Thread):
event.branch_deleted = True
if event.branch:
project = self.connection.source.getProject(event.project_name)
if event.branch_deleted:
# We currently cannot determine if a deleted branch was
# protected so we need to assume it was. GitHub doesn't allow
@@ -268,15 +269,19 @@ class GithubEventConnector(threading.Thread):
# of the branch.
# FIXME(tobiash): Find a way to handle that case
event.branch_protected = True
self.connection._clearBranchCache(project)
elif event.branch_created:
# A new branch never can be protected because that needs to be
# configured after it has been created.
event.branch_protected = False
self.connection._clearBranchCache(project)
else:
# An updated branch can be protected or not so we have to ask
# GitHub whether it is.
b = self.connection.getBranch(event.project_name, event.branch)
event.branch_protected = b.get('protected')
self.connection.checkBranchCache(project, event.branch,
event.branch_protected)
return event
@@ -457,7 +462,8 @@ class GithubConnection(BaseConnection):
super(GithubConnection, self).__init__(
driver, connection_name, connection_config)
self._change_cache = {}
self._project_branch_cache = {}
self._project_branch_cache_include_unprotected = {}
self._project_branch_cache_exclude_unprotected = {}
self.projects = {}
self.git_ssh_key = self.connection_config.get('sshkey')
self.server = self.connection_config.get('server', 'github.com')
@@ -929,10 +935,22 @@ class GithubConnection(BaseConnection):
def addProject(self, project):
self.projects[project.name] = project
def getProjectBranches(self, project, tenant):
github = self.getGithubClient(project.name)
exclude_unprotected = tenant.getExcludeUnprotectedBranches(project)
def clearBranchCache(self):
self._project_branch_cache_exclude_unprotected = {}
self._project_branch_cache_include_unprotected = {}
def getProjectBranches(self, project, tenant):
exclude_unprotected = tenant.getExcludeUnprotectedBranches(project)
if exclude_unprotected:
cache = self._project_branch_cache_exclude_unprotected
else:
cache = self._project_branch_cache_include_unprotected
branches = cache.get(project.name)
if branches is not None:
return branches
github = self.getGithubClient(project.name)
url = github.session.build_url('repos', project.name,
'branches')
@@ -946,23 +964,21 @@ class GithubConnection(BaseConnection):
url, headers=headers, params=params)
# check if we need to do further paged calls
url = resp.links.get(
'next', {}).get('url')
url = resp.links.get('next', {}).get('url')
if resp.status_code == 403:
self.log.error(str(resp))
rate_limit = github.rate_limit()
if rate_limit['resources']['core']['remaining'] == 0:
self.log.warning(
"Rate limit exceeded, using stale branch list")
# failed to list branches so use a stale branch list
return self._project_branch_cache.get(project.name, [])
"Rate limit exceeded, using empty branch list")
return []
branches.extend([x['name'] for x in resp.json()])
self.log_rate_limit(self.log, github)
self._project_branch_cache[project.name] = branches
return self._project_branch_cache[project.name]
cache[project.name] = branches
return branches
def getBranch(self, project_name, branch):
github = self.getGithubClient(project_name)
@@ -1329,6 +1345,46 @@ class GithubConnection(BaseConnection):
log.debug('GitHub API rate limit remaining: %s reset: %s',
remaining, reset)
def _clearBranchCache(self, project):
self.log.debug("Clearing branch cache for %s", project.name)
for cache in [
self._project_branch_cache_exclude_unprotected,
self._project_branch_cache_include_unprotected,
]:
try:
del cache[project.name]
except KeyError:
pass
def checkBranchCache(self, project, branch, protected):
# If the branch appears in the exclude_unprotected cache but
# is unprotected, clear the exclude cache.
# If the branch does not appear in the exclude_unprotected
# cache but is protected, clear the exclude cache.
# All branches should always appear in the include_unprotected
# cache, so we never clear it.
cache = self._project_branch_cache_exclude_unprotected
branches = cache.get(project.name, [])
if (branch in branches) and (not protected):
self.log.debug("Clearing protected branch cache for %s",
project.name)
try:
del cache[project.name]
except KeyError:
pass
return
if (branch not in branches) and (protected):
self.log.debug("Clearing protected branch cache for %s",
project.name)
try:
del cache[project.name]
except KeyError:
pass
return
class GithubWebController(BaseWebController):

@@ -639,6 +639,10 @@ class Scheduler(threading.Thread):
self.config = event.config
try:
self.log.info("Full reconfiguration beginning")
for connection in self.connections.connections.values():
self.log.debug("Clear branch cache for: %s" % connection)
connection.clearBranchCache()
loader = configloader.ConfigLoader(
self.connections, self, self.merger)
tenant_config, script = self._checkTenantSourceConf(self.config)
@@ -665,6 +669,8 @@ class Scheduler(threading.Thread):
# If a change landed to a project, clear out the cached
# config of the changed branch before reconfiguring.
for (project, branch) in event.project_branches:
self.log.debug("Clearing unparsed config: %s @%s",
project.canonical_name, branch)
self.abide.clearUnparsedConfigCache(project.canonical_name,
branch)
old_tenant = self.abide.tenants[event.tenant_name]

@@ -49,7 +49,14 @@ class BaseSource(object, metaclass=abc.ABCMeta):
@abc.abstractmethod
def getChange(self, event):
"""Get the change representing an event."""
"""Get the change representing an event.
This method is called very frequently, and should generally
return quickly. The connection is expected to cache change
objects and automatically update them as related events are
received.
"""
@abc.abstractmethod
def getChangeByURL(self, url):
@@ -91,7 +98,15 @@ class BaseSource(object, metaclass=abc.ABCMeta):
@abc.abstractmethod
def getProjectBranches(self, project, tenant):
"""Get branches for a project"""
"""Get branches for a project
This method is called very frequently, and should generally
return quickly. The connection is expected to cache branch
lists for all projects queried, and further, to automatically
clear or update that cache when it observes branch creation or
deletion events.
"""
@abc.abstractmethod
def getRequireFilters(self, config):
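
As a usage sketch of the contract these docstrings describe (illustrative only; the names below are hypothetical, not actual Zuul classes), a source typically delegates branch lookups to its caching connection, while the scheduler's full-reconfiguration path, shown earlier in this diff, clears every connection's cache before reloading tenants:

# Illustrative sketch only; names are hypothetical.
class ExampleSource:
    def __init__(self, connection):
        self.connection = connection

    def getProjectBranches(self, project, tenant):
        # Expected to return quickly: the connection answers from its own
        # cache and only queries the remote on a miss.
        return self.connection.getProjectBranches(project, tenant)


def fullReconfiguration(connections, load_tenants):
    # Drop every connection's branch cache before reloading configuration
    # so that the reload performs fresh branch queries.
    for connection in connections:
        connection.clearBranchCache()
    load_tenants()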