Use gearman client keepalive

If the gearman server vanishes (e.g. due to a VM crash) some clients
like the merger may not notice that it is gone. They just wait forever
for data to be received on an inactive connection. In our case the VM
containing the zuul-scheduler crashed and after the restart of the
scheduler all mergers were waiting for data on the stale connection
which blocked a successful scheduler restart.  Using tcp keepalive we
can detect that situation and let broken inactive connections be
killed by the kernel.

Depends-On: I8589cd45450245a25539c051355b38d16ee9f4b9
Change-Id: I30049d59d873d64f3b69c5587c775827e3545854
This commit is contained in:
Tobias Henkel 2018-09-04 13:52:33 +02:00
parent cea6505d1b
commit fb4c6402a4
No known key found for this signature in database
GPG Key ID: 03750DEC158E5FA2
8 changed files with 25 additions and 9 deletions

View File

@ -8,7 +8,7 @@ python-daemon>=2.0.4,<2.1.0
extras
statsd>=3.0
voluptuous>=0.10.2
gear>=0.9.0,<1.0.0
gear>=0.13.0,<1.0.0
apscheduler>=3.0
PrettyTable>=0.6,<0.8
babel>=1.0

View File

@ -151,7 +151,9 @@ class GithubGearmanWorker(object):
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
self.gearman = gear.TextWorker('Zuul Github Connector')
self.log.debug("Connect to gearman")
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
keepalive=True, tcp_keepidle=60,
tcp_keepintvl=30, tcp_keepcnt=5)
self.log.debug("Waiting for server")
self.gearman.waitForServer()
self.log.debug("Registering")

View File

@ -121,7 +121,9 @@ class ExecutorClient(object):
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
self.gearman = ZuulGearmanClient(self)
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
keepalive=True, tcp_keepidle=60,
tcp_keepintvl=30, tcp_keepcnt=5)
self.cleanup_thread = GearmanCleanup(self)
self.cleanup_thread.start()

View File

@ -2090,10 +2090,14 @@ class ExecutorServer(object):
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
keepalive=True, tcp_keepidle=60,
tcp_keepintvl=30, tcp_keepcnt=5)
self.executor_worker = ExecutorExecuteWorker(
self, 'Zuul Executor Server')
self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
keepalive=True, tcp_keepidle=60,
tcp_keepintvl=30, tcp_keepcnt=5)
self.log.debug("Waiting for server")
self.merger_worker.waitForServer()
self.executor_worker.waitForServer()

View File

@ -82,7 +82,9 @@ class MergeClient(object):
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
self.log.debug("Connecting to gearman at %s:%s" % (server, port))
self.gearman = MergeGearmanClient(self)
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
keepalive=True, tcp_keepidle=60,
tcp_keepintvl=30, tcp_keepcnt=5)
self.log.debug("Waiting for gearman")
self.gearman.waitForServer()
self.jobs = set()

View File

@ -60,7 +60,9 @@ class MergeServer(object):
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
self.worker = gear.TextWorker('Zuul Merger')
self.worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
self.worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
keepalive=True, tcp_keepidle=60,
tcp_keepintvl=30, tcp_keepcnt=5)
self.log.debug("Waiting for server")
self.worker.waitForServer()
self.log.debug("Registering")

View File

@ -29,7 +29,9 @@ class RPCClient(object):
def __init__(self, server, port, ssl_key=None, ssl_cert=None, ssl_ca=None):
self.log.debug("Connecting to gearman at %s:%s" % (server, port))
self.gearman = gear.Client()
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
keepalive=True, tcp_keepidle=60,
tcp_keepintvl=30, tcp_keepcnt=5)
self.log.debug("Waiting for gearman")
self.gearman.waitForServer()

View File

@ -48,7 +48,9 @@ class RPCListener(object):
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
self.worker = gear.TextWorker('Zuul RPC Listener')
self.worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
self.worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
keepalive=True, tcp_keepidle=60,
tcp_keepintvl=30, tcp_keepcnt=5)
self.log.debug("Waiting for server")
self.worker.waitForServer()
self.log.debug("Registering")