Merge "Parallelize update threads"

This commit is contained in:
Zuul 2019-03-28 21:45:16 +00:00 committed by Gerrit Code Review
commit 5b7c278eb2
1 changed files with 16 additions and 7 deletions

View File

@ -17,6 +17,7 @@ import collections
import datetime
import json
import logging
import multiprocessing
import os
import shutil
import signal
@ -2252,6 +2253,10 @@ class ExecutorServer(object):
self.merge_speed_time = get_default(
config, 'merger', 'git_http_low_speed_time', '30')
self.git_timeout = get_default(config, 'merger', 'git_timeout', 300)
# TODO(tobiash): Take cgroups into account
self.update_workers = multiprocessing.cpu_count()
self.update_threads = []
# If the execution driver ever becomes configurable again,
# this is where it would happen.
execution_wrapper_name = 'bubblewrap'
@ -2354,11 +2359,13 @@ class ExecutorServer(object):
self.command_thread.daemon = True
self.command_thread.start()
self.log.debug("Starting worker")
self.update_thread = threading.Thread(target=self._updateLoop,
name='update')
self.update_thread.daemon = True
self.update_thread.start()
self.log.debug("Starting workers")
for i in range(self.update_workers):
update_thread = threading.Thread(target=self._updateLoop,
name='update')
update_thread.daemon = True
update_thread.start()
self.update_threads.append(update_thread)
self.merger_thread = threading.Thread(target=self.run_merger,
name='merger')
self.merger_thread.daemon = True
@ -2437,7 +2444,8 @@ class ExecutorServer(object):
# Now that we aren't accepting any new jobs, and all of the
# running jobs have stopped, tell the update processor to
# stop.
self.update_queue.put(None)
for _ in self.update_threads:
self.update_queue.put(None)
# All job results should have been sent by now, shutdown the
# gearman workers.
@ -2455,7 +2463,8 @@ class ExecutorServer(object):
def join(self):
self.governor_thread.join()
self.update_thread.join()
for update_thread in self.update_threads:
update_thread.join()
self.merger_thread.join()
self.executor_thread.join()