Merge "Parallelize update threads"
This commit is contained in:
commit
5b7c278eb2
|
@ -17,6 +17,7 @@ import collections
|
|||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import multiprocessing
|
||||
import os
|
||||
import shutil
|
||||
import signal
|
||||
|
@ -2252,6 +2253,10 @@ class ExecutorServer(object):
|
|||
self.merge_speed_time = get_default(
|
||||
config, 'merger', 'git_http_low_speed_time', '30')
|
||||
self.git_timeout = get_default(config, 'merger', 'git_timeout', 300)
|
||||
|
||||
# TODO(tobiash): Take cgroups into account
|
||||
self.update_workers = multiprocessing.cpu_count()
|
||||
self.update_threads = []
|
||||
# If the execution driver ever becomes configurable again,
|
||||
# this is where it would happen.
|
||||
execution_wrapper_name = 'bubblewrap'
|
||||
|
@ -2354,11 +2359,13 @@ class ExecutorServer(object):
|
|||
self.command_thread.daemon = True
|
||||
self.command_thread.start()
|
||||
|
||||
self.log.debug("Starting worker")
|
||||
self.update_thread = threading.Thread(target=self._updateLoop,
|
||||
name='update')
|
||||
self.update_thread.daemon = True
|
||||
self.update_thread.start()
|
||||
self.log.debug("Starting workers")
|
||||
for i in range(self.update_workers):
|
||||
update_thread = threading.Thread(target=self._updateLoop,
|
||||
name='update')
|
||||
update_thread.daemon = True
|
||||
update_thread.start()
|
||||
self.update_threads.append(update_thread)
|
||||
self.merger_thread = threading.Thread(target=self.run_merger,
|
||||
name='merger')
|
||||
self.merger_thread.daemon = True
|
||||
|
@ -2437,7 +2444,8 @@ class ExecutorServer(object):
|
|||
# Now that we aren't accepting any new jobs, and all of the
|
||||
# running jobs have stopped, tell the update processor to
|
||||
# stop.
|
||||
self.update_queue.put(None)
|
||||
for _ in self.update_threads:
|
||||
self.update_queue.put(None)
|
||||
|
||||
# All job results should have been sent by now, shutdown the
|
||||
# gearman workers.
|
||||
|
@ -2455,7 +2463,8 @@ class ExecutorServer(object):
|
|||
|
||||
def join(self):
|
||||
self.governor_thread.join()
|
||||
self.update_thread.join()
|
||||
for update_thread in self.update_threads:
|
||||
update_thread.join()
|
||||
self.merger_thread.join()
|
||||
self.executor_thread.join()
|
||||
|
||||
|
|
Loading…
Reference in New Issue