未验证 提交 4ac9d64f 编写于 作者: K kuizhiqing 提交者: GitHub

fix launch exit graceful (#43940)

上级 99a4ff8f
...@@ -76,6 +76,10 @@ class Context(object): ...@@ -76,6 +76,10 @@ class Context(object):
def get_envs(self): def get_envs(self):
return self.envs.copy() return self.envs.copy()
def set_envs(self, env=None):
    """Merge the string-valued entries of ``env`` into ``self.envs``.

    Args:
        env: Optional mapping of names to values. Entries whose value is
            not a ``str`` are dropped before the merge. ``None`` (the
            default) or an empty mapping leaves ``self.envs`` unchanged.
    """
    # A ``None`` sentinel replaces the former mutable ``{}`` default, which
    # is a single dict object shared by every call of the method.
    if not env:
        return
    self.envs.update(
        {k: v for k, v in env.items() if isinstance(v, str)})
def _enable_plugin(self): def _enable_plugin(self):
for pl in plugins.enabled_plugins: for pl in plugins.enabled_plugins:
pl(self) pl(self)
......
...@@ -49,6 +49,8 @@ class ControllerBase(object): ...@@ -49,6 +49,8 @@ class ControllerBase(object):
jid=self.ctx.args.job_id) jid=self.ctx.args.job_id)
self.pod = Pod() self.pod = Pod()
self.ctx.set_envs({"POD_NAME": self.pod.name})
self.join_server = None self.join_server = None
def deploy_pod(self): def deploy_pod(self):
...@@ -104,17 +106,18 @@ class ControllerBase(object): ...@@ -104,17 +106,18 @@ class ControllerBase(object):
self.ctx.logger.info("Pod {}".format(status)) self.ctx.logger.info("Pod {}".format(status))
self.ctx.logger.error("Container failed !!!\n{}".format(fc[0])) self.ctx.logger.error("Container failed !!!\n{}".format(fc[0]))
fc[0].tail() fc[0].tail()
self.pod.stop()
if self.ctx.args.elastic_level <= 0: if self.ctx.args.elastic_level <= 0:
self.pod.stop(timeout=3)
return True return True
else: else:
self.pod.stop(timeout=30)
return False return False
# peer failure # peer failure
if self.ctx.status.is_restarting( if self.ctx.status.is_restarting(
) and self.master.get_status() != self.ctx.status.COMPLETED: ) and self.master.get_status() != self.ctx.status.COMPLETED:
self.pod.stop() self.pod.stop(timeout=30)
return False return False
def stop(self, sigint=None): def stop(self, sigint=None):
...@@ -123,7 +126,7 @@ class ControllerBase(object): ...@@ -123,7 +126,7 @@ class ControllerBase(object):
self.watcher.stop() self.watcher.stop()
self.master.stop() self.master.stop()
self.pod.stop(sigint) self.pod.stop(timeout=30)
def finalize(self): def finalize(self):
self.pod.join() self.pod.join()
...@@ -133,17 +136,16 @@ class ControllerBase(object): ...@@ -133,17 +136,16 @@ class ControllerBase(object):
sys.exit(self.pod.exit_code) sys.exit(self.pod.exit_code)
def signal_handler(self, sigint, frame): def signal_handler(self, sigint, frame):
self.ctx.logger.info("Terminating with signal {}".format(sigint))
if hasattr(self, 'sigint'): if hasattr(self, 'sigint'):
self.ctx.logger.info("Force quit in 10 seconds...") self.ctx.logger.info("Force quit in 10 seconds...")
time.sleep(11) self.pod.stop(timeout=10)
sys.exit(sigint) sys.exit(sigint)
self.ctx.logger.info("Terminating with signal {}".format(sigint))
self.sigint = sigint self.sigint = sigint
self.ctx.status.done() self.ctx.status.done()
self.stop(sigint) self.stop(sigint=sigint)
time.sleep(1)
self.ctx.logger.info("Exit with signal {}".format(sigint)) self.ctx.logger.info("Exit with signal {}".format(sigint))
sys.exit(sigint) sys.exit(sigint)
......
...@@ -316,5 +316,5 @@ class ETCDMaster(Master): ...@@ -316,5 +316,5 @@ class ETCDMaster(Master):
def stop(self): def stop(self):
if hasattr(self, 'beat_thread'): if hasattr(self, 'beat_thread'):
self.ctx.status.done() self.ctx.status.done()
# TODO(kuizhiqing) thread should exit # daemon thread
#self.beat_thread.join() #self.beat_thread.join()
...@@ -93,4 +93,6 @@ class Watcher(object): ...@@ -93,4 +93,6 @@ class Watcher(object):
def stop(self): def stop(self):
if hasattr(self, "proc"): if hasattr(self, "proc"):
self.proc.join() # daemon without join
# self.proc.join()
pass
...@@ -131,7 +131,11 @@ class Container(object): ...@@ -131,7 +131,11 @@ class Container(object):
return self._proc.terminate(force) return self._proc.terminate(force)
def wait(self, timeout=None): def wait(self, timeout=None):
try:
self._proc.wait(timeout) self._proc.wait(timeout)
return True
except Exception:
return False
@property @property
def exit_code(self): def exit_code(self):
......
...@@ -116,14 +116,26 @@ class Pod(PodSepc): ...@@ -116,14 +116,26 @@ class Pod(PodSepc):
self._restart += 1 self._restart += 1
def stop(self, sigint=0): def stop(self, sigint=15, timeout=None):
for c in self._containers: for c in self._containers:
force = True if sigint == 9 else False if isinstance(sigint, int) and timeout is None:
c.terminate(force) c.send_signal(sigint)
else:
c.terminate()
if isinstance(timeout, int):
if not self.join(timeout):
for c in self._containers:
c.terminate(force=True)
return False
else:
return True
def join(self): def join(self, timeout=None):
for c in self._containers: for c in self._containers:
c.wait(None) if not c.wait(timeout):
return False
return True
@property @property
def status(self): def status(self):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册