Use gearman client keepalive
If the gearman server vanishes (e.g. due to a VM crash) some clients like the merger may not notice that it is gone. They just wait forever for data to be received on an inactive connection. In our case the VM containing the zuul-scheduler crashed and after the restart of the scheduler all mergers were waiting for data on the stale connection which blocked a successful scheduler restart. Using tcp keepalive we can detect that situation and let broken inactive connections be killed by the kernel. Depends-On: I8589cd45450245a25539c051355b38d16ee9f4b9 Change-Id: I30049d59d873d64f3b69c5587c775827e3545854
This commit is contained in:
parent
cea6505d1b
commit
fb4c6402a4
|
@ -8,7 +8,7 @@ python-daemon>=2.0.4,<2.1.0
|
|||
extras
|
||||
statsd>=3.0
|
||||
voluptuous>=0.10.2
|
||||
gear>=0.9.0,<1.0.0
|
||||
gear>=0.13.0,<1.0.0
|
||||
apscheduler>=3.0
|
||||
PrettyTable>=0.6,<0.8
|
||||
babel>=1.0
|
||||
|
|
|
@ -151,7 +151,9 @@ class GithubGearmanWorker(object):
|
|||
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
|
||||
self.gearman = gear.TextWorker('Zuul Github Connector')
|
||||
self.log.debug("Connect to gearman")
|
||||
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
|
||||
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
|
||||
keepalive=True, tcp_keepidle=60,
|
||||
tcp_keepintvl=30, tcp_keepcnt=5)
|
||||
self.log.debug("Waiting for server")
|
||||
self.gearman.waitForServer()
|
||||
self.log.debug("Registering")
|
||||
|
|
|
@ -121,7 +121,9 @@ class ExecutorClient(object):
|
|||
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
|
||||
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
|
||||
self.gearman = ZuulGearmanClient(self)
|
||||
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
|
||||
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
|
||||
keepalive=True, tcp_keepidle=60,
|
||||
tcp_keepintvl=30, tcp_keepcnt=5)
|
||||
|
||||
self.cleanup_thread = GearmanCleanup(self)
|
||||
self.cleanup_thread.start()
|
||||
|
|
|
@ -2090,10 +2090,14 @@ class ExecutorServer(object):
|
|||
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
|
||||
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
|
||||
self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
|
||||
self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
|
||||
self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
|
||||
keepalive=True, tcp_keepidle=60,
|
||||
tcp_keepintvl=30, tcp_keepcnt=5)
|
||||
self.executor_worker = ExecutorExecuteWorker(
|
||||
self, 'Zuul Executor Server')
|
||||
self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
|
||||
self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
|
||||
keepalive=True, tcp_keepidle=60,
|
||||
tcp_keepintvl=30, tcp_keepcnt=5)
|
||||
self.log.debug("Waiting for server")
|
||||
self.merger_worker.waitForServer()
|
||||
self.executor_worker.waitForServer()
|
||||
|
|
|
@ -82,7 +82,9 @@ class MergeClient(object):
|
|||
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
|
||||
self.log.debug("Connecting to gearman at %s:%s" % (server, port))
|
||||
self.gearman = MergeGearmanClient(self)
|
||||
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
|
||||
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
|
||||
keepalive=True, tcp_keepidle=60,
|
||||
tcp_keepintvl=30, tcp_keepcnt=5)
|
||||
self.log.debug("Waiting for gearman")
|
||||
self.gearman.waitForServer()
|
||||
self.jobs = set()
|
||||
|
|
|
@ -60,7 +60,9 @@ class MergeServer(object):
|
|||
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
|
||||
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
|
||||
self.worker = gear.TextWorker('Zuul Merger')
|
||||
self.worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
|
||||
self.worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
|
||||
keepalive=True, tcp_keepidle=60,
|
||||
tcp_keepintvl=30, tcp_keepcnt=5)
|
||||
self.log.debug("Waiting for server")
|
||||
self.worker.waitForServer()
|
||||
self.log.debug("Registering")
|
||||
|
|
|
@ -29,7 +29,9 @@ class RPCClient(object):
|
|||
def __init__(self, server, port, ssl_key=None, ssl_cert=None, ssl_ca=None):
|
||||
self.log.debug("Connecting to gearman at %s:%s" % (server, port))
|
||||
self.gearman = gear.Client()
|
||||
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
|
||||
self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
|
||||
keepalive=True, tcp_keepidle=60,
|
||||
tcp_keepintvl=30, tcp_keepcnt=5)
|
||||
self.log.debug("Waiting for gearman")
|
||||
self.gearman.waitForServer()
|
||||
|
||||
|
|
|
@ -48,7 +48,9 @@ class RPCListener(object):
|
|||
ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
|
||||
ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
|
||||
self.worker = gear.TextWorker('Zuul RPC Listener')
|
||||
self.worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
|
||||
self.worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca,
|
||||
keepalive=True, tcp_keepidle=60,
|
||||
tcp_keepintvl=30, tcp_keepcnt=5)
|
||||
self.log.debug("Waiting for server")
|
||||
self.worker.waitForServer()
|
||||
self.log.debug("Registering")
|
||||
|
|
Loading…
Reference in New Issue