未验证 提交 fdafbc7b 编写于 作者: K kuizhiqing 提交者: GitHub

enable continuous log; update doc (#40782)

上级 b518fa2a
......@@ -13,7 +13,7 @@
# limitations under the License.
from .spawn import spawn # noqa: F401
from .fleet.launch import launch # noqa: F401
from .launch.main import launch # noqa: F401
from .parallel import init_parallel_env # noqa: F401
from .parallel import get_rank # noqa: F401
......
......@@ -13,69 +13,3 @@
# limitations under the License.
__all__ = []
'''
Paddle distributed training entry ``python -m paddle.distributed.launch``.
Help
# for arg usage and explanation, try the following command
# python -m paddle.distributed.launch -h
Collective Mode
Case 1: 1 node
use all visible devices
# python -m paddle.distributed.launch train.py
use specified devices
# python -m paddle.distributed.launch --devices=0,1,2,3 train.py
Case 2: multi-node, auto detect ip/port
# python -m paddle.distributed.launch --nnodes 2 train.py
# auto print following command
# python -m paddle.distributed.launch --master 10.0.0.1:13538 --nnodes 2 demo.py
# then copy and paste above command to other nodes
Case 3: multi-node, specified master/rendezvous server
# python -m paddle.distributed.launch --nnodes 2 --master 10.0.0.1:2379 train.py
# the master ip must be one of the node and the port must available
Parameter Server Mode
Case 1.1: 1 node, 1 ps, 1 trainer
# python -m paddle.distributed.launch --mode ps train.py
# python -m paddle.distributed.launch --server_num=1 --trainer_num=1 train.py
Case 1.2: 1 node, 2 ps, 2 trainer
# python -m paddle.distributed.launch --server_num=2 --trainer_num=2 train.py
Case 2: 2 node, 2 ps, 2 trainer per node
# python -m paddle.distributed.launch --server_num=2 --trainer_num=2 --nnodes 2 train.py
# auto print following command
# python -m paddle.distributed.launch --master 10.0.0.1:13538 --server_num=2 --trainer_num=2 --nnodes 2 train.py
# then copy and paste above command to other nodes
Case 3: multi-node, specified master/rendezvous server
# python -m paddle.distributed.launch --master 10.0.0.1:13538 --server_num=2 --trainer_num=2 --nnodes 2 train.py
# the master ip must be one of the node and the port must available
Case 4: specified servers and trainers in each node
python -m paddle.distributed.launch --servers 127.0.0.1:8900,127.0.0.1:8901 --trainers 127.0.0.1:8902,127.0.0.1:8903 train.py
Elastic Mode
# run following command in 3 node to run immediately, or in 2 node to run after elastic_timeout
# python -m paddle.distributed.launch --master etcd://10.0.0.1:2379 --nnodes 2:3 train.py
# once the peer number changes between 2:3, the strategy holds
'''
......@@ -12,31 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .context import Context
from . import controllers
from .main import launch
def launch():
# initialize the context to run
ctx = Context()
if ctx.is_legacy_mode():
# legacy mode
from paddle.distributed.fleet import launch
launch.launch()
else:
# initialize the selected controller
c = controllers.init(ctx)
# run the pods
c.run()
# manager or just wait pod
c.finalize()
if __name__ == "__main__":
launch()
launch()
......@@ -82,6 +82,12 @@ class Context(object):
logger.addHandler(ch)
return logger
def continous_log(self) -> bool:
if self.args.log_level.upper() in ['DEBUG', 'ERROR']:
return True
else:
return False
def set_env_in_args(self):
for k, v in env_args_mapping.items():
if k in self.envs:
......
......@@ -20,7 +20,7 @@ env_args_mapping = {
'PADDLE_MASTER': 'master',
'PADDLE_DEVICES': 'devices',
'PADDLE_NNODES': 'nnodes',
'PADDLE_MODE': 'mode',
'PADDLE_RUN_MODE': 'run_mode',
'PADDLE_LOG_LEVEL': 'log_level',
'PADDLE_NPROC_PER_NODE': 'nproc_per_node',
'PADDLE_JOB_ID': 'job_id',
......@@ -60,7 +60,7 @@ def parse_args():
"--legacy", type=bool, default=False, help="use legacy launch")
base_group.add_argument(
"--rank", type=int, default=-1, help="the peer rank")
"--rank", type=int, default=-1, help="the node rank")
base_group.add_argument(
"--log_level", type=str, default="INFO", help="log level. Default INFO")
......@@ -69,7 +69,7 @@ def parse_args():
"--nnodes",
type=str,
default="1",
help="the number of peers, i.e. pod/node number")
help="the number of nodes, i.e. pod/node number")
base_group.add_argument(
"--nproc_per_node",
......@@ -83,7 +83,7 @@ def parse_args():
default="log",
help="the path for each process's log. Default ./log")
base_group.add_argument(
"--mode",
"--run_mode",
type=str,
default="collective",
help="run mode of the job, collective/ps/ps-heter")
......@@ -146,6 +146,6 @@ def parse_args():
"--elastic_timeout",
type=int,
default=30,
help="seconds to wait before elastic perform training")
help="seconds to wait before elastic job begin to train")
return parser.parse_known_args()
......@@ -115,46 +115,6 @@ class CollectiveElasticController(CollectiveController):
self.master.register_heartbeat(self.job.id, self.pod.name)
def watch(self) -> bool:
'''
watch self and peer status, return true to exit
'''
self.ctx.logger.info("Watching {}".format(self.pod))
while not self.ctx.status.is_done():
# self status
status = self.pod.watch(timeout=2)
self.ctx.logger.debug("Pod status {}, Ctx status {}".format(
status, self.ctx.status.current()))
# completed
if status == self.ctx.status.COMPLETED:
self.master.set_status(status)
self.ctx.status.complete()
self.ctx.logger.info("Pod complete {}".format(status))
return True
# self failure
elif status == self.ctx.status.FAILED:
self.master.set_status(status)
self.master.restart_peer()
self.ctx.logger.info("Pod failed {}".format(status))
self.pod.stop()
if self.ctx.args.elastic_level <= 0:
return True
else:
return False
# peer failure
if self.ctx.status.is_restarting() and self.master.get_status(
) != self.ctx.status.COMPLETED:
self.pod.stop()
return False
#peers = self.master.fetch_peer_alive()
#print("peers {}".format(peers))
def run(self):
timeout = self.ctx.args.elastic_timeout if self.job.elastic else self.ctx.args.elastic_timeout * 10
......@@ -164,6 +124,8 @@ class CollectiveElasticController(CollectiveController):
self.build_job()
self.ctx.logger.info("Waiting peer ready...")
ok, replicas = self.master.wait_peer_ready(
self.job.replicas_min, self.job.replicas_max, timeout)
if ok:
......
......@@ -40,7 +40,7 @@ class ControllerBase(object):
self.master = Master.factory(self.ctx)
self.job = Job(nnodes=self.ctx.args.nnodes,
mode=self.ctx.args.mode,
mode=self.ctx.args.run_mode,
jid=self.ctx.args.job_id)
self.pod = Pod()
......@@ -65,18 +65,51 @@ class ControllerBase(object):
self.watch()
def watch(self) -> bool:
'''
watch self and peer status, return true to exit
'''
#TODO(kuizhiqing) unify ctx.status and master status
self.ctx.logger.info("Watching {}".format(self.pod))
status = self.pod.watch()
while not self.ctx.status.is_done():
status = self.pod.watch(timeout=2)
if self.ctx.continous_log():
self.pod.logs()
# completed
if status == self.ctx.status.COMPLETED:
self.ctx.status.complete()
self.master.set_status(status)
self.ctx.logger.info("Pod {}".format(status))
return True
# self failure
elif status == self.ctx.status.FAILED:
self.ctx.status.fail()
self.master.set_status(status)
self.master.restart_peer()
fc = self.pod.failed_container()
self.ctx.logger.info("Pod {}".format(status))
self.ctx.logger.error("Container failed !!!\n{}".format(fc[0]))
fc[0].tail()
self.pod.stop()
if self.ctx.args.elastic_level <= 0:
return True
else:
return False
if status == self.ctx.status.COMPLETED:
self.ctx.logger.info("Pod {}".format(status))
elif status == self.ctx.status.FAILED:
fc = self.pod.failed_container()
self.ctx.logger.info("Pod {}".format(status))
self.ctx.logger.error("Container failed !!!\n{}".format(fc[0]))
fc[0].tail()
self.pod.stop()
# peer failure
if self.ctx.status.is_restarting() and self.master.get_status(
) != self.ctx.status.COMPLETED:
self.pod.stop()
return False
def stop(self, sigint=None):
self.ctx.logger.debug("Controller stop")
......
......@@ -43,6 +43,15 @@ class Master(object):
def stop(self):
raise NotImplementedError
def set_status(self, status):
pass
def get_status(self):
return None
def restart_peer(self):
pass
def sync_peers(self, prefix, key, value, size, rank=-1) -> (list, int):
raise NotImplementedError
......@@ -122,7 +131,7 @@ class HTTPMaster(Master):
if size < 2:
return [value], 0
self.ctx.logger.info("Waiting peer ready...")
self.ctx.logger.info("Waiting peer start...")
self.lazy_init()
......@@ -184,7 +193,7 @@ class ETCDMaster(Master):
if size < 2:
return [value], 0
self.ctx.logger.info("Waiting peer ready...")
self.ctx.logger.info("Waiting peer start...")
path = "{}/{}/{}".format(prefix, key, rank)
......
......@@ -21,11 +21,11 @@ import os, shutil
class PSController(Controller):
@classmethod
def enable(cls, ctx):
if ctx.args.mode == ControleMode.PS or ctx.args.server_num or len(
if ctx.args.run_mode == ControleMode.PS or ctx.args.server_num or len(
ctx.args.servers) > 0 or ctx.args.trainer_num or len(
ctx.args.trainers) > 0:
ctx.logger.debug("{} enabled".format(cls.__name__))
ctx.args.mode = ControleMode.PS
ctx.args.run_mode = ControleMode.PS
return True
else:
return False
......
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .context import Context
def launch():
"""
Paddle distribution training entry ``python -m paddle.distributed.launch``.
Usage:
.. code-block:: bash
:name: code-block-bash1
python -m paddle.distributed.launch [-h] [--master MASTER] [--rank RANK]
[--log_level LOG_LEVEL] [--nnodes NNODES]
[--nproc_per_node NPROC_PER_NODE] [--log_dir LOG_DIR]
[--run_mode RUN_MODE] [--job_id JOB_ID] [--devices DEVICES]
[--host HOST] [--servers SERVERS] [--trainers TRAINERS]
[--trainer_num TRAINER_NUM] [--server_num SERVER_NUM]
[--gloo_port GLOO_PORT] [--with_gloo WITH_GLOO]
[--max_restart MAX_RESTART] [--elastic_level ELASTIC_LEVEL]
[--elastic_timeout ELASTIC_TIMEOUT]
training_script ...
Base Parameters:
- ``--master``: The master/rendezvous server, support http:// and etcd://, default with http://. e.g., ``--master=127.0.0.1:8080``. Default ``--log_dir=None``.
- ``--rank``: The rank of the node, can be auto assigned by master. Default ``--rank=-1``.
- ``--log_level``: The log levl to set for logging.setLevel. Default ``--log_level=INFO``.
- ``--nnodes``: The number of nodes for a distributed job, it can be a range in elastic mode, e.g., ``--nnnodes=2:3``. Default ``--nnodes=1``.
- ``--nproc_per_node``: The number of processes to launch on a node. In gpu training, it should be less or equal to the gpus number of you system. e.g., ``--nproc_per_node=8``
- ``--log_dir``: The path for each process's log. e.g., ``--log_dir=output_dir``. Default ``--log_dir=log``.
- ``--run_mode``: The run mode of job, can be:collective/ps/ps-heter. e.g., ``--run_mode=ps``. Default ``--run_mode=collective``.
- ``--job_id``: The job unique id, it affects the log files' name. e.g., ``--job_id=job1``. Default ``--job_id=default``.
- ``--devices``: The selected accelerate devices on nodes, can be gpu/xpu/npu/mlu etc.. e.g., ``--devices=0,1,2,3`` will launch four training processes each bound to one device.
- ``training_script``: The full path to the single GPU training program/script to be launched in parallel, followed by all the arguments for the training script. e.g., ``traing.py``
- ``training_script_args``: The args of training_script. e.g., ``--lr=0.1``
Collective Parameters:
- ``--ips``: [DEPRECATED] Paddle cluster nodes ips, e.g., ``--ips=192.168.0.16,192.168.0.17``. Default ``--ips=127.0.0.1``.
Parameter-Server Parameters:
- ``--servers``: User defined servers ip:port, e.g., ``--servers="192.168.0.16:6170,192.168.0.17:6170"``
- ``--trainers``: User defined trainers ip:port, e.g., ``--trainers="192.168.0.16:6171,192.168.0.16:6172,192.168.0.17:6171,192.168.0.17:6172"``
- ``--workers``: [DEPRECATED] The same as trainers.
- ``--trainer_num``: Number of trainers on each node, can be 0.
- ``--worker_num``: [DEPRECATED] The same as trainer_num.
- ``--server_num``: Number of servers on each node, can be 0.
- ``--heter_workers``: User defined heter workers ip1:port1;ip2:port2, e.g., ``--heter_workers="192.168.0.16:6172;192.168.0.17:6172"``
- ``--heter_worker_num``: Number of heter_workers in each stage (It recommend to set when in the emulated distributed environment using single node)
- ``--heter_devices``: Type of heter_device in each stage
- ``--gloo_port``: Gloo http Port. Default ``--gloo_port=6767``.
- ``--with_gloo``: Using gloo or not. Default ``--with_gloo=0``.
Elastic Parameters:
- ``--max_restart``: The maximum restart times for an elastic job. Default ``--max_restart=3``.
- ``--elastic_level``: The elastic level: -1: disable, 0: failed exit, peers hold, 1: internal restart. Default ``--elastic_level=-1``.
- ``--elastic_timeout``: Seconds to wait before elastic job begin to train. Default ``--elastic_timeout=30``.
Returns:
``None``
Examples 0 (master, ip/port auto detection):
# For training on multi node, run the following command in one of the nodes
python -m paddle.distributed.launch --nnodes 2 train.py
# Then the following info will be print
# Copy the following command to other nodes to run.
# --------------------------------------------------------------------------------
# python -m paddle.distributed.launch --master 10.0.0.1:38714 --nnodes 2 train.py
# --------------------------------------------------------------------------------
# Follow the instruction above and paste the command in other nodes can launch a multi nodes training job.
# There are two ways to launch a job with the same command for multi nodes training
# 1) using the following command in every nodes, make sure the ip is one of the training node and the port is available on that node
# python -m paddle.distributed.launch --master 10.0.0.1:38714 --nnodes 2 train.py
# 2) using the following command in every nodes with a independent etcd service
# python -m paddle.distributed.launch --master etcd://10.0.0.1:2379 --nnodes 2 train.py
# This functionality works will for both collective and ps mode and even with other arguments.
Examples 1 (collective, single node):
.. code-block:: bash
:name: code-block-example-bash1
# For training on single node using 4 gpus.
python -m paddle.distributed.launch --devices=0,1,2,3 train.py --lr=0.01
Examples 2 (collective, multi node):
.. code-block:: bash
:name: code-block-example-bash2
# For training on multiple nodes, e.g., 192.168.0.16, 192.168.0.17
# On 192.168.0.16:
python -m paddle.distributed.launch --devices=0,1,2,3 --master=192.168.0.16:8090 train.py --lr=0.01
# On 192.168.0.17:
python -m paddle.distributed.launch --devices=0,1,2,3 --master=192.168.0.16:8090 train.py --lr=0.01
Examples 3 (ps, cpu, single node):
.. code-block:: bash
:name: code-block-example-bash3
# To simulate distributed environment using single node, e.g., 2 servers and 4 workers.
python -m paddle.distributed.launch --server_num=2 --worker_num=4 train.py --lr=0.01
Examples 4 (ps, cpu, multi node):
.. code-block:: bash
:name: code-block-example-bash4
# For training on multiple nodes, e.g., 192.168.0.16, 192.168.0.17 where each node with 1 server and 2 workers.
# On 192.168.0.16:
python -m paddle.distributed.launch --servers="192.168.0.16:6170,192.168.0.17:6170" --workers="192.168.0.16:6171,192.168.0.16:6172,192.168.0.17:6171,192.168.0.17:6172" train.py --lr=0.01
# On 192.168.0.17:
python -m paddle.distributed.launch --servers="192.168.0.16:6170,192.168.0.17:6170" --workers="192.168.0.16:6171,192.168.0.16:6172,192.168.0.17:6171,192.168.0.17:6172" train.py --lr=0.01
# Or with master, the following command run 2 server and 2 trainer on each node.
python -m paddle.distributed.launch --master 192.168.0.16:9090 --server_num=2 --trainer_num=2 --nnodes 2 train.py
Examples 5 (ps, gpu, single node):
.. code-block:: bash
:name: code-block-example-bash5
# To simulate distributed environment using single node, e.g., 2 servers and 4 workers, each worker use single gpu.
export CUDA_VISIBLE_DEVICES=0,1,2,3
python -m paddle.distributed.launch --server_num=2 --worker_num=4 train.py --lr=0.01
Examples 6 (ps, gpu, multi node):
.. code-block:: bash
:name: code-block-example-bash6
# For training on multiple nodes, e.g., 192.168.0.16, 192.168.0.17 where each node with 1 server and 2 workers.
# On 192.168.0.16:
export CUDA_VISIBLE_DEVICES=0,1
python -m paddle.distributed.launch --servers="192.168.0.16:6170,192.168.0.17:6170" --workers="192.168.0.16:6171,192.168.0.16:6172,192.168.0.17:6171,192.168.0.17:6172" train.py --lr=0.01
# On 192.168.0.17:
export CUDA_VISIBLE_DEVICES=0,1
python -m paddle.distributed.launch --servers="192.168.0.16:6170,192.168.0.17:6170" --workers="192.168.0.16:6171,192.168.0.16:6172,192.168.0.17:6171,192.168.0.17:6172" train.py --lr=0.01
Examples 7 (ps-heter, cpu + gpu, single node):
.. code-block:: bash
:name: code-block-example-bash7
# To simulate distributed environment using single node, e.g., 2 servers and 4 workers, two workers use gpu, two workers use cpu.
export CUDA_VISIBLE_DEVICES=0,1
python -m paddle.distributed.launch --server_num=2 --worker_num=2 --heter_worker_num=2 train.py --lr=0.01
Examples 8 (ps-heter, cpu + gpu, multi node):
.. code-block:: bash
:name: code-block-example-bash8
# For training on multiple nodes, e.g., 192.168.0.16, 192.168.0.17 where each node with 1 server, 1 gpu worker, 1 cpu worker.
# On 192.168.0.16:
export CUDA_VISIBLE_DEVICES=0
python -m paddle.distributed.launch --servers="192.168.0.16:6170,192.168.0.17:6170" --workers="192.168.0.16:6171,192.168.0.17:6171" --heter_workers="192.168.0.16:6172,192.168.0.17:6172" train.py --lr=0.01
# On 192.168.0.17:
export CUDA_VISIBLE_DEVICES=0
python -m paddle.distributed.launch --servers="192.168.0.16:6170,192.168.0.17:6170" --workers="192.168.0.16:6171,192.168.0.17:6171" --heter_workers="192.168.0.16:6172,192.168.0.17:6172" train.py --lr=0.01
Examples 9 (elastic):
.. code-block:: bash
:name: code-block-example-bash9
# With the following command, the job will begin to run immediately if 4 nodes are ready,
# or it will run after elastic_timeout if only 2 or 3 nodes ready
python -m paddle.distributed.launch --master etcd://10.0.0.1:2379 --nnodes 2:4 train.py
# once the number of nodes changes between 2:4 during training, the strategy holds
"""
# initialize the context to run
ctx = Context()
if ctx.is_legacy_mode():
# legacy mode
from paddle.distributed.fleet import launch
launch.launch()
else:
from . import controllers
# initialize the selected controller
c = controllers.init(ctx)
# run the pods
c.run()
# manager or just wait pod
c.finalize()
if __name__ == "__main__":
launch()
......@@ -116,7 +116,7 @@ class PS_Test(unittest.TestCase):
return proc
def test_ps_1(self):
args = "--mode ps"
args = "--run_mode ps"
p = self.pdrun(args)
p.wait()
self.assertTrue(p.poll() == 0)
......
......@@ -733,7 +733,7 @@ with redirect_stdout():
},
entry_points={
'console_scripts': [
'fleetrun = paddle.distributed.launch.__main__:launch'
'fleetrun = paddle.distributed.launch.main:launch'
]
},
classifiers=[
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册