未验证 提交 a4028b4b 编写于 作者: K kuizhiqing 提交者: GitHub

Elastic fix (#34134)

* kill process by group
上级 3fd34a0e
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import six
import os
class Command(object):
    """Small client for the elastic-job keys stored in etcd.

    Every key used here lives under ``/paddle/<name>``; the only key this
    class reads or writes is ``<prefix>/np``.
    """

    def __init__(self, server, name):
        """Connect to etcd and build the key paths for one job.

        Args:
            server: etcd endpoint written as ``host:port``.
            name: job id, appended to ``/paddle/`` to form the key prefix.

        Raises:
            ValueError: if ``server`` contains no ``:`` separator.
        """
        # Imported here rather than at module level, so etcd3 is only
        # required once a Command is actually created.
        import etcd3

        # Split on the LAST colon only: a host that itself contains colons
        # (for example a bracketed IPv6 literal such as "[::1]:2379") would
        # otherwise fail to unpack into exactly two parts.
        srv, port = server.rsplit(':', 1)
        self.etcd = etcd3.client(host=srv, port=port)
        self.prefix = "/paddle/" + name
        self.node_prefix = self.prefix + '/nodes'
        self.np_path = self.prefix + '/np'

    def set_np(self, np):
        """Write ``np`` (formatted as text, stored as bytes) to the np key."""
        self.etcd.put(self.np_path, six.b('{}'.format(np)))

    def scale_np(self, np):
        """Update the np key, but only if it is already present.

        Args:
            np: new value to store under the np key.

        Returns:
            True if the key existed and was overwritten, False otherwise.
        """
        # The first element of the get() result is compared against None to
        # decide whether the key exists (presumably the stored value, per
        # the etcd3 client API -- confirm against its docs).
        if self.etcd.get(self.np_path)[0] is not None:
            self.set_np(np)
            return True
        return False

    def close(self):
        """Close the underlying etcd client."""
        self.etcd.close()
if __name__ == '__main__':
    # Command-line entry point: resolve the etcd endpoint, job id and np
    # from flags first and environment variables second, then run the
    # requested action against etcd.
    parser = argparse.ArgumentParser(description='Elastic Command')
    parser.add_argument(
        "--elastic_server", type=str, help="etcd server host:port")
    parser.add_argument("--job_id", type=str, help="job unique id")
    parser.add_argument("--np", type=int, help="job pod/node number")
    parser.add_argument("action", type=str, help="action to take")
    args = parser.parse_args()

    server = args.elastic_server or os.getenv('PADDLE_ELASTIC_SERVER')

    # compatible with Kubernetes service discovery
    etcd_host = os.getenv('PADDLE_ELASTIC_ETCD_SERVICE_HOST')
    etcd_port = os.getenv('PADDLE_ELASTIC_ETCD_SERVICE_PORT')
    if not server and etcd_host and etcd_port:
        server = '{}:{}'.format(etcd_host, etcd_port)

    name = args.job_id or os.getenv('PADDLE_ELASTIC_JOB_ID')
    np = args.np or int(os.getenv('PADDLE_ELASTIC_NP', 0))

    # Fail with a usage message instead of an AttributeError/TypeError
    # raised from inside Command when either value is missing.
    if not server:
        parser.error(
            "etcd server is required: pass --elastic_server or set "
            "PADDLE_ELASTIC_SERVER")
    if not name:
        parser.error(
            "job id is required: pass --job_id or set PADDLE_ELASTIC_JOB_ID")

    cmd = Command(server, name)
    try:
        if args.action == "scale":
            cmd.scale_np(np)
        print("action {} done".format(args.action))
    finally:
        # Release the etcd connection even when the action raises.
        cmd.close()
...@@ -40,6 +40,15 @@ class LauncherInterface(object): ...@@ -40,6 +40,15 @@ class LauncherInterface(object):
self.procs = [] self.procs = []
def _terminate_procs(self): def _terminate_procs(self):
# try to terminate processes by group; this happens in multiprocess scenarios in the user process
for p in self.procs:
if p.proc.poll() is None:
os.killpg(os.getpgid(p.proc.pid), signal.SIGTERM)
if p.log_fn:
p.log_fn.close()
logger.info("terminate process group gid:{}".format(p.proc.pid))
time.sleep(1)
for p in self.procs: for p in self.procs:
if p.proc.poll() is None: if p.proc.poll() is None:
p.proc.terminate() p.proc.terminate()
...@@ -55,7 +64,7 @@ class LauncherInterface(object): ...@@ -55,7 +64,7 @@ class LauncherInterface(object):
alive = True alive = True
if not alive: if not alive:
logger.info("terminate all the procs") logger.info("terminated all the procs")
return True return True
time.sleep(1) time.sleep(1)
...@@ -104,6 +113,14 @@ class ElasticManager(object): ...@@ -104,6 +113,14 @@ class ElasticManager(object):
self.elastic_level = int( self.elastic_level = int(
os.getenv('PADDLE_ELASTIC_FAULT_TOLERANC_LEVEL', 1)) os.getenv('PADDLE_ELASTIC_FAULT_TOLERANC_LEVEL', 1))
# compatible with Kubernetes service discovery
if not server and os.getenv(
'PADDLE_ELASTIC_ETCD_SERVICE_HOST') and os.getenv(
'PADDLE_ELASTIC_ETCD_SERVICE_PORT'):
server = '{}:{}'.format(
os.getenv('PADDLE_ELASTIC_ETCD_SERVICE_HOST'),
os.getenv('PADDLE_ELASTIC_ETCD_SERVICE_PORT'))
#elastic_timeout = os.getenv('PADDLE_ELASTIC_TIMEOUT',1) #elastic_timeout = os.getenv('PADDLE_ELASTIC_TIMEOUT',1)
logger.debug('init with server {} host {}'.format(server, host)) logger.debug('init with server {} host {}'.format(server, host))
...@@ -153,9 +170,7 @@ class ElasticManager(object): ...@@ -153,9 +170,7 @@ class ElasticManager(object):
def host_call_back(event): def host_call_back(event):
if self.etcd.get(self.host_path)[0] == None: if self.etcd.get(self.host_path)[0] == None:
# ensure unmatch trigger
logger.info('register host again {}'.format(self.host)) logger.info('register host again {}'.format(self.host))
time.sleep(5)
self.etcd.put(self.host_path, six.b(self.host)) self.etcd.put(self.host_path, six.b(self.host))
...@@ -319,7 +334,7 @@ class ElasticManager(object): ...@@ -319,7 +334,7 @@ class ElasticManager(object):
self.launcher.stop() self.launcher.stop()
return ElasticStatus.HOLD return ElasticStatus.HOLD
time.sleep(3) time.sleep(2)
if self.launcher: if self.launcher:
self.launcher.stop() self.launcher.stop()
......
...@@ -525,9 +525,14 @@ def start_local_trainers(cluster, ...@@ -525,9 +525,14 @@ def start_local_trainers(cluster,
f.write("PADDLE_TRAINER_ENDPOINTS: \n") f.write("PADDLE_TRAINER_ENDPOINTS: \n")
f.write("\n".join(cluster.trainers_endpoints())) f.write("\n".join(cluster.trainers_endpoints()))
fn = open("%s/workerlog.%d" % (log_dir, idx), "a") fn = open("%s/workerlog.%d" % (log_dir, idx), "a")
proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn) proc = subprocess.Popen(
cmd,
env=current_env,
stdout=fn,
stderr=fn,
preexec_fn=os.setsid)
else: else:
proc = subprocess.Popen(cmd, env=current_env) proc = subprocess.Popen(cmd, env=current_env, preexec_fn=os.setsid)
tp = TrainerProc() tp = TrainerProc()
tp.proc = proc tp.proc = proc
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册