Merge "Perform per repo locking on the executor"

This commit is contained in:
Zuul 2019-03-28 21:43:41 +00:00 committed by Gerrit Code Review
commit 79127f5f02
2 changed files with 81 additions and 42 deletions

View File

@@ -78,6 +78,17 @@ class RoleNotFoundError(ExecutorError):
pass
class RepoLocks:
    """Registry handing out one ``threading.Lock`` per repository.

    A repository is identified by its connection name and project name.
    Callers that ask for the same pair always receive the same lock object,
    so they can serialize work on that repository without blocking work on
    other repositories.
    """

    def __init__(self):
        # Maps "<connection_name>:<project_name>" to its threading.Lock.
        self.locks = {}

    def getRepoLock(self, connection_name, project_name):
        """Return the lock associated with a connection/project pair.

        The lock is created the first time a pair is requested and reused
        on every later request.

        :param connection_name: Name of the connection the project lives on.
        :param project_name: Name of the project.
        :returns: The ``threading.Lock`` shared by all callers for the pair.
        """
        key = '%s:%s' % (connection_name, project_name)
        try:
            # Fast path: the lock already exists, so no new Lock object
            # needs to be allocated just to be thrown away again.
            return self.locks[key]
        except KeyError:
            # setdefault returns the lock already stored under the key if
            # another caller registered one in the meantime, so every
            # caller still ends up with the same object.
            return self.locks.setdefault(key, threading.Lock())
