diff --git a/python/paddle/distributed/run/__init__.py b/python/paddle/distributed/launch/__init__.py similarity index 55% rename from python/paddle/distributed/run/__init__.py rename to python/paddle/distributed/launch/__init__.py index f25ddb794cc4d573429ac960e646bd8125c48d16..f39bb76114345d7144e1f5b005eb42792a84cf14 100644 --- a/python/paddle/distributed/run/__init__.py +++ b/python/paddle/distributed/launch/__init__.py @@ -12,74 +12,69 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .job.container import Container -from .job.pod import Pod -from .job.job import Job -from . import plugins - -#__all__ = [Container, Pod, Job] +__all__ = [] ''' -Paddle distribution training entry ``python -m paddle.distributed.run``. +Paddle distributed training entry ``python -m paddle.distributed.launch``. Help # for arg usage and explanation, try the following command -# python -m paddle.distributed.run -h +# python -m paddle.distributed.launch -h Collective Mode Case 1: 1 node use all visible devices -# python -m paddle.distributed.run train.py +# python -m paddle.distributed.launch train.py use specified devices -# python -m paddle.distributed.run --devices=0,1,2,3 train.py +# python -m paddle.distributed.launch --devices=0,1,2,3 train.py Case 2: multi-node, auto detect ip/port -# python -m paddle.distributed.run --np 2 train.py +# python -m paddle.distributed.launch --nnodes 2 train.py # auto print following command -# python -m paddle.distributed.run --master 10.0.0.1:13538 --np 2 demo.py +# python -m paddle.distributed.launch --master 10.0.0.1:13538 --nnodes 2 demo.py # then copy and paste above command to other nodes Case 3: multi-node, specified master/rendezvous server -# python -m paddle.distributed.run --np 2 --master 10.0.0.1:2379 train.py +# python -m paddle.distributed.launch --nnodes 2 --master 10.0.0.1:2379 train.py # the master ip must be one of the node and the port must available Parameter Server Mode Case 1.1: 1 node, 1 ps, 1 trainer -# python -m paddle.distributed.run --mode ps train.py -# python -m paddle.distributed.run --server_num=1 --trainer_num=1 train.py +# python -m paddle.distributed.launch --mode ps train.py +# python -m paddle.distributed.launch --server_num=1 --trainer_num=1 train.py Case 1.2: 1 node, 2 ps, 2 trainer -# python -m paddle.distributed.run --server_num=2 --trainer_num=2 train.py +# python -m paddle.distributed.launch --server_num=2 --trainer_num=2 train.py Case 2: 2 node, 2 ps, 2 trainer per node -# python -m paddle.distributed.run --server_num=2 --trainer_num=2 --np 2 train.py +# python -m paddle.distributed.launch --server_num=2 --trainer_num=2 --nnodes 2 train.py # auto print following command -# python -m paddle.distributed.run --master 10.0.0.1:13538 --server_num=2 --trainer_num=2 --np 2 train.py +# python -m paddle.distributed.launch --master 10.0.0.1:13538 --server_num=2 --trainer_num=2 --nnodes 2 train.py # then copy and paste above command to other nodes Case 3: multi-node, specified master/rendezvous server -# python -m paddle.distributed.run --master 10.0.0.1:13538 --server_num=2 --trainer_num=2 --np 2 train.py +# python -m paddle.distributed.launch --master 10.0.0.1:13538 --server_num=2 --trainer_num=2 --nnodes 2 train.py # the master ip must be one of the node and the port must available Case 4: specified servers and trainers in each node -python -m paddle.distributed.run --servers 127.0.0.1:8900,127.0.0.1:8901 --trainers 127.0.0.1:8902,127.0.0.1:8903 train.py +python -m 
paddle.distributed.launch --servers 127.0.0.1:8900,127.0.0.1:8901 --trainers 127.0.0.1:8902,127.0.0.1:8903 train.py Elastic Mode # run following command in 3 node to run immediately, or in 2 node to run after elastic_timeout -# python -m paddle.distributed.run --master etcd://10.0.0.1:2379 --np 2:3 train.py +# python -m paddle.distributed.launch --master etcd://10.0.0.1:2379 --nnodes 2:3 train.py # once the peer number changes between 2:3, the strategy holds diff --git a/python/paddle/distributed/run/__main__.py b/python/paddle/distributed/launch/__main__.py similarity index 61% rename from python/paddle/distributed/run/__main__.py rename to python/paddle/distributed/launch/__main__.py index e32df59a328081e33aa86b42ed9b8e489ac399e8..9cd6f4408c9897d5eaeaa0fd31602c4a0f6de09f 100644 --- a/python/paddle/distributed/run/__main__.py +++ b/python/paddle/distributed/launch/__main__.py @@ -15,14 +15,28 @@ from .context import Context from . import controllers -# initialize the context to run -ctx = Context() -# initialize the selected controller -c = controllers.init(ctx) +def launch(): + # initialize the context to run + ctx = Context() -# run the pods -c.run() + if ctx.is_legacy_mode(): -# manager or just wait pod -c.finalize() + # legacy mode + from paddle.distributed.fleet import launch + launch.launch() + + else: + + # initialize the selected controller + c = controllers.init(ctx) + + # run the pods + c.run() + + # manager or just wait pod + c.finalize() + + +if __name__ == "__main__": + launch() diff --git a/python/paddle/distributed/launch/context/__init__.py b/python/paddle/distributed/launch/context/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e13bb2a5f0ba720037e488e92d1777a9a9111b68 --- /dev/null +++ b/python/paddle/distributed/launch/context/__init__.py @@ -0,0 +1,88 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
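The launch() entry added in __main__.py above keeps a single dispatch point: legacy flags or legacy environment variables route to the old fleet launcher, everything else goes through a controller. As a rough illustration (not part of the patch), the same entry can be driven programmatically by pointing sys.argv at a script before calling it; this assumes paddle is installed and train.py is a placeholder script:

    import sys
    from paddle.distributed.launch.__main__ import launch

    # emulate: python -m paddle.distributed.launch --devices 0 train.py
    sys.argv = ["launch", "--devices", "0", "train.py"]
    launch()  # builds a Context, picks a controller, runs and finalizes the pod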
+ +from paddle.distributed.launch import plugins + +from .node import Node +from .status import Status +from .args_envs import parse_args, fetch_envs, env_args_mapping + +import logging + + +class Context(object): + def __init__(self, enable_plugin=True): + self.args, self.unknown_args = parse_args() + self.envs = fetch_envs() + self.logger = self.get_logger() + + self.node = Node() + self.status = Status() + + self.set_env_in_args() + + # design for event queue, later + self.events = [] + + if enable_plugin: + self._enable_plugin() + + def is_legacy_mode(self): + if self.args.legacy: + return True + + if len(self.unknown_args) > 0: + self.logger.warning("Compatible mode enable with args {}".format( + self.unknown_args)) + return True + + legacy_env_list = [ + 'DISTRIBUTED_TRAINER_ENDPOINTS', + 'PADDLE_ELASTIC_JOB_ID', + 'PADDLE_DISTRI_BACKEND', + 'FLAGS_START_PORT', + ] + + for env in legacy_env_list: + if env in self.envs: + self.logger.warning( + "ENV {} is deprecated, legacy launch enable".format(env)) + return True + + if self.args.master: + return False + + return False + + def get_envs(self): + return self.envs.copy() + + def _enable_plugin(self): + for pl in plugins.enabled_plugins: + pl(self) + + def get_logger(self, level=logging.INFO): + logger = logging.getLogger("LAUNCH") + logger.setLevel(self.args.log_level.upper() or level) + formatter = logging.Formatter( + fmt='%(name)s %(levelname)s %(asctime)s %(message)s') + ch = logging.StreamHandler() + ch.setFormatter(formatter) + logger.addHandler(ch) + return logger + + def set_env_in_args(self): + for k, v in env_args_mapping.items(): + if k in self.envs: + setattr(self.args, v, self.envs[k]) diff --git a/python/paddle/distributed/launch/context/args_envs.py b/python/paddle/distributed/launch/context/args_envs.py new file mode 100644 index 0000000000000000000000000000000000000000..d504a11e5f3d1cfc06c73cbb4b723e8758de52ce --- /dev/null +++ b/python/paddle/distributed/launch/context/args_envs.py @@ -0,0 +1,151 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
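Context.set_env_in_args above lets environment variables stand in for command-line flags, driven by the env_args_mapping table defined in args_envs.py below. A minimal, self-contained sketch of that override behaviour (the Namespace fields and the two mapping entries are just examples):

    import os
    from argparse import Namespace

    env_args_mapping = {'PADDLE_NNODES': 'nnodes', 'PADDLE_JOB_ID': 'job_id'}
    args = Namespace(nnodes='1', job_id='default')

    os.environ['PADDLE_NNODES'] = '4'
    for env_key, arg_name in env_args_mapping.items():
        if env_key in os.environ:
            # an exported PADDLE_* variable overrides the argparse default
            setattr(args, arg_name, os.environ[env_key])

    assert args.nnodes == '4' and args.job_id == 'default'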
+ +import os +from argparse import ArgumentParser, REMAINDER + +env_args_mapping = { + 'POD_IP': 'host', + 'PADDLE_MASTER': 'master', + 'PADDLE_DEVICES': 'devices', + 'PADDLE_NNODES': 'nnodes', + 'PADDLE_MODE': 'mode', + 'PADDLE_LOG_LEVEL': 'log_level', + 'PADDLE_NPROC_PER_NODE': 'nproc_per_node', + 'PADDLE_JOB_ID': 'job_id', + 'PADDLE_RANK': 'rank', + 'PADDLE_LOG_DIR': 'log_dir', + 'PADDLE_MAX_RESTART': 'max_restart', + 'PADDLE_ELASTIC_LEVEL': 'elastic_level', + 'PADDLE_ELASTIC_TIMEOUT': 'elastic_timeout', + 'PADDLE_SERVER_NUM': 'server_num', + 'PADDLE_TRAINER_NUM': 'trainer_num', + 'PADDLE_SERVERS_ENDPOINTS': 'servers', + 'PADDLE_TRAINERS_ENDPOINTS': 'trainers', + 'PADDLE_GLOO_PORT': 'gloo_port', + 'PADDLE_WITH_GLOO': 'with_gloo', +} + + +def fetch_envs(): + os.environ.pop('http_proxy', None) + os.environ.pop('https_proxy', None) + + return os.environ.copy() + + +def parse_args(): + parser = ArgumentParser() + + base_group = parser.add_argument_group("Base Parameters") + + base_group.add_argument( + "--master", + type=str, + default=None, + help="the master/rendezvous server, ip:port") + + base_group.add_argument( + "--legacy", type=bool, default=False, help="use legacy launch") + + base_group.add_argument( + "--rank", type=int, default=-1, help="the peer rank") + + base_group.add_argument( + "--log_level", type=str, default="INFO", help="log level. Default INFO") + + base_group.add_argument( + "--nnodes", + type=str, + default="1", + help="the number of peers, i.e. pod/node number") + + base_group.add_argument( + "--nproc_per_node", + type=int, + default=None, + help="the number of processes in a pod") + + base_group.add_argument( + "--log_dir", + type=str, + default="log", + help="the path for each process's log. Default ./log") + base_group.add_argument( + "--mode", + type=str, + default="collective", + help="run mode of the job, collective/ps/ps-heter") + + base_group.add_argument( + "--job_id", + type=str, + default="default", + help="unique id of the job. Default default") + + base_group.add_argument( + "--devices", + type=str, + default=None, + help="accelerate devices. as --gpus,npus,xps") + + base_group.add_argument("--host", type=str, default=None, help="host ip") + + base_group.add_argument( + "training_script", + type=str, + help="the full path of py script," + "followed by arguments for the " + "training script") + + base_group.add_argument('training_script_args', nargs=REMAINDER) + + ps_group = parser.add_argument_group("Parameter-Server Parameters") + # for parameter server + ps_group.add_argument( + "--servers", type=str, default='', help="servers endpoints full list") + ps_group.add_argument( + "--trainers", type=str, default='', help="trainers endpoints full list") + + ps_group.add_argument( + "--trainer_num", type=int, default=None, help="number of trainers") + ps_group.add_argument( + "--server_num", type=int, default=None, help="number of servers") + ps_group.add_argument( + "--gloo_port", type=int, default=6767, help="gloo http port") + ps_group.add_argument( + "--with_gloo", type=str, default="0", help="use gloo or not") + + # parameter elastic mode + elastic_group = parser.add_argument_group("Elastic Parameters") + elastic_group.add_argument( + "--max_restart", + type=int, + default=3, + help="the times can restart. 
Default 3") + + elastic_group.add_argument( + "--elastic_level", + type=int, + default=-1, + help="elastic level: -1 disable, 0 failed exit, peers hold, 1 internal restart" + ) + + elastic_group.add_argument( + "--elastic_timeout", + type=int, + default=30, + help="seconds to wait before elastic perform training") + + return parser.parse_known_args() diff --git a/python/paddle/distributed/launch/context/device.py b/python/paddle/distributed/launch/context/device.py new file mode 100644 index 0000000000000000000000000000000000000000..9163e7abd918371ddf4eca388bc912b630684f1f --- /dev/null +++ b/python/paddle/distributed/launch/context/device.py @@ -0,0 +1,149 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + + +class DeviceType: + CPU = 'cpu' + GPU = 'gpu' + XPU = 'xpu' + NPU = 'npu' + MLU = 'mlu' + + +class Device(object): + def __init__(self, dtype=None, memory="", labels=""): + self._dtype = dtype + self._memory = memory + self._labels = labels + + def __str__(self): + return ",".join(self._labels) + + @property + def dtype(self): + return self._dtype + + @property + def count(self): + return len(self._labels) or 1 + + @property + def memory(self): + return self._memory + + @property + def labels(self): + return self._labels + + @labels.setter + def labels(self, lbs): + if isinstance(lbs, str): + self._labels = lbs.split(',') + elif isinstance(lbs, list): + self._labels = lbs + else: + self._labels = [] + + def get_selected_flag_key(self): + if self._dtype == DeviceType.CPU: + return 'FLAGS_selected_cpus' + if self._dtype == DeviceType.GPU: + return 'FLAGS_selected_gpus' + if self._dtype == DeviceType.NPU: + return 'FLAGS_selected_npus' + if self._dtype == DeviceType.XPU: + return 'FLAGS_selected_xpus' + if self._dtype == DeviceType.MLU: + return 'FLAGS_selected_mlus' + return 'FLAGS_selected_devices' + + def get_selected_flag_label(self, idx): + if idx < len(self._labels): + return self._labels[idx] + else: + return '0' + + def selected_flags(self, idx=None): + if idx is None: + return {self.get_selected_flag_key(): ','.join(self._labels)} + else: + return { + self.get_selected_flag_key(): self.get_selected_flag_label(idx) + } + + @classmethod + def parse_device(self): + dev = Device() + visible_devices = None + if 'CUDA_VISIBLE_DEVICES' in os.environ or 'NVIDIA_VISIBLE_DEVICES' in os.environ: + dev._dtype = DeviceType.GPU + visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") or os.getenv( + "NVIDIA_VISIBLE_DEVICES") + elif 'XPU_VISIBLE_DEVICES' in os.environ: + dev._dtype = DeviceType.XPU + visible_devices = os.getenv("XPU_VISIBLE_DEVICES") + elif 'ASCEND_VISIBLE_DEVICES' in os.environ: + dev._dtype = DeviceType.NPU + visible_devices = os.getenv("ASCEND_VISIBLE_DEVICES") + elif 'MLU_VISIBLE_DEVICES' in os.environ: + dev._dtype = DeviceType.MLU + visible_devices = os.getenv("MLU_VISIBLE_DEVICES") + + if visible_devices is not None and visible_devices != 'all': + dev._labels = visible_devices.split(',') + else: + 
return self.detect_device() + + return dev + + @classmethod + def detect_device(self): + import paddle.fluid as fluid + + dev = Device() + num = 0 + visible_devices = None + if fluid.core.is_compiled_with_cuda(): + dev._dtype = DeviceType.GPU + num = fluid.core.get_cuda_device_count() + visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") or os.getenv( + "NVIDIA_VISIBLE_DEVICES") + elif fluid.core.is_compiled_with_xpu(): + dev._dtype = DeviceType.XPU + num = fluid.core.get_xpu_device_count() + visible_devices = os.getenv("XPU_VISIBLE_DEVICES") + elif fluid.core.is_compiled_with_npu(): + dev._dtype = DeviceType.NPU + num = fluid.core.get_npu_device_count() + visible_devices = os.getenv("ASCEND_VISIBLE_DEVICES") + elif fluid.core.is_compiled_with_mlu(): + dev._dtype = DeviceType.MLU + num = fluid.core.get_mlu_device_count() + visible_devices = os.getenv("MLU_VISIBLE_DEVICES") + + if num == 0: + dev._dtype = DeviceType.CPU + elif visible_devices is None or visible_devices == "all": + dev._labels = [str(x) for x in range(0, num)] + else: + dev._labels = visible_devices.split(',') + + return dev + + +if __name__ == '__main__': + d = Device.parse_device() + print(d.get_selected_flag()) diff --git a/python/paddle/distributed/run/context/event.py b/python/paddle/distributed/launch/context/event.py similarity index 100% rename from python/paddle/distributed/run/context/event.py rename to python/paddle/distributed/launch/context/event.py diff --git a/python/paddle/distributed/run/context/node.py b/python/paddle/distributed/launch/context/node.py similarity index 100% rename from python/paddle/distributed/run/context/node.py rename to python/paddle/distributed/launch/context/node.py diff --git a/python/paddle/distributed/run/context/resource.py b/python/paddle/distributed/launch/context/resource.py similarity index 100% rename from python/paddle/distributed/run/context/resource.py rename to python/paddle/distributed/launch/context/resource.py diff --git a/python/paddle/distributed/run/context/status.py b/python/paddle/distributed/launch/context/status.py similarity index 100% rename from python/paddle/distributed/run/context/status.py rename to python/paddle/distributed/launch/context/status.py diff --git a/python/paddle/distributed/run/controllers/__init__.py b/python/paddle/distributed/launch/controllers/__init__.py similarity index 98% rename from python/paddle/distributed/run/controllers/__init__.py rename to python/paddle/distributed/launch/controllers/__init__.py index e5557151ad5489cb4af0c34b3ad47c31774b3326..706131300f0d884b216071a6ea3fe75caaf714df 100644 --- a/python/paddle/distributed/run/controllers/__init__.py +++ b/python/paddle/distributed/launch/controllers/__init__.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
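The Device class above turns the visible accelerator labels into the FLAGS_selected_* variable that the collective controller below injects into each container's environment. A standalone mock of the expected behaviour for a GPU pod, assuming CUDA_VISIBLE_DEVICES=0,1,2,3:

    labels = ['0', '1', '2', '3']
    key = 'FLAGS_selected_gpus'  # what get_selected_flag_key() returns for GPUs

    def selected_flags(idx=None):
        # pod-wide view when idx is None, per-rank view otherwise
        if idx is None:
            return {key: ','.join(labels)}
        return {key: labels[idx] if idx < len(labels) else '0'}

    print(selected_flags())   # {'FLAGS_selected_gpus': '0,1,2,3'}
    print(selected_flags(2))  # {'FLAGS_selected_gpus': '2'}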
-__all__ = ["init"] +__all__ = [] from .collective import CollectiveController from .collective import CollectiveElasticController diff --git a/python/paddle/distributed/run/controllers/collective.py b/python/paddle/distributed/launch/controllers/collective.py similarity index 95% rename from python/paddle/distributed/run/controllers/collective.py rename to python/paddle/distributed/launch/controllers/collective.py index c4feb54428a07265693c0969e6e385a380e22f3d..c3fa4e6e07de9bca8acd8db585be679e3cf0244c 100644 --- a/python/paddle/distributed/run/controllers/collective.py +++ b/python/paddle/distributed/launch/controllers/collective.py @@ -89,6 +89,10 @@ class CollectiveController(Controller): "PADDLE_TRAINERS_NUM": "{}".format(global_size), "PADDLE_RANK_IN_NODE": str(i), } + if self.pod.replicas == 1: + e.update(self.ctx.node.device.selected_flags()) + else: + e.update(self.ctx.node.device.selected_flags(i)) self.add_container(envs=e, log_tag=i) return True @@ -106,7 +110,8 @@ class CollectiveElasticController(CollectiveController): def register(self): if self.job.id == 'default': self.ctx.logger.warning( - 'Using default job name may cause conflict, add --id in args') + 'Using default job name may cause conflict, add --job_id in args' + ) self.master.register_heartbeat(self.job.id, self.pod.name) @@ -114,6 +119,8 @@ class CollectiveElasticController(CollectiveController): ''' watch self and peer status, return true to exit ''' + + self.ctx.logger.info("Watching {}".format(self.pod)) while not self.ctx.status.is_done(): # self status status = self.pod.watch(timeout=2) @@ -171,13 +178,8 @@ class CollectiveElasticController(CollectiveController): continue self.master.set_status(self.ctx.status.RUNNING) - self.ctx.status.run() - - assert len(self.pod.containers) > 0, "No container in the pod" - self.ctx.logger.debug("Run {}".format(self.pod)) - self.ctx.logger.debug("Run {}".format(self.pod.containers[0])) - self.pod.deploy() + self.deploy_pod() if self.watch(): break diff --git a/python/paddle/distributed/run/controllers/controller.py b/python/paddle/distributed/launch/controllers/controller.py similarity index 90% rename from python/paddle/distributed/run/controllers/controller.py rename to python/paddle/distributed/launch/controllers/controller.py index 2d904cf2a2cca5b9abaab06d1545c03c160e3d93..60e34b85a12bc46636f7edb9bbb4926eddfc73ba 100644 --- a/python/paddle/distributed/run/controllers/controller.py +++ b/python/paddle/distributed/launch/controllers/controller.py @@ -16,9 +16,9 @@ import sys import os import signal -from paddle.distributed.run.job import Job -from paddle.distributed.run.job import Pod -from paddle.distributed.run.job import Container +from paddle.distributed.launch.job.job import Job +from paddle.distributed.launch.job.pod import Pod +from paddle.distributed.launch.job.container import Container from .master import Master @@ -39,38 +39,43 @@ class ControllerBase(object): self.ctx = ctx self.master = Master.factory(self.ctx) - self.job = Job(np=self.ctx.args.np, + self.job = Job(nnodes=self.ctx.args.nnodes, mode=self.ctx.args.mode, - id=self.ctx.args.id) + jid=self.ctx.args.job_id) self.pod = Pod() self.join_server = None - def run(self): - self.build_job() - self.build_pod() + def deploy_pod(self): - if len(self.pod.containers) < 1: - self.ctx.logger.error("No container in the pod {}".format(self.pod)) - return + assert len(self.pod.containers) > 0, "No container in the pod" self.ctx.logger.info("Run {}".format(self.pod)) self.ctx.logger.debug(self.pod.containers[0]) + 
self.ctx.status.run() self.pod.deploy() + def run(self): + self.build_job() + self.build_pod() + + self.deploy_pod() + self.watch() def watch(self) -> bool: + self.ctx.logger.info("Watching {}".format(self.pod)) + status = self.pod.watch() if status == self.ctx.status.COMPLETED: self.ctx.logger.info("Pod {}".format(status)) elif status == self.ctx.status.FAILED: + fc = self.pod.failed_container() self.ctx.logger.info("Pod {}".format(status)) - self.ctx.logger.error("Container failed !!!\n{}".format( - self.pod.failed_container())) - self.pod.tail() + self.ctx.logger.error("Container failed !!!\n{}".format(fc[0])) + fc[0].tail() self.pod.stop() def stop(self, sigint=None): diff --git a/python/paddle/distributed/run/controllers/master.py b/python/paddle/distributed/launch/controllers/master.py similarity index 96% rename from python/paddle/distributed/run/controllers/master.py rename to python/paddle/distributed/launch/controllers/master.py index 257ba3bad8da3c331ac303b7a3ee415461fd13b8..f9f484eb125eed264cb7e8658476255281a048cf 100644 --- a/python/paddle/distributed/run/controllers/master.py +++ b/python/paddle/distributed/launch/controllers/master.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from paddle.distributed.run.utils.kv_client import KVClient -from paddle.distributed.run.utils.kv_server import KVServer +from paddle.distributed.launch.utils.kv_client import KVClient +from paddle.distributed.launch.utils.kv_server import KVServer import time import sys @@ -84,7 +84,7 @@ class HTTPMaster(Master): print("Copy the following command to other nodes to run.") cmd = [ - sys.executable.split('/')[-1], "-m", "paddle.distributed.run" + sys.executable.split('/')[-1], "-m", "paddle.distributed.launch" ] cmd.extend(["--master", self.endpoint]) cmd.extend(sys.argv[1:]) @@ -118,9 +118,12 @@ class HTTPMaster(Master): self._stop_server() def sync_peers(self, prefix, key, value, size, rank=-1) -> (list, int): + if size < 2: return [value], 0 + self.ctx.logger.info("Waiting peer ready...") + self.lazy_init() while not self.ctx.status.is_done(): @@ -130,7 +133,7 @@ class HTTPMaster(Master): self.ctx.logger.warning("master not ready") time.sleep(0.1) - # 'aaaaaa' make suer main pod (master server) as rank 0 + # 'aaaaaa' make sure main pod (master server) as rank 0 ky = 'aaaaaa' if rank < 0 and self.role == Master.MAIN else key k = "{}/{}/{}".format(prefix, ky, rank) @@ -177,6 +180,12 @@ class ETCDMaster(Master): sync_peers gather all value for key under scope prefix result always be sorted either by rank or alphabet of pod.name ''' + + if size < 2: + return [value], 0 + + self.ctx.logger.info("Waiting peer ready...") + path = "{}/{}/{}".format(prefix, key, rank) self.client.delete_prefix(prefix) diff --git a/python/paddle/distributed/run/controllers/ps.py b/python/paddle/distributed/launch/controllers/ps.py similarity index 98% rename from python/paddle/distributed/run/controllers/ps.py rename to python/paddle/distributed/launch/controllers/ps.py index cc43c336cf1862fe075e5a4463b1f5b666a5005c..d3d0ef59bfd2f8102fe92beecef100c2079b1698 100644 --- a/python/paddle/distributed/run/controllers/ps.py +++ b/python/paddle/distributed/launch/controllers/ps.py @@ -22,7 +22,8 @@ class PSController(Controller): @classmethod def enable(cls, ctx): if ctx.args.mode == ControleMode.PS or ctx.args.server_num or len( - ctx.args.servers) > 0: + ctx.args.servers) > 0 or ctx.args.trainer_num or len( + ctx.args.trainers) > 0: ctx.logger.debug("{} 
enabled".format(cls.__name__)) ctx.args.mode = ControleMode.PS return True diff --git a/python/paddle/distributed/launch.py b/python/paddle/distributed/launch/job/__init__.py similarity index 78% rename from python/paddle/distributed/launch.py rename to python/paddle/distributed/launch/job/__init__.py index e02a439025b77f9ae612aa790bc521b521fb481f..97043fd7ba6885aac81cad5a49924c23c67d4d47 100644 --- a/python/paddle/distributed/launch.py +++ b/python/paddle/distributed/launch/job/__init__.py @@ -1,18 +1,13 @@ -# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -# +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from paddle.distributed.fleet import launch -launch.launch() - -__all__ = [] diff --git a/python/paddle/distributed/run/job/container.py b/python/paddle/distributed/launch/job/container.py similarity index 90% rename from python/paddle/distributed/run/job/container.py rename to python/paddle/distributed/launch/job/container.py index 651932d6c88378034d7ab9cb05bac00ee3ea7ddf..7105cae9024f25f4b5d63a8d74d5d492777d4046 100644 --- a/python/paddle/distributed/run/job/container.py +++ b/python/paddle/distributed/launch/job/container.py @@ -13,12 +13,11 @@ # limitations under the License. 
from collections import OrderedDict -from paddle.distributed.run.utils.process_context import ProcessContext +from paddle.distributed.launch.utils.process_context import ProcessContext from .status import Status import os, copy, sys -import time class Container(object): @@ -78,6 +77,11 @@ class Container(object): kwargs = {k: v for k, v in kwargs.items() if isinstance(v, str)} self._env.update(kwargs) + def _valide_env(self): + for k, v in self._env.items(): + assert isinstance(k, str) and isinstance( + v, str), 'env {}:{} must be str'.format(k, v) + def _get_fd(self, pth): if not pth: return None @@ -90,12 +94,12 @@ class Container(object): except: return None - def start(self, timeout=-1): - end = time.time() + timeout - + def start(self): if self._proc and self._proc.alive(): return True + self._valide_env() + self._stdout = self._get_fd(self._out) or sys.stdout if self._out == self._err: self._stderr = self._stdout @@ -106,14 +110,6 @@ class Container(object): self._entrypoint, env=self._env, out=self._stdout, err=self._stderr) self._proc.start() - while timeout > 0 and time.time() < end: - if self._proc.alive(): - time.sleep(0.1) - continue - if self._proc.exit_code() == 0: - return True - return False - def terminate(self, force=False): if self._log_handler: self._log_handler.close() @@ -125,9 +121,11 @@ class Container(object): def wait(self, timeout=None): self._proc.wait(timeout) + @property def exit_code(self): return self._proc.exit_code() if self._proc else -1 + @property def status(self): if not self._proc: return Status.UNINIT @@ -141,9 +139,9 @@ class Container(object): def __str__(self): return 'Container rank {} status {} cmd {} code {} log {} \nenv {}'.format( self._rank, - self.status(), + self.status, self._entrypoint, - self.exit_code(), + self.exit_code, self.errfile, self._env, ) diff --git a/python/paddle/distributed/run/job/job.py b/python/paddle/distributed/launch/job/job.py similarity index 89% rename from python/paddle/distributed/run/job/job.py rename to python/paddle/distributed/launch/job/job.py index 3469ed862576faed3bd7546710927f638b8fe0d5..31827968ddce605b424e53f5609ff302e5e4ac95 100644 --- a/python/paddle/distributed/run/job/job.py +++ b/python/paddle/distributed/launch/job/job.py @@ -20,16 +20,16 @@ class JobMode: class Job(object): - def __init__(self, id='default', mode=JobMode.COLLECTIVE, np="1"): + def __init__(self, jid='default', mode=JobMode.COLLECTIVE, nnodes="1"): self._mode = mode - self._id = id + self._id = jid self._replicas = 0 self._replicas_min = self._replicas self._replicas_max = self._replicas self._elastic = False - self.set_replicas(str(np)) + self.set_replicas(str(nnodes)) def __str__(self): return "Job: {}, mode {}, replicas {}[{}:{}], elastic {}".format( @@ -64,8 +64,8 @@ class Job(object): def replicas(self, replicas): self._replicas = replicas - def set_replicas(self, np: str): - np = str(np) if np else '1' + def set_replicas(self, nnodes: str): + np = str(nnodes) if nnodes else '1' if ':' in np: nps = np.split(':') diff --git a/python/paddle/distributed/run/job/pod.py b/python/paddle/distributed/launch/job/pod.py similarity index 78% rename from python/paddle/distributed/run/job/pod.py rename to python/paddle/distributed/launch/job/pod.py index f7c31edce1d552befac3a6f54e5e79c326b31c67..701adf45f94e880adef6108b6f7e77ea58155a9e 100644 --- a/python/paddle/distributed/run/job/pod.py +++ b/python/paddle/distributed/launch/job/pod.py @@ -34,7 +34,7 @@ class PodSepc(object): #self.status: Status = None self._rank = -1 - 
self._init_timeout = 120 # 2 min timeout for each init container + self._init_timeout = None self._restart = -1 self._replicas = 0 # number of containers self._exit_code = 0 @@ -45,15 +45,15 @@ class Pod(PodSepc): super().__init__() def __str__(self): - return "Pod: {}, replicas {}, status {}".format(self.name, - self.replicas, - self.status()) + return "Pod: {}, replicas {}, status {}".format( + self.name, self.replicas, self.status) def failed_container(self): + cs = [] for c in self._containers: - if c.status() == Status.FAILED: - return c - return None + if c.status == Status.FAILED: + cs.append(c) + return cs @property def name(self): @@ -65,7 +65,7 @@ class Pod(PodSepc): @replicas.setter def replicas(self, r): - self._replicas = r + self._replicas = max(r, 1) @property def rank(self): @@ -98,13 +98,15 @@ class Pod(PodSepc): @property def exit_code(self): for c in self._containers: - if c.exit_code() != 0: - return c.exit_code() + if c.exit_code != 0: + return c.exit_code return 0 def deploy(self): + # init container should stop before run containers for i in self._init_containers: - i.start(self._init_timeout) + i.start() + i.wait(self._init_timeout) for c in self._containers: c.start() @@ -120,6 +122,7 @@ class Pod(PodSepc): for c in self._containers: c.wait(None) + @property def status(self): if self.is_failed(): return Status.FAILED @@ -127,6 +130,9 @@ class Pod(PodSepc): if self.is_completed(): return Status.COMPLETED + if self.is_running(): + return Status.RUNNING + return Status.READY def reset(self): @@ -135,31 +141,31 @@ class Pod(PodSepc): def is_failed(self): for c in self._containers: - if c.status() == Status.FAILED: + if c.status == Status.FAILED: return True return False def is_completed(self): for c in self._containers: - if c.status() != Status.COMPLETED: + if c.status != Status.COMPLETED: + return False + return True + + def is_running(self): + for c in self._containers: + if c.status != Status.RUNNING: return False return True def logs(self, idx=None): if idx is None: - if self.failed_container(): - self.failed_container().logs() - else: - self._containers[0].logs() + self._containers[0].logs() else: self._containers[idx].logs() def tail(self, idx=None): if idx is None: - if self.failed_container(): - self.failed_container().tail() - else: - self._containers[0].tail() + self._containers[0].tail() else: self._containers[idx].tail() @@ -175,10 +181,10 @@ class Pod(PodSepc): end = time.time() + timeout while timeout < 0 or time.time() < end: for c in self._containers: - if c.status() in any_list: - return c.status() + if c.status in any_list: + return c.status - s = [c.status() for c in self._containers] + s = [c.status for c in self._containers] if len(set(s)) == 1 and s[0] in all_list: return s[0] diff --git a/python/paddle/distributed/run/job/status.py b/python/paddle/distributed/launch/job/status.py similarity index 100% rename from python/paddle/distributed/run/job/status.py rename to python/paddle/distributed/launch/job/status.py diff --git a/python/paddle/distributed/run/plugins/__init__.py b/python/paddle/distributed/launch/plugins/__init__.py similarity index 71% rename from python/paddle/distributed/run/plugins/__init__.py rename to python/paddle/distributed/launch/plugins/__init__.py index ec91402a7aad359c9860cf737a78cb7c1f1375d1..1862f75a77f65d39715e031b0ba72ebea6ab5523 100644 --- a/python/paddle/distributed/run/plugins/__init__.py +++ b/python/paddle/distributed/launch/plugins/__init__.py @@ -30,15 +30,26 @@ def process_args(ctx): argdev = ctx.args.devices 
if argdev: ctx.node.device.labels = argdev.split(',') - ctx.node.device.count = len(ctx.node.device.labels) ctx.logger.debug('Device reset by args {}'.format(argdev)) def collective_compatible(ctx): if 'PADDLE_TRAINER_ENDPOINTS' in ctx.envs: - ctx.master = ctx.envs['PADDLE_TRAINER_ENDPOINTS'].split(',')[0] + eps = ctx.envs['PADDLE_TRAINER_ENDPOINTS'].split(',') + hosts = set([h.split(':')[0] for h in eps]) + ctx.args.master = eps[0] if ':' in eps[0] else '{}:6768'.format(eps[0]) + ctx.args.nnodes = len(hosts) + ctx.logger.info('args reset by env PADDLE_TRAINER_ENDPOINTS\n{}'.format( + eps)) + ''' if 'DISTRIBUTED_TRAINER_ENDPOINTS' in ctx.envs: - ctx.master = ctx.envs['DISTRIBUTED_TRAINER_ENDPOINTS'].split(',')[0] + eps = ctx.envs['DISTRIBUTED_TRAINER_ENDPOINTS'].split(',') + hosts = set([h.split(':')[0] for h in eps]) + ctx.args.master = eps[0] + ctx.args.nnodes = len(hosts) + ctx.logger.info( + 'args reset by env DISTRIBUTED_TRAINER_ENDPOINTS\n{}'.format(eps)) + ''' def rewrite_host_ip(ctx): diff --git a/python/paddle/distributed/run/job/__init__.py b/python/paddle/distributed/launch/utils/__init__.py similarity index 78% rename from python/paddle/distributed/run/job/__init__.py rename to python/paddle/distributed/launch/utils/__init__.py index 66d2abbce21ebc72cda5373a3d3a242c077beaa8..97043fd7ba6885aac81cad5a49924c23c67d4d47 100644 --- a/python/paddle/distributed/run/job/__init__.py +++ b/python/paddle/distributed/launch/utils/__init__.py @@ -11,15 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from .pod import Pod -from .job import Job -from .container import Container -from .status import Status - -__all__ = [ - 'Pod', - 'Job', - 'Container', - 'Status', -] diff --git a/python/paddle/distributed/run/utils/kv_client.py b/python/paddle/distributed/launch/utils/kv_client.py similarity index 100% rename from python/paddle/distributed/run/utils/kv_client.py rename to python/paddle/distributed/launch/utils/kv_client.py diff --git a/python/paddle/distributed/run/utils/kv_server.py b/python/paddle/distributed/launch/utils/kv_server.py similarity index 100% rename from python/paddle/distributed/run/utils/kv_server.py rename to python/paddle/distributed/launch/utils/kv_server.py diff --git a/python/paddle/distributed/run/utils/process_context.py b/python/paddle/distributed/launch/utils/process_context.py similarity index 100% rename from python/paddle/distributed/run/utils/process_context.py rename to python/paddle/distributed/launch/utils/process_context.py diff --git a/python/paddle/distributed/run/context/__init__.py b/python/paddle/distributed/run/context/__init__.py deleted file mode 100644 index 86dff0f1f8056e784268a6ef3a3ebabb44aa9c6d..0000000000000000000000000000000000000000 --- a/python/paddle/distributed/run/context/__init__.py +++ /dev/null @@ -1,219 +0,0 @@ -# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. - -from argparse import ArgumentParser, REMAINDER -import os, copy - -from paddle.distributed.run import plugins - -from .node import Node -from .status import Status - -import logging - - -class Context(object): - def __init__(self, enable_plugin=True): - os.environ.pop('http_proxy', None) - os.environ.pop('https_proxy', None) - - self.args = self.parse_args() - self.envs = self.fetch_envs() - self.logger = self.get_logger() - - self.node = Node() - self.status = Status() - - self.set_env_in_args() - - # design for event queue, later - self.events = [] - - if enable_plugin: - self._enable_plugin() - - def get_envs(self): - return self.envs.copy() - - def _enable_plugin(self): - for pl in plugins.enabled_plugins: - pl(self) - - def parse_args(self): - parser = ArgumentParser() - - base_group = parser.add_argument_group("Base Parameters") - - base_group.add_argument( - "--master", - type=str, - default=None, - help="the master/rendezvous server, ip:port") - - base_group.add_argument( - "--rank", type=int, default=-1, help="the peer rank") - - base_group.add_argument( - "--log", type=str, default="INFO", help="log level. Default INFO") - - base_group.add_argument( - "--np", - type=str, - default="1", - help="the number of peers, i.e. pod/node number") - - base_group.add_argument( - "--nproc_per_node", - type=int, - default=None, - help="the number of processes in a pod") - - base_group.add_argument( - "--log_dir", - type=str, - default="log", - help="the path for each process's log. Default ./log") - base_group.add_argument( - "--mode", - type=str, - default="collective", - help="run mode of the job, collective/ps/ps-heter") - - base_group.add_argument( - "--id", - type=str, - default="default", - help="unique id of the job. Default default") - - base_group.add_argument( - "--devices", - type=str, - default=None, - help="accelerate devices. as --gpus,npus,xps") - - base_group.add_argument( - "--host", type=str, default=None, help="host ip") - - base_group.add_argument( - "training_script", - type=str, - help="the full path of py script," - "followed by arguments for the " - "training script") - - base_group.add_argument('training_script_args', nargs=REMAINDER) - - ps_group = parser.add_argument_group("Parameter-Server Parameters") - # for parameter server - ps_group.add_argument( - "--servers", - type=str, - default='', - help="servers endpoints full list") - ps_group.add_argument( - "--trainers", - type=str, - default='', - help="trainers endpoints full list") - - ps_group.add_argument( - "--trainer_num", type=int, default=None, help="number of trainers") - ps_group.add_argument( - "--server_num", type=int, default=None, help="number of servers") - ps_group.add_argument( - "--gloo_port", type=int, default=6767, help="gloo http port") - ps_group.add_argument( - "--with_gloo", type=str, default="0", help="use gloo or not") - - # parameter elastic mode - elastic_group = parser.add_argument_group("Elastic Parameters") - elastic_group.add_argument( - "--max_restart", - type=int, - default=3, - help="the times can restart. 
Default 3") - - elastic_group.add_argument( - "--elastic_level", - type=int, - default=-1, - help="elastic level: -1 disable, 0 failed exit, peers hold, 1 internal restart" - ) - - elastic_group.add_argument( - "--elastic_timeout", - type=int, - default=30, - help="seconds to wait before elastic perform training") - return parser.parse_args() - - def _valide_env(self, key): - if key in ['POD_IP']: - return True - if key.endswith('_VISIBLE_DEVICES'): - return True - if key.startswith('PADDLE_'): - return True - - return False - - def fetch_envs(self): - ge = os.environ.copy() - - black_env_list = ['http_proxy', 'https_proxy'] - for key in black_env_list: - ge.pop(key, None) - - return ge - ''' - # use black list instead white list - return {k: ge[k] for k in ge if self._valide_env(k)} - ''' - - def get_logger(self, level=logging.INFO): - logger = logging.getLogger("PADDLERUN") - logger.setLevel(self.args.log.upper() or level) - formatter = logging.Formatter( - fmt='%(name)s %(levelname)s %(asctime)s %(message)s') - ch = logging.StreamHandler() - ch.setFormatter(formatter) - logger.addHandler(ch) - return logger - - def set_env_in_args(self): - env_args = { - 'POD_IP': 'host', - 'PADDLE_MASTER': 'master', - 'PADDLE_DEVICES': 'devices', - 'PADDLE_NP': 'np', - 'PADDLE_MODE': 'mode', - 'PADDLE_LOG': 'log', - 'PADDLE_NPROC_PER_NODE': 'nproc_per_node', - 'PADDLE_JOB_ID': 'id', - 'PADDLE_RANK': 'rank', - 'PADDLE_LOG_DIR': 'log_dir', - 'PADDLE_MAX_RESTlRT': 'max_restart', - 'PADDLE_ELASTIC_LEVEL': 'elastic_level', - 'PADDLE_ELASTIC_TIMEOUT': 'elastic_timeout', - 'PADDLE_SERVER_NUM': 'server_num', - 'PADDLE_TRAINER_NUM': 'trainer_num', - 'PADDLE_SERVERS_ENDPOINTS': 'servers', - 'PADDLE_TRAINERS_ENDPOINTS': 'trainers', - 'PADDLE_GLOO_PORT': 'gloo_port', - 'PADDLE_WITH_GLOO': 'with_gloo', - } - - for k, v in env_args.items(): - if k in self.envs: - setattr(self.args, v, self.envs[k]) diff --git a/python/paddle/distributed/run/context/device.py b/python/paddle/distributed/run/context/device.py deleted file mode 100644 index d8bbd851ccf83a1ebfac60758576384bbe1aa4f4..0000000000000000000000000000000000000000 --- a/python/paddle/distributed/run/context/device.py +++ /dev/null @@ -1,88 +0,0 @@ -# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import os - - -class DeviceType: - CPU = 'cpu' - GPU = 'gpu' - XPU = 'xpu' - NPU = 'npu' - - -class Device(object): - def __init__(self, dtype=None, count=1, memory="", labels=""): - self.dtype = dtype - self.count = count - self.memory = memory - self.labels = labels - - def __str__(self): - return ",".join(self.labels) - - @classmethod - def parse_device(self): - dev = Device() - visible_devices = None - if 'CUDA_VISIBLE_DEVICES' in os.environ or 'NVIDIA_VISIBLE_DEVICES' in os.environ: - dev.dtype = DeviceType.GPU - visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") or os.getenv( - "NVIDIA_VISIBLE_DEVICES") - elif 'XPU_VISIBLE_DEVICES' in os.environ: - dev.dtype = DeviceType.XPU - visible_devices = os.getenv("XPU_VISIBLE_DEVICES") - elif 'ASCEND_VISIBLE_DEVICES' in os.environ: - dev.dtype = DeviceType.NPU - visible_devices = os.getenv("ASCEND_VISIBLE_DEVICES") - - if visible_devices and visible_devices != 'all': - dev.labels = visible_devices.split(',') - dev.count = len(dev.labels) - else: - return self.detect_device() - - return dev - - @classmethod - def detect_device(self): - import paddle.fluid as fluid - - dev = Device() - num = 0 - visible_devices = None - if fluid.core.is_compiled_with_cuda(): - dev.dtype = DeviceType.GPU - num = fluid.core.get_cuda_device_count() - visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") or os.getenv( - "NVIDIA_VISIBLE_DEVICES") - elif fluid.core.is_compiled_with_xpu(): - dev.dtype = DeviceType.XPU - num = fluid.core.get_xpu_device_count() - visible_devices = os.getenv("XPU_VISIBLE_DEVICES") - elif fluid.core.is_compiled_with_npu(): - dev.dtype = DeviceType.NPU - num = fluid.core.get_npu_device_count() - visible_devices = os.getenv("ASCEND_VISIBLE_DEVICES") - - if num == 0: - dev.dtype = DeviceType.CPU - elif visible_devices is None or visible_devices == "all" or visible_devices == "": - dev.labels = [str(x) for x in range(0, num)] - dev.count = num - else: - dev.labels = visible_devices.split(',') - dev.count = len(dev.labels) - - return dev diff --git a/python/paddle/distributed/run/plugins/ip.py b/python/paddle/distributed/run/plugins/ip.py deleted file mode 100644 index 0809ed5864da9f3bea29235621e7c29b75823391..0000000000000000000000000000000000000000 --- a/python/paddle/distributed/run/plugins/ip.py +++ /dev/null @@ -1,30 +0,0 @@ -# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import socket - - -def get_local_ip(ctx): - _, ip = _get_host_name_ip() - ctx.args.host = ip - ctx.envs["POD_IP"] = ip - - -def _get_host_name_ip(): - try: - host_name = socket.gethostname() - host_ip = socket.gethostbyname(host_name) - return host_name, host_ip - except: - return None diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 44e6f8e8f2a6d11371f21fff5a9dccefcd72ebed..2acf530eea3fbd2d7fbc9cf04d2c792b7175035c 100755 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -949,7 +949,7 @@ if (WITH_DISTRIBUTE AND NOT APPLE) endif() # setting timeout value as 15S -set_tests_properties(test_run PROPERTIES TIMEOUT 200) +set_tests_properties(test_run PROPERTIES TIMEOUT 120) set_tests_properties(test_sync_batch_norm_op PROPERTIES TIMEOUT 120) set_tests_properties(test_cross_op PROPERTIES TIMEOUT 120) set_tests_properties(test_imperative_lod_tensor_to_selected_rows PROPERTIES TIMEOUT 200) diff --git a/python/paddle/fluid/tests/unittests/test_c_comm_init_op.sh b/python/paddle/fluid/tests/unittests/test_c_comm_init_op.sh index aba95a68ab790828a63b1f46afaaa87f7826d248..9b99e553d182bc5f9ce8cd1835d7600ebfb956f2 100644 --- a/python/paddle/fluid/tests/unittests/test_c_comm_init_op.sh +++ b/python/paddle/fluid/tests/unittests/test_c_comm_init_op.sh @@ -17,5 +17,4 @@ set -e # use default values # FIXME: random fails on Unknown command lines -c (or -m). -launch_py=${PADDLE_BINARY_DIR}/python/paddle/distributed/launch.py -CUDA_VISIBLE_DEVICES=0,1 python ${launch_py} c_comm_init_op.py +CUDA_VISIBLE_DEVICES=0,1 python -m paddle.distributed.launch c_comm_init_op.py diff --git a/python/paddle/fluid/tests/unittests/test_run.py b/python/paddle/fluid/tests/unittests/test_run.py index 8fe5fb9bb9455aa58d84bda03f9e9e16038a3be0..498aecf7c6e75cbb3dd1e73e4aae6151ed65f0db 100644 --- a/python/paddle/fluid/tests/unittests/test_run.py +++ b/python/paddle/fluid/tests/unittests/test_run.py @@ -39,9 +39,7 @@ import os env = os.environ.copy() assert "PADDLE_PSERVERS_IP_PORT_LIST" in env assert "PADDLE_TRAINER_ENDPOINTS" in env -#assert "PADDLE_PSERVER_ENDPOINTS" in env -#assert "PADDLE_TRAINER_ENDPOINTS" in env -#assert "PADDLE_ROLE" in env +assert "PADDLE_ROLE" in env #assert "PADDLE_RANK" in env ''' @@ -62,27 +60,24 @@ class Collective_Test(unittest.TestCase): write_file(pyname, colpyfile) def pdrun(self, args, env=None): - cmd = [sys.executable.split('/')[-1], "-m", "paddle.distributed.run"] + cmd = [sys.executable.split('/')[-1], "-m", "paddle.distributed.launch"] if args: cmd.extend(args.split(" ")) cmd.extend([pyname]) proc = subprocess.Popen(cmd, env) return proc - ''' def test_collective_1(self): - args = "--id test1" + args = "--job_id test1" p = self.pdrun(args) p.wait() self.assertTrue(p.poll() == 0) - ''' - def test_collective_2(self): if os.path.exists('./log'): shutil.rmtree('./log') - args = "--id test2 --devices 0,1,2" + args = "--job_id test2 --devices 0,1,2" p = self.pdrun(args) p.wait() self.assertTrue(p.poll() == 0) @@ -95,7 +90,7 @@ class Collective_Test(unittest.TestCase): shutil.rmtree('./log') port = random.randrange(6000, 8000) - args = "--id test3 --devices 0,1 --master 127.0.0.1:{} --np 2".format( + args = "--job_id test3 --devices 0,1 --master 127.0.0.1:{} --np 2".format( port) p1 = self.pdrun(args) p2 = self.pdrun(args) @@ -113,14 +108,13 @@ class PS_Test(unittest.TestCase): write_file(pyname, pspyfile) def pdrun(self, args, env=None): - cmd = 
[sys.executable.split('/')[-1], "-m", "paddle.distributed.run"] + cmd = [sys.executable.split('/')[-1], "-m", "paddle.distributed.launch"] if args: cmd.extend(args.split(" ")) cmd.extend([pyname]) proc = subprocess.Popen(cmd, env) return proc - ''' def test_ps_1(self): args = "--mode ps" p = self.pdrun(args) @@ -131,21 +125,20 @@ class PS_Test(unittest.TestCase): if os.path.exists('./log'): shutil.rmtree('./log') - args = "--id ps2 --server_num=2 --trainer_num=2" + args = "--job_id ps2 --server_num=2 --trainer_num=2" p = self.pdrun(args) p.wait() self.assertTrue(p.poll() == 0) c = get_files('log', 'ps2') self.assertTrue(len(c) == 5) - ''' def test_ps_3(self): if os.path.exists('./log'): shutil.rmtree('./log') port = random.randrange(6000, 8000) - args = "--id ps3 --master 127.0.0.1:{} --np 2 --server_num=1 --trainer_num=1".format( + args = "--job_id ps3 --master 127.0.0.1:{} --np 2 --server_num=1 --trainer_num=1".format( port) p1 = self.pdrun(args) p2 = self.pdrun(args) @@ -161,7 +154,7 @@ class PS_Test(unittest.TestCase): if os.path.exists('./log'): shutil.rmtree('./log') - args = "--id ps4 --servers 127.0.0.1:8900,127.0.0.1:8901 --trainers 127.0.0.1:8902,127.0.0.1:8903" + args = "--job_id ps4 --servers 127.0.0.1:8900,127.0.0.1:8901 --trainers 127.0.0.1:8902,127.0.0.1:8903" p1 = self.pdrun(args) p1.wait() self.assertTrue(p1.poll() == 0) diff --git a/python/setup.py.in b/python/setup.py.in index 44998bd3e1675f2a3f77edd26c9cd8fa85121b6a..0a10e9dcc698d7433ceab5dd35ee4d5fa1729636 100755 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -282,6 +282,12 @@ packages=['paddle', 'paddle.distribution', 'paddle.distributed.sharding', 'paddle.distributed.fleet', + 'paddle.distributed.launch', + 'paddle.distributed.launch.context', + 'paddle.distributed.launch.controllers', + 'paddle.distributed.launch.job', + 'paddle.distributed.launch.plugins', + 'paddle.distributed.launch.utils', 'paddle.distributed.fleet.base', 'paddle.distributed.fleet.elastic', 'paddle.distributed.fleet.meta_optimizers', @@ -727,7 +733,7 @@ with redirect_stdout(): }, entry_points={ 'console_scripts': [ - 'fleetrun = paddle.distributed.fleet.launch:launch' + 'fleetrun = paddle.distributed.launch.__main__:launch' ] }, classifiers=[