Merge "Perform per repo locking on the executor"
This commit is contained in:
commit
79127f5f02
|
@ -78,6 +78,17 @@ class RoleNotFoundError(ExecutorError):
|
|||
pass
|
||||
|
||||
|
||||
class RepoLocks:
    """Registry of per-repository locks for the executor.

    Serializes git operations on a given repository across executor
    threads: every caller asking for the same (connection, project)
    pair receives the very same threading.Lock instance.
    """

    def __init__(self):
        # Maps "connection:project" -> threading.Lock.  Only mutated
        # via dict.setdefault, which is atomic under CPython's GIL.
        self.locks = {}

    def getRepoLock(self, connection_name, project_name):
        """Return the lock guarding the given repository.

        :param str connection_name: Name of the source connection.
        :param str project_name: Name of the project on that connection.
        :returns: The shared :class:`threading.Lock` for this
            repository, created on first use.
        """
        key = '%s:%s' % (connection_name, project_name)
        # setdefault inserts-on-missing and returns the stored value in
        # one step; the original discarded its return value and paid a
        # second, redundant dict lookup.
        return self.locks.setdefault(key, threading.Lock())
|
||||
|
||||
|
||||
class DiskAccountant(object):
|
||||
''' A single thread to periodically run du and monitor a base directory
|
||||
|
||||
|
@ -2175,8 +2186,7 @@ class ExecutorMergeWorker(gear.TextWorker):
|
|||
while self.zuul_executor_server.update_queue.qsize():
|
||||
time.sleep(1)
|
||||
|
||||
with self.zuul_executor_server.merger_lock:
|
||||
super(ExecutorMergeWorker, self).handleNoop(packet)
|
||||
super(ExecutorMergeWorker, self).handleNoop(packet)
|
||||
|
||||
|
||||
class ExecutorExecuteWorker(gear.TextWorker):
|
||||
|
@ -2209,7 +2219,7 @@ class ExecutorServer(object):
|
|||
self.hostname = get_default(self.config, 'executor', 'hostname',
|
||||
socket.getfqdn())
|
||||
self.log_streaming_port = log_streaming_port
|
||||
self.merger_lock = threading.Lock()
|
||||
self.repo_locks = RepoLocks()
|
||||
self.governor_lock = threading.Lock()
|
||||
self.run_lock = threading.Lock()
|
||||
self.verbose = False
|
||||
|
@ -2496,7 +2506,9 @@ class ExecutorServer(object):
|
|||
# We are asked to stop
|
||||
raise StopException()
|
||||
try:
|
||||
with self.merger_lock:
|
||||
lock = self.repo_locks.getRepoLock(
|
||||
task.connection_name, task.project_name)
|
||||
with lock:
|
||||
self.log.info("Updating repo %s/%s",
|
||||
task.connection_name, task.project_name)
|
||||
self.merger.updateRepo(task.connection_name, task.project_name)
|
||||
|
@ -2697,7 +2709,9 @@ class ExecutorServer(object):
|
|||
args = json.loads(job.arguments)
|
||||
task = self.update(args['connection'], args['project'])
|
||||
task.wait()
|
||||
with self.merger_lock:
|
||||
lock = self.repo_locks.getRepoLock(
|
||||
task.connection_name, task.project_name)
|
||||
with lock:
|
||||
files = self.merger.getFiles(args['connection'], args['project'],
|
||||
args['branch'], args['files'],
|
||||
args.get('dirs', []))
|
||||
|
@ -2709,7 +2723,9 @@ class ExecutorServer(object):
|
|||
args = json.loads(job.arguments)
|
||||
task = self.update(args['connection'], args['project'])
|
||||
task.wait()
|
||||
with self.merger_lock:
|
||||
lock = self.repo_locks.getRepoLock(
|
||||
args['connection'], args['project'])
|
||||
with lock:
|
||||
files = self.merger.getFilesChanges(
|
||||
args['connection'], args['project'],
|
||||
args['branch'],
|
||||
|
@ -2720,18 +2736,18 @@ class ExecutorServer(object):
|
|||
|
||||
def refstate(self, job):
|
||||
args = json.loads(job.arguments)
|
||||
with self.merger_lock:
|
||||
success, repo_state = self.merger.getRepoState(args['items'])
|
||||
success, repo_state = self.merger.getRepoState(
|
||||
args['items'], repo_locks=self.repo_locks)
|
||||
result = dict(updated=success,
|
||||
repo_state=repo_state)
|
||||
job.sendWorkComplete(json.dumps(result))
|
||||
|
||||
def merge(self, job):
|
||||
args = json.loads(job.arguments)
|
||||
with self.merger_lock:
|
||||
ret = self.merger.mergeChanges(args['items'], args.get('files'),
|
||||
args.get('dirs', []),
|
||||
args.get('repo_state'))
|
||||
ret = self.merger.mergeChanges(args['items'], args.get('files'),
|
||||
args.get('dirs', []),
|
||||
args.get('repo_state'),
|
||||
repo_locks=self.repo_locks)
|
||||
result = dict(merged=(ret is not None))
|
||||
if ret is None:
|
||||
result['commit'] = result['files'] = result['repo_state'] = None
|
||||
|
|
|
@ -67,6 +67,11 @@ def timeout_handler(path):
|
|||
raise
|
||||
|
||||
|
||||
class nullcontext(object):
    """A no-op context manager.

    Stands in where a lock (or other context manager) is optional, so
    callers can always write ``with lock:`` regardless of whether real
    locking is in effect.  Entering yields ``None`` and exiting does
    not suppress exceptions.
    """

    def __enter__(self):
        return None

    def __exit__(self, exc_type, exc_value, traceback):
        return False
|
||||
|
||||
|
||||
class Repo(object):
|
||||
commit_re = re.compile(r'^commit ([0-9a-f]{40})$')
|
||||
diff_re = re.compile(r'^@@ -\d+,\d \+(\d+),\d @@$')
|
||||
|
@ -679,7 +684,8 @@ class Merger(object):
|
|||
recent[key] = commit
|
||||
return orig_commit, commit
|
||||
|
||||
def mergeChanges(self, items, files=None, dirs=None, repo_state=None):
|
||||
def mergeChanges(self, items, files=None, dirs=None, repo_state=None,
|
||||
repo_locks=None):
|
||||
# connection+project+branch -> commit
|
||||
recent = {}
|
||||
commit = None
|
||||
|
@ -688,19 +694,27 @@ class Merger(object):
|
|||
if repo_state is None:
|
||||
repo_state = {}
|
||||
for item in items:
|
||||
self.log.debug("Merging for change %s,%s" %
|
||||
(item["number"], item["patchset"]))
|
||||
orig_commit, commit = self._mergeItem(item, recent, repo_state)
|
||||
if not commit:
|
||||
return None
|
||||
if files or dirs:
|
||||
repo = self.getRepo(item['connection'], item['project'])
|
||||
repo_files = repo.getFiles(files, dirs, commit=commit)
|
||||
read_files.append(dict(
|
||||
connection=item['connection'],
|
||||
project=item['project'],
|
||||
branch=item['branch'],
|
||||
files=repo_files))
|
||||
# If we're in the executor context we have the repo_locks object
|
||||
# and perform per repo locking.
|
||||
if repo_locks is not None:
|
||||
lock = repo_locks.getRepoLock(
|
||||
item['connection'], item['project'])
|
||||
else:
|
||||
lock = nullcontext()
|
||||
with lock:
|
||||
self.log.debug("Merging for change %s,%s" %
|
||||
(item["number"], item["patchset"]))
|
||||
orig_commit, commit = self._mergeItem(item, recent, repo_state)
|
||||
if not commit:
|
||||
return None
|
||||
if files or dirs:
|
||||
repo = self.getRepo(item['connection'], item['project'])
|
||||
repo_files = repo.getFiles(files, dirs, commit=commit)
|
||||
read_files.append(dict(
|
||||
connection=item['connection'],
|
||||
project=item['project'],
|
||||
branch=item['branch'],
|
||||
files=repo_files))
|
||||
ret_recent = {}
|
||||
for k, v in recent.items():
|
||||
ret_recent[k] = v.hexsha
|
||||
|
@ -720,7 +734,7 @@ class Merger(object):
|
|||
self._restoreRepoState(item['connection'], item['project'], repo,
|
||||
repo_state)
|
||||
|
||||
def getRepoState(self, items):
|
||||
def getRepoState(self, items, repo_locks=None):
|
||||
# Gets the repo state for items. Generally this will be
|
||||
# called in any non-change pipeline. We will return the repo
|
||||
# state for each item, but manipulated with any information in
|
||||
|
@ -730,23 +744,32 @@ class Merger(object):
|
|||
recent = {}
|
||||
repo_state = {}
|
||||
for item in items:
|
||||
repo = self.getRepo(item['connection'], item['project'])
|
||||
key = (item['connection'], item['project'], item['branch'])
|
||||
if key not in seen:
|
||||
try:
|
||||
repo.reset()
|
||||
except Exception:
|
||||
self.log.exception("Unable to reset repo %s" % repo)
|
||||
return (False, {})
|
||||
# If we're in the executor context we have the repo_locks object
|
||||
# and perform per repo locking.
|
||||
if repo_locks is not None:
|
||||
lock = repo_locks.getRepoLock(
|
||||
item['connection'], item['project'])
|
||||
else:
|
||||
lock = nullcontext()
|
||||
with lock:
|
||||
repo = self.getRepo(item['connection'], item['project'])
|
||||
key = (item['connection'], item['project'], item['branch'])
|
||||
if key not in seen:
|
||||
try:
|
||||
repo.reset()
|
||||
except Exception:
|
||||
self.log.exception("Unable to reset repo %s" % repo)
|
||||
return (False, {})
|
||||
|
||||
self._saveRepoState(item['connection'], item['project'], repo,
|
||||
repo_state, recent)
|
||||
self._saveRepoState(item['connection'], item['project'],
|
||||
repo, repo_state, recent)
|
||||
|
||||
if item.get('newrev'):
|
||||
# This is a ref update rather than a branch tip, so make sure
|
||||
# our returned state includes this change.
|
||||
self._alterRepoState(item['connection'], item['project'],
|
||||
repo_state, item['ref'], item['newrev'])
|
||||
if item.get('newrev'):
|
||||
# This is a ref update rather than a branch tip, so make
|
||||
# sure our returned state includes this change.
|
||||
self._alterRepoState(
|
||||
item['connection'], item['project'], repo_state,
|
||||
item['ref'], item['newrev'])
|
||||
return (True, repo_state)
|
||||
|
||||
def getFiles(self, connection_name, project_name, branch, files, dirs=[]):
|
||||
|
|
Loading…
Reference in New Issue