class DiskAccountant(object):
''' A single thread to periodically run du and monitor a base directory
@@ -2175,8 +2186,7 @@ class ExecutorMergeWorker(gear.TextWorker):
while self.zuul_executor_server.update_queue.qsize():
time.sleep(1)
with self.zuul_executor_server.merger_lock:
super(ExecutorMergeWorker, self).handleNoop(packet)
super(ExecutorMergeWorker, self).handleNoop(packet)
class ExecutorExecuteWorker(gear.TextWorker):
@@ -2209,7 +2219,7 @@ class ExecutorServer(object):
self.hostname = get_default(self.config, 'executor', 'hostname',
socket.getfqdn())
self.log_streaming_port = log_streaming_port
self.merger_lock = threading.Lock()
self.repo_locks = RepoLocks()
self.governor_lock = threading.Lock()
self.run_lock = threading.Lock()
self.verbose = False
@@ -2496,7 +2506,9 @@ class ExecutorServer(object):
# We are asked to stop
raise StopException()
try:
with self.merger_lock:
lock = self.repo_locks.getRepoLock(
task.connection_name, task.project_name)
with lock:
self.log.info("Updating repo %s/%s",
task.connection_name, task.project_name)
self.merger.updateRepo(task.connection_name, task.project_name)
@@ -2697,7 +2709,9 @@ class ExecutorServer(object):
args = json.loads(job.arguments)
task = self.update(args['connection'], args['project'])
task.wait()
with self.merger_lock:
lock = self.repo_locks.getRepoLock(
task.connection_name, task.project_name)
with lock:
files = self.merger.getFiles(args['connection'], args['project'],
args['branch'], args['files'],
args.get('dirs', []))
@@ -2709,7 +2723,9 @@ class ExecutorServer(object):
args = json.loads(job.arguments)
task = self.update(args['connection'], args['project'])
task.wait()
with self.merger_lock:
lock = self.repo_locks.getRepoLock(
args['connection'], args['project'])
with lock:
files = self.merger.getFilesChanges(
args['connection'], args['project'],
args['branch'],
@@ -2720,18 +2736,18 @@ class ExecutorServer(object):
# Handle a "refstate" job: decode the JSON job arguments, ask the merger for
# the repo state of args['items'], and send back a JSON result carrying the
# `updated` flag and the collected `repo_state`.
def refstate(self, job):
args = json.loads(job.arguments)
# NOTE(review): the call guarded by `self.merger_lock` and the call passing
# `repo_locks=` below look like the removed and added sides of the same
# statement in this diff hunk (indentation and +/- markers are missing) --
# confirm which one is current before editing.
with self.merger_lock:
success, repo_state = self.merger.getRepoState(args['items'])
# This variant hands the per-repo lock registry to the merger instead of
# wrapping the call in a lock here.
success, repo_state = self.merger.getRepoState(
args['items'], repo_locks=self.repo_locks)
# `updated` carries the success flag returned by getRepoState.
result = dict(updated=success,
repo_state=repo_state)
# Report the result to the job as a JSON string.
job.sendWorkComplete(json.dumps(result))
def merge(self, job):
args = json.loads(job.arguments)
with self.merger_lock:
ret = self.merger.mergeChanges(args['items'], args.get('files'),
args.get('dirs', []),
args.get('repo_state'))
ret = self.merger.mergeChanges(args['items'], args.get('files'),
args.get('dirs', []),
args.get('repo_state'),
repo_locks=self.repo_locks)
result = dict(merged=(ret is not None))
if ret is None:
result['commit'] = result['files'] = result['repo_state'] = None

View File

@@ -67,6 +67,11 @@ def timeout_handler(path):
raise
@contextmanager
def nullcontext(enter_result=None):
    """Context manager that does nothing on entry or exit.

    Useful as a stand-in where a ``with`` statement needs a context manager
    but no real resource (for example a lock) has to be held.

    :param enter_result: Value bound by ``with nullcontext(x) as name``.
        Defaults to ``None``, which is what a plain ``nullcontext()`` yields.
    """
    yield enter_result
class Repo(object):
commit_re = re.compile(r'^commit ([0-9a-f]{40})$')
diff_re = re.compile(r'^@@ -\d+,\d \+(\d+),\d @@$')
@@ -679,7 +684,8 @@ class Merger(object):
recent[key] = commit
return orig_commit, commit
def mergeChanges(self, items, files=None, dirs=None, repo_state=None):
def mergeChanges(self, items, files=None, dirs=None, repo_state=None,
repo_locks=None):
# connection+project+branch -> commit
recent = {}
commit = None
@@ -688,19 +694,27 @@ class Merger(object):
if repo_state is None:
repo_state = {}
for item in items:
self.log.debug("Merging for change %s,%s" %
(item["number"], item["patchset"]))
orig_commit, commit = self._mergeItem(item, recent, repo_state)
if not commit:
return None
if files or dirs:
repo = self.getRepo(item['connection'], item['project'])
repo_files = repo.getFiles(files, dirs, commit=commit)
read_files.append(dict(
connection=item['connection'],
project=item['project'],
branch=item['branch'],
files=repo_files))
# If we're in the executor context we have the repo_locks object
# and perform per repo locking.
if repo_locks is not None:
lock = repo_locks.getRepoLock(
item['connection'], item['project'])
else:
lock = nullcontext()
with lock:
self.log.debug("Merging for change %s,%s" %
(item["number"], item["patchset"]))
orig_commit, commit = self._mergeItem(item, recent, repo_state)
if not commit:
return None
if files or dirs:
repo = self.getRepo(item['connection'], item['project'])
repo_files = repo.getFiles(files, dirs, commit=commit)
read_files.append(dict(
connection=item['connection'],
project=item['project'],
branch=item['branch'],
files=repo_files))
ret_recent = {}
for k, v in recent.items():
ret_recent[k] = v.hexsha
@@ -720,7 +734,7 @@ class Merger(object):
self._restoreRepoState(item['connection'], item['project'], repo,
repo_state)
def getRepoState(self, items):
def getRepoState(self, items, repo_locks=None):
# Gets the repo state for items. Generally this will be
# called in any non-change pipeline. We will return the repo
# state for each item, but manipulated with any information in
@@ -730,23 +744,32 @@ class Merger(object):
recent = {}
repo_state = {}
for item in items:
repo = self.getRepo(item['connection'], item['project'])
key = (item['connection'], item['project'], item['branch'])
if key not in seen:
try:
repo.reset()
except Exception:
self.log.exception("Unable to reset repo %s" % repo)
return (False, {})
# If we're in the executor context we have the repo_locks object
# and perform per repo locking.
if repo_locks is not None:
lock = repo_locks.getRepoLock(
item['connection'], item['project'])
else:
lock = nullcontext()
with lock:
repo = self.getRepo(item['connection'], item['project'])
key = (item['connection'], item['project'], item['branch'])
if key not in seen:
try:
repo.reset()
except Exception:
self.log.exception("Unable to reset repo %s" % repo)
return (False, {})
self._saveRepoState(item['connection'], item['project'], repo,
repo_state, recent)
self._saveRepoState(item['connection'], item['project'],
repo, repo_state, recent)
if item.get('newrev'):
# This is a ref update rather than a branch tip, so make sure
# our returned state includes this change.
self._alterRepoState(item['connection'], item['project'],
repo_state, item['ref'], item['newrev'])
if item.get('newrev'):
# This is a ref update rather than a branch tip, so make
# sure our returned state includes this change.
self._alterRepoState(
item['connection'], item['project'], repo_state,
item['ref'], item['newrev'])
return (True, repo_state)
def getFiles(self, connection_name, project_name, branch, files, dirs=[]):