# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .context import Context


def launch():
    """
    Paddle distributed training entry point ``python -m paddle.distributed.launch``.

    Usage:
        .. code-block:: bash
            :name: code-block-bash1

            python -m paddle.distributed.launch [-h] [--master MASTER] [--rank RANK]
                   [--log_level LOG_LEVEL] [--nnodes NNODES]
                   [--nproc_per_node NPROC_PER_NODE] [--log_dir LOG_DIR]
                   [--run_mode RUN_MODE] [--job_id JOB_ID] [--devices DEVICES]
                   [--host HOST] [--servers SERVERS] [--trainers TRAINERS]
                   [--trainer_num TRAINER_NUM] [--server_num SERVER_NUM]
                   [--gloo_port GLOO_PORT] [--with_gloo WITH_GLOO]
                   [--max_restart MAX_RESTART] [--elastic_level ELASTIC_LEVEL]
                   [--elastic_timeout ELASTIC_TIMEOUT]
                   training_script ...


    Base Parameters:
        - ``--master``: The master/rendezvous server; supports ``http://`` and ``etcd://`` schemes and defaults to ``http://``. e.g., ``--master=127.0.0.1:8080``. Default ``--master=None``.

        - ``--rank``: The rank of the node, can be auto assigned by master. Default ``--rank=-1``.

        - ``--log_level``: The log level passed to ``logging.setLevel``, one of CRITICAL/ERROR/WARNING/INFO/DEBUG/NOTSET, case insensitive. Default ``--log_level=INFO``.

        - ``--nnodes``: The number of nodes for a distributed job; it can be a range in elastic mode, e.g., ``--nnodes=2:3``. Default ``--nnodes=1``.

        - ``--nproc_per_node``: The number of processes to launch on a node. In gpu training, it should be less than or equal to the number of gpus on your system. e.g., ``--nproc_per_node=8``

        - ``--log_dir``: The path for each process's log. e.g., ``--log_dir=output_dir``. Default ``--log_dir=log``.

        - ``--run_mode``: The run mode of the job, one of collective/ps/ps-heter/rpc. e.g., ``--run_mode=ps``. Default ``--run_mode=collective``.

        - ``--job_id``: The unique job id; it affects the names of the log files. e.g., ``--job_id=job1``. Default ``--job_id=default``.

        - ``--devices``: The accelerator devices selected on the nodes, e.g., gpu/xpu. e.g., ``--devices=0,1,2,3`` will launch four training processes, each bound to one device.

        - ``training_script``: The full path to the single GPU training program/script to be launched in parallel, followed by all the arguments for the training script. e.g., ``training.py``

        - ``training_script_args``: The args of training_script. e.g., ``--lr=0.1``

    Collective Parameters:
        - ``--ips``: [DEPRECATED] The ips of the paddle cluster nodes, e.g., ``--ips=192.168.0.16,192.168.0.17``. Default ``--ips=127.0.0.1``.

    Parameter-Server Parameters:
        - ``--servers``: User defined servers ip:port, e.g., ``--servers="192.168.0.16:6170,192.168.0.17:6170"``

        - ``--trainers``: User defined trainers ip:port, e.g., ``--trainers="192.168.0.16:6171,192.168.0.16:6172,192.168.0.17:6171,192.168.0.17:6172"``

        - ``--workers``: [DEPRECATED] The same as trainers.

        - ``--trainer_num``: Number of trainers on each node, can be 0.

        - ``--worker_num``: [DEPRECATED] The same as trainer_num.

        - ``--server_num``: Number of servers on each node, can be 0.

        - ``--heter_workers``: User defined heter workers ip1:port1;ip2:port2, e.g., ``--heter_workers="192.168.0.16:6172;192.168.0.17:6172"``

        - ``--heter_worker_num``: Number of heter_workers in each stage (recommended when emulating a distributed environment on a single node)

        - ``--heter_devices``: Type of heter_device in each stage

        - ``--gloo_port``: The gloo http port. Default ``--gloo_port=6767``.

        - ``--with_gloo``: Whether to use gloo. Default ``--with_gloo=0``.

    Elastic Parameters:
        - ``--max_restart``: The maximum restart times for an elastic job. Default ``--max_restart=3``.

        - ``--elastic_level``: The elastic level: -1: disabled; 0: exit on failure while peers hold; 1: internal restart. Default ``--elastic_level=-1``.

        - ``--elastic_timeout``: Seconds to wait before elastic job begin to train. Default ``--elastic_timeout=30``.

    IPU Parameters:
        IPU distributed launch only requires and allows three arguments: ``--devices``, ``training_script`` and ``training_script_args``.
        The ``--devices`` is the number of IPU devices. e.g., ``--devices=4`` will launch the training program with four IPU devices.
        The ``training_script`` must be set to ``ipu``.
        The ``training_script_args`` includes the arguments required by IPU distributed launch, as illustrated below.
        ``Examples 10`` provides an example of paddle.distributed.launch with IPUs.

        - ``--hosts``: The hosts for IPU distributed training. Each host can run multiple processes.

        - ``--nproc_per_host``: The number of processes launched per host. Each process can contain multiple replicas.

        - ``--ipus_per_replica``: The number of IPUs requested per replica. Each replica can span multiple IPUs.

        - ``--ipu_partition``: The partition name of IPU devices.

        - ``--vipu_server``: The ip of the IPU device manager.

        - ``training_script``: The full path to the IPU distributed training program/script to be launched in parallel. e.g., ``training.py``.

        - ``training_script_args``: The args of the IPU distributed training program/script. e.g., ``--lr=0.1``.

    Returns:
        - ``None``

    Examples 0 (master, ip/port auto detection):
        .. code-block:: bash
            :name: code-block-example-bash0

            # For training on multiple nodes, run the following command on one of the nodes

            python -m paddle.distributed.launch --nnodes 2 train.py

            # Then the following info will be printed

            # Copy the following command to other nodes to run.
            # --------------------------------------------------------------------------------
            # python -m paddle.distributed.launch --master 10.0.0.1:38714 --nnodes 2 train.py
            # --------------------------------------------------------------------------------

            # Following the instruction above and pasting the command on the other nodes launches a multi-node training job.

            # There are two ways to launch a job with the same command on every node for multi-node training
            # 1) use the following command on every node; make sure the ip belongs to one of the training nodes and the port is available on that node
            # python -m paddle.distributed.launch --master 10.0.0.1:38714 --nnodes 2 train.py
            # 2) use the following command on every node, backed by an independent etcd service
            # python -m paddle.distributed.launch --master etcd://10.0.0.1:2379 --nnodes 2 train.py

            # This works well for both collective and ps mode, even with additional arguments.


    Examples 1 (collective, single node):
        .. code-block:: bash
            :name: code-block-example-bash1

            # For training on a single node using 4 gpus.

            python -m paddle.distributed.launch --devices=0,1,2,3 train.py --lr=0.01
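
        For reference, a minimal ``train.py`` that the commands in these examples could launch might look like the following sketch (the model, data and optimizer are placeholders):

        .. code-block:: python
            :name: code-block-example-train-py

            # minimal data-parallel training script (illustrative sketch)
            import paddle
            import paddle.distributed as dist


            def main():
                # initialize the collective/parallel environment
                dist.init_parallel_env()
                model = paddle.DataParallel(paddle.nn.Linear(10, 10))
                opt = paddle.optimizer.SGD(
                    learning_rate=0.01, parameters=model.parameters()
                )
                # one dummy training step
                x = paddle.randn([4, 10])
                loss = model(x).mean()
                loss.backward()
                opt.step()


            if __name__ == "__main__":
                main()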

    Examples 2 (collective, multi node):
        .. code-block:: bash
            :name: code-block-example-bash2

            # For training on multiple nodes, e.g., 192.168.0.16, 192.168.0.17

            # On 192.168.0.16:

            python -m paddle.distributed.launch --devices=0,1,2,3 --master=192.168.0.16:8090 train.py --lr=0.01

            # On 192.168.0.17:
            python -m paddle.distributed.launch --devices=0,1,2,3 --master=192.168.0.16:8090 train.py --lr=0.01

    Examples 3 (ps, cpu, single node):
        .. code-block:: bash
            :name: code-block-example-bash3

            # To simulate a distributed environment on a single node, e.g., 2 servers and 4 workers.

            python -m paddle.distributed.launch --server_num=2 --worker_num=4 train.py --lr=0.01

    Examples 4 (ps, cpu, multi node):
        .. code-block:: bash
            :name: code-block-example-bash4

            # For training on multiple nodes, e.g., 192.168.0.16, 192.168.0.17, where each node runs 1 server and 2 workers.

            # On 192.168.0.16:

            python -m paddle.distributed.launch --servers="192.168.0.16:6170,192.168.0.17:6170" --workers="192.168.0.16:6171,192.168.0.16:6172,192.168.0.17:6171,192.168.0.17:6172" train.py --lr=0.01

            # On 192.168.0.17:

            python -m paddle.distributed.launch --servers="192.168.0.16:6170,192.168.0.17:6170" --workers="192.168.0.16:6171,192.168.0.16:6172,192.168.0.17:6171,192.168.0.17:6172" train.py --lr=0.01

            # Or, with a master, the following command runs 2 servers and 2 trainers on each node.

            python -m paddle.distributed.launch --master 192.168.0.16:9090 --server_num=2 --trainer_num=2 --nnodes 2 train.py


    Examples 5 (ps, gpu, single node):
        .. code-block:: bash
            :name: code-block-example-bash5

            # To simulate a distributed environment on a single node, e.g., 2 servers and 4 workers, each worker using a single gpu.

            export CUDA_VISIBLE_DEVICES=0,1,2,3
            python -m paddle.distributed.launch --server_num=2 --worker_num=4 train.py --lr=0.01

    Examples 6 (ps, gpu, multi node):
        .. code-block:: bash
            :name: code-block-example-bash6

            # For training on multiple nodes, e.g., 192.168.0.16, 192.168.0.17, where each node runs 1 server and 2 workers.

            # On 192.168.0.16:

            export CUDA_VISIBLE_DEVICES=0,1
            python -m paddle.distributed.launch --servers="192.168.0.16:6170,192.168.0.17:6170" --workers="192.168.0.16:6171,192.168.0.16:6172,192.168.0.17:6171,192.168.0.17:6172" train.py --lr=0.01

            # On 192.168.0.17:

            export CUDA_VISIBLE_DEVICES=0,1
            python -m paddle.distributed.launch --servers="192.168.0.16:6170,192.168.0.17:6170" --workers="192.168.0.16:6171,192.168.0.16:6172,192.168.0.17:6171,192.168.0.17:6172" train.py --lr=0.01

    Examples 7 (ps-heter, cpu + gpu, single node):
        .. code-block:: bash
            :name: code-block-example-bash7

            # To simulate a distributed environment on a single node, e.g., 2 servers and 4 workers, two workers using gpu and two using cpu.

            export CUDA_VISIBLE_DEVICES=0,1
            python -m paddle.distributed.launch --server_num=2 --worker_num=2 --heter_worker_num=2 train.py --lr=0.01

    Examples 8 (ps-heter, cpu + gpu, multi node):
        .. code-block:: bash
            :name: code-block-example-bash8

            # For training on multiple nodes, e.g., 192.168.0.16, 192.168.0.17, where each node runs 1 server, 1 gpu worker and 1 cpu worker.

            # On 192.168.0.16:

            export CUDA_VISIBLE_DEVICES=0
            python -m paddle.distributed.launch --servers="192.168.0.16:6170,192.168.0.17:6170" --workers="192.168.0.16:6171,192.168.0.17:6171" --heter_workers="192.168.0.16:6172,192.168.0.17:6172" train.py --lr=0.01

            # On 192.168.0.17:

            export CUDA_VISIBLE_DEVICES=0
            python -m paddle.distributed.launch --servers="192.168.0.16:6170,192.168.0.17:6170" --workers="192.168.0.16:6171,192.168.0.17:6171" --heter_workers="192.168.0.16:6172,192.168.0.17:6172" train.py --lr=0.01

    Examples 9 (elastic):
        .. code-block:: bash
            :name: code-block-example-bash9

            # With the following command, the job will begin to run immediately if 4 nodes are ready,
            # or it will start after elastic_timeout if only 2 or 3 nodes are ready
            python -m paddle.distributed.launch --master etcd://10.0.0.1:2379 --nnodes 2:4 train.py

            # whenever the number of nodes changes within 2:4 during training, the elastic strategy still holds

    Examples 10 (ipu):
        .. code-block:: bash
            :name: code-block-example-bash10

            # With the following command, the job will run the distributed program with IPUs
            # Requires `devices` to be the number of IPUs
            # Requires `training_script` to be set to `ipu`
            # Requires `training_script_args` to be the arguments of the IPU distributed launch, not the arguments of the training program/script
            # Please check the `IPU Parameters` section for details
            python -m paddle.distributed.launch --devices 4 ipu --hosts=localhost --nproc_per_host=2 --ipus_per_replica=1 --ipu_partition=pod16 --vipu_server=127.0.0.1 train.py

    Examples 11 (rpc, cpu, single node):
        .. code-block:: bash
            :name: code-block-example-bash11

            # Training on a single node with two local servers
            python -m paddle.distributed.launch --master 127.0.0.1:8765 --nnodes 1 --nproc_per_node 2 --rank 0 --run_mode rpc train.py

    Examples 12 (rpc, cpu, multi node):
        .. code-block:: bash
            :name: code-block-example-bash12

            # For training on multiple nodes, e.g., 192.168.0.16, 192.168.0.17, where each node runs 2 servers.

            # On 192.168.0.16

            python -m paddle.distributed.launch --master 192.168.0.16:8765 --nnodes 2 --nproc_per_node 2 --rank 0 --run_mode rpc train.py

            # On 192.168.0.17

            python -m paddle.distributed.launch --master 192.168.0.16:8765 --nnodes 2 --nproc_per_node 2 --rank 1 --run_mode rpc train.py

    """

    # initialize the context to run
    ctx = Context()

    if ctx.is_legacy_mode():
        # legacy mode
        from paddle.distributed.fleet import launch

        launch.launch()

    elif ctx.is_auto_tuner_mode():
        import copy
        import json
        import signal
        import sys
        import time

        from ..auto_tuner.recorder import History_recorder
        from ..auto_tuner.tuner import AutoTuner
        from ..auto_tuner.utils import gen_new_args, read_log
        from . import controllers

        # read user defined tuner config json
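        # illustrative sketch of the keys this function reads from the json
        # (placeholder values; the full schema depends on the tuner setup):
        #   {
        #       "max_time_per_task": 1800,
        #       "metric_cfg": {
        #           "name": "<target metric in worker logs>",
        #           "OptimizationDirection": "Maximize"
        #       }
        #   }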
        try:
            with open(ctx.args.auto_tuner_json, "r") as f:
                tuner_cfg = json.load(f)
        except Exception as e:
            raise ValueError(
                "Please check whether your auto tuner json is valid."
            ) from e

        # copy training script args
        if ctx.args.training_script.endswith('.py'):
            entrypoint = [sys.executable, "-u", ctx.args.training_script]
        else:
            entrypoint = [ctx.args.training_script]
        entrypoint.extend(ctx.args.training_script_args)
        raw_args = copy.deepcopy(ctx.args.training_script_args)
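        # e.g., training_script="train.py" yields
        #   entrypoint = [sys.executable, "-u", "train.py", <training_script_args>]
        # raw_args keeps a pristine copy so new script args can be generated per task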

        # get nodes and gpus from args
        if not ctx.args.devices:
            # assume 8 devices per node when --devices is not specified
            gpus_per_node = 8
        else:
            gpus_per_node = len(ctx.args.devices.split(","))
        nnodes = ctx.args.nnodes
        if isinstance(nnodes, str):
            # in elastic mode nnodes may be a range such as "2:4"; use the lower bound
            tuner_cfg["nodes"] = int(nnodes.split(":")[0])
        else:
            tuner_cfg["nodes"] = int(nnodes)
        tuner_cfg["num_gpus"] = gpus_per_node * tuner_cfg["nodes"]
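        # e.g., --devices=0,1,2,3 on 2 nodes gives num_gpus = 4 * 2 = 8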

        # build AutoTuner to get new config
        auto_tuner = AutoTuner(tuner_cfg)
        cur_cfg = auto_tuner.search_once()

        # get max time per task run
        max_time_per_task = tuner_cfg.get("max_time_per_task", 1800)

        # build history recorder
        recorder = History_recorder()

        job_id = 0
        while cur_cfg:
            # reset the job status before launching the next task
            ctx.status._current_status = None
            # the auto tuner supports dp, mp, pp, micro batch size, sharding and recompute by default, and every task has its own log dir
            log_dir = "DP{}_MP{}_PP{}_Sharding_degree_{}_stage_{}_MBS_{}_Recompute_{}_granularity_{}".format(
                cur_cfg["dp_degree"],
                cur_cfg["mp_degree"],
                cur_cfg["pp_degree"],
                cur_cfg["sharding_degree"],
                cur_cfg["sharding_stage"],
                cur_cfg["micro_batch_size"],
                cur_cfg["use_recompute"],
                cur_cfg["recompute_granularity"],
            )
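            # sample resulting name (values illustrative):
            #   DP2_MP2_PP2_Sharding_degree_2_stage_1_MBS_4_Recompute_True_granularity_full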

            ctx.args.log_dir = log_dir

            # every task has own job id
            job_id += 1
            task_job_id = "auto_tuner_" + str(job_id)
            ctx.args.job_id = task_job_id

            # generate script args of task
            new_args = gen_new_args(raw_args, cur_cfg, tuner_cfg)
            ctx.args.training_script_args = new_args

            # launch task
            ctx.logger.info(
                "Launch task from auto tuner: job_id {}, log_dir {}, config {}".format(
                    task_job_id, log_dir, cur_cfg
                )
            )

            c = controllers.init(ctx)
            # set per task timeout
            signal.signal(signal.SIGALRM, c.not_exit_signal_handler)
            signal.alarm(max_time_per_task)
            c.run()

            # Process generated result
            metric, err = read_log(
                path=ctx.args.log_dir,
                file="workerlog.0",
                target_metric=tuner_cfg["metric_cfg"]["name"],
            )
            if err:
                ctx.logger.warning(f"Read log failed for parameters: {log_dir}")
                cur_cfg['time'] = None  # for pruner use.
                cur_cfg[tuner_cfg['metric_cfg']['name']] = None
            else:
                cur_cfg['time'] = metric  # for pruner use.
                cur_cfg[tuner_cfg['metric_cfg']['name']] = metric
            # record history
            cur_cfg['job_id'] = job_id
            recorder.add_cfg(**cur_cfg)
            cur_best_cfgs, err = recorder.get_best(
                metric=tuner_cfg['metric_cfg']['name'],
                direction=tuner_cfg['metric_cfg']['OptimizationDirection'],
            )
            if not err:
                ctx.logger.info(f"Current best config: {cur_best_cfgs}")
                recorder.store_history(
                    ctx.args.auto_tuner_json.split(".")[0] + "_history.csv"
                )
            else:
                ctx.logger.info(
                    "Get best config failed. Currently there are no appropriate configs."
                )

            new_cfg = auto_tuner.search_once()
            if new_cfg:
                c.finalize(exit=False)
            else:
                c.finalize(exit=True)

            # per task launch interval
            time.sleep(5)

            cur_cfg = copy.deepcopy(new_cfg)
        recorder.store_history()
    else:
        from . import controllers

        # initialize the selected controller
        c = controllers.init(ctx)

        # run the pods
        c.run()

        # manage the job or just wait for the pods
        c.finalize()


if __name__ == "__main__":
    launch()