diff --git a/python/paddle/distributed/elastic.py b/python/paddle/distributed/elastic.py
new file mode 100644
index 0000000000000000000000000000000000000000..3e4fea5e6f34d7eb7c8eb4ea8fe40a4939a81f68
--- /dev/null
+++ b/python/paddle/distributed/elastic.py
@@ -0,0 +1,74 @@
+# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import six
+import os
+
+
+class Command(object):
+    def __init__(self, server, name):
+        import etcd3
+
+        srv, port = server.split(':')
+        self.etcd = etcd3.client(host=srv, port=port)
+
+        self.prefix = "/paddle/" + name
+        self.node_prefix = self.prefix + '/nodes'
+        self.np_path = self.prefix + '/np'
+
+    def set_np(self, np):
+        self.etcd.put(self.np_path, six.b('{}'.format(np)))
+
+    def scale_np(self, np):
+        if self.etcd.get(self.np_path)[0] is not None:
+            self.set_np(np)
+            return True
+        return False
+
+    def close(self):
+        self.etcd.close()
+
+
+if __name__ == '__main__':
+
+    parser = argparse.ArgumentParser(description='Elastic Command')
+    parser.add_argument(
+        "--elastic_server", type=str, help="etcd server host:port")
+    parser.add_argument("--job_id", type=str, help="job unique id")
+    parser.add_argument("--np", type=int, help="job pod/node number")
+    parser.add_argument("action", type=str, help="action to take")
+
+    args = parser.parse_args()
+
+    server = args.elastic_server or os.getenv('PADDLE_ELASTIC_SERVER')
+    # compatible with kubernetes service discovery
+    if not server and os.getenv(
+            'PADDLE_ELASTIC_ETCD_SERVICE_HOST') and os.getenv(
+                'PADDLE_ELASTIC_ETCD_SERVICE_PORT'):
+        server = '{}:{}'.format(
+            os.getenv('PADDLE_ELASTIC_ETCD_SERVICE_HOST'),
+            os.getenv('PADDLE_ELASTIC_ETCD_SERVICE_PORT'))
+    name = args.job_id or os.getenv('PADDLE_ELASTIC_JOB_ID')
+
+    np = args.np or int(os.getenv('PADDLE_ELASTIC_NP', 0))
+
+    cmd = Command(server, name)
+
+    if args.action == "scale":
+        cmd.scale_np(np)
+
+    print("action {} done".format(args.action))
+
+    cmd.close()
diff --git a/python/paddle/distributed/fleet/elastic.py b/python/paddle/distributed/fleet/elastic.py
index aa950fc26f6595392e8edb4480d5a8af7c4af550..706868918f531ff84f4291b13efa281b47f2e49d 100644
--- a/python/paddle/distributed/fleet/elastic.py
+++ b/python/paddle/distributed/fleet/elastic.py
@@ -40,6 +40,15 @@ class LauncherInterface(object):
         self.procs = []
 
     def _terminate_procs(self):
+        # try to terminate the processes by group first; this matters in the multiprocess scenario inside the user process
+        for p in self.procs:
+            if p.proc.poll() is None:
+                os.killpg(os.getpgid(p.proc.pid), signal.SIGTERM)
+                if p.log_fn:
+                    p.log_fn.close()
+                logger.info("terminate process group gid:{}".format(p.proc.pid))
+
+        time.sleep(1)
         for p in self.procs:
             if p.proc.poll() is None:
                 p.proc.terminate()
@@ -55,7 +64,7 @@
                    alive = True
 
            if not alive:
-                logger.info("terminate all the procs")
+                logger.info("terminated all the procs")
                 return True
 
            time.sleep(1)
@@ -104,6 +113,14 @@ class ElasticManager(object):
         self.elastic_level = int(
             os.getenv('PADDLE_ELASTIC_FAULT_TOLERANC_LEVEL', 1))
 
+        # compatible with kubernetes service discovery
+        if not server and os.getenv(
+                'PADDLE_ELASTIC_ETCD_SERVICE_HOST') and os.getenv(
+                    'PADDLE_ELASTIC_ETCD_SERVICE_PORT'):
+            server = '{}:{}'.format(
+                os.getenv('PADDLE_ELASTIC_ETCD_SERVICE_HOST'),
+                os.getenv('PADDLE_ELASTIC_ETCD_SERVICE_PORT'))
+
         #elastic_timeout = os.getenv('PADDLE_ELASTIC_TIMEOUT',1)
 
         logger.debug('init with server {} host {}'.format(server, host))
@@ -153,9 +170,7 @@ class ElasticManager(object):
 
         def host_call_back(event):
             if self.etcd.get(self.host_path)[0] == None:
-                # ensure unmatch trigger
                 logger.info('register host again {}'.format(self.host))
-                time.sleep(5)
 
                 self.etcd.put(self.host_path, six.b(self.host))
 
@@ -319,7 +334,7 @@ class ElasticManager(object):
                 self.launcher.stop()
                 return ElasticStatus.HOLD
 
-            time.sleep(3)
+            time.sleep(2)
 
         if self.launcher:
             self.launcher.stop()
diff --git a/python/paddle/distributed/fleet/launch_utils.py b/python/paddle/distributed/fleet/launch_utils.py
index 4b1eef72ee917782e2073af9cbd1b4678169bd69..7c183fc9286c818500cf6d91f87b18aee65cd5ea 100644
--- a/python/paddle/distributed/fleet/launch_utils.py
+++ b/python/paddle/distributed/fleet/launch_utils.py
@@ -525,9 +525,14 @@ def start_local_trainers(cluster,
                 f.write("PADDLE_TRAINER_ENDPOINTS: \n")
                 f.write("\n".join(cluster.trainers_endpoints()))
             fn = open("%s/workerlog.%d" % (log_dir, idx), "a")
-            proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn)
+            proc = subprocess.Popen(
+                cmd,
+                env=current_env,
+                stdout=fn,
+                stderr=fn,
+                preexec_fn=os.setsid)
         else:
-            proc = subprocess.Popen(cmd, env=current_env)
+            proc = subprocess.Popen(cmd, env=current_env, preexec_fn=os.setsid)
 
         tp = TrainerProc()
         tp.proc = proc
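For reference, a minimal usage sketch of the new scaling entry point added in `python/paddle/distributed/elastic.py`. The etcd address `127.0.0.1:2379`, job id `job-1`, and target size `4` are placeholder values, not part of this PR; the sketch assumes a reachable etcd server and a job that the elastic launcher has already registered under `/paddle/<job_id>/np`.

```python
# Hypothetical example: ask a running elastic job to resize to 4 pods.
# The server address and job id below are placeholders and must match
# the values the elastic launcher was started with.
from paddle.distributed.elastic import Command

cmd = Command("127.0.0.1:2379", "job-1")
try:
    # scale_np() only writes the new np when the key already exists,
    # i.e. when the job has registered itself in etcd.
    if cmd.scale_np(4):
        print("requested scale to 4 pods")
    else:
        print("job not found in etcd, nothing changed")
finally:
    cmd.close()
```

The same request can be issued from the command line via the `__main__` block, e.g. `python -m paddle.distributed.elastic --elastic_server 127.0.0.1:2379 --job_id job-1 --np 4 scale`. The `preexec_fn=os.setsid` change in `launch_utils.py` complements this: each trainer becomes the leader of its own process group, so `_terminate_procs` can first bring down the whole group with `os.killpg` before falling back to per-process `terminate()`.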