# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
paddle.distributed.launch is a module that spawns multiple distributed
processes on each training node for gpu training.

Usage:
    In both single node and multiple node training, this module
launches a process on each of the given gpu cards.

    1. for single node training with all visible gpu cards:
       python -m paddle.distributed.launch \
         your_training_py (arg1 arg2 and all others)

    2. for single node training with cards [0,4):
       python -m paddle.distributed.launch --selected_gpus="0,1,2,3" \
         your_training_py (arg1 arg2 and all others)

    3. for multiple node training such as two nodes: 192.168.0.16, 192.168.0.17
        on 192.168.0.16:
            python -m paddle.distributed.launch --cluster_node_ips="192.168.0.16,192.168.0.17" \
                --node_ip=192.168.0.16 \
                your_training_py (arg1 arg2 and all others)

        on 192.168.0.17:
            python -m paddle.distributed.launch --cluster_node_ips="192.168.0.16,192.168.0.17" \
                --node_ip=192.168.0.17 \
                your_training_py (arg1 arg2 and all others)
"""

from __future__ import print_function
import sys
from sys import version
import subprocess
import os
import six
import copy
from argparse import ArgumentParser, REMAINDER
import paddle.fluid as fluid


def _print_arguments(args):
    print("-----------  Configuration Arguments -----------")
    for arg, value in sorted(six.iteritems(vars(args))):
        print("%s: %s" % (arg, value))
    print("------------------------------------------------")


def _parse_args():
    """
    Helper function that parses the command line options.

    Returns:
        argparse.Namespace: the parsed command line arguments.
    """
    parser = ArgumentParser(
        description='''start paddle training using multi-process mode.
NOTE: your train program ***must*** run in distributed nccl2 mode,
see: http://www.paddlepaddle.org/documentation/docs/zh/1.2/user_guides/howto/training/cluster_howto.html#permalink-8--nccl2-
Your train program must also read the environment variables below so that each
process can initialize properly:
FLAGS_selected_gpus
PADDLE_TRAINER_ID
PADDLE_CURRENT_ENDPOINT
PADDLE_TRAINERS_NUM
PADDLE_TRAINER_ENDPOINTS
POD_IP (current node ip address, not needed for local training)
''')

    # Optional arguments for the launch helper
    parser.add_argument(
        "--cluster_node_ips",
        type=str,
        default="127.0.0.1",
        help="Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..")

    parser.add_argument(
        "--node_ip",
        type=str,
        default="127.0.0.1",
        help="The current node ip. ")

    parser.add_argument(
        "--started_port",
        type=int,
        default=6170,
        help="The trainer's started port on a single node")

    parser.add_argument(
        "--print_config",
        type=bool,
        default=True,
        help="Print the config or not")

    parser.add_argument(
        "--selected_gpus",
        type=str,
        default=None,
        help="It's for gpu trainning and the trainning process will run on the selected_gpus,"
        "each process is bound to a single GPU. And if it's not setted, this module will use all the gpu cards for training."
    )

    parser.add_argument(
        "--log_dir",
        type=str,
        help="The path for each process's log.If it's not setted, the log will printed to default pipe."
    )

    # positional
    parser.add_argument(
        "training_script",
        type=str,
        help="The full path to the single GPU training "
        "program/script to be launched in parallel, "
        "followed by all the arguments for the "
        "training script")

    # rest from the training program
    parser.add_argument('training_script_args', nargs=REMAINDER)
    return parser.parse_args()


def start_procs(args):
    """
    """
    procs = []
    log_fns = []

    default_env = os.environ.copy()

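    # The position of node_ip inside cluster_node_ips determines this node's id
    # and therefore the trainer ids assigned to its GPUs.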
    current_node_ip = args.node_ip
    node_ips = [x.strip() for x in args.cluster_node_ips.split(',')]
    node_id = node_ips.index(current_node_ip)
    num_nodes = len(node_ips)

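    # Fall back to every CUDA device on this node when --selected_gpus is not given.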
    if args.selected_gpus is None:
        gpus_num = fluid.core.get_cuda_device_count()
        selected_gpus = [str(x) for x in range(0, gpus_num)]
    else:
        selected_gpus = [x.strip() for x in args.selected_gpus.split(',')]
    selected_gpus_num = len(selected_gpus)

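    # Build the global endpoint list: one "ip:port" per GPU per node, with ports
    # assigned consecutively starting from --started_port.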
    trainers_endpoints = ""
    for ip in node_ips:
        for i in range(selected_gpus_num):
            if trainers_endpoints != "":
                trainers_endpoints += ","
            trainers_endpoints += "%s:%d" % (ip, args.started_port + i)

    nranks = num_nodes * selected_gpus_num

    if args.print_config:
        print("trainers_endpoints:", trainers_endpoints, ", node_id:", node_id,
              ", current_node_ip:", current_node_ip, ", num_nodes:", num_nodes,
              ", node_ips:", node_ips, ", nranks:", nranks)

    current_env = copy.copy(default_env)
    # paddle broadcasts ncclUniqueId over sockets, and a proxy may make
    # trainers unreachable, so remove the proxy settings. Setting them to ""
    # would make grpc log a "bad uri" error, so delete them instead.
    current_env.pop("http_proxy", None)
    current_env.pop("https_proxy", None)

    procs = []
    cmds = []
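    # Launch one copy of the training script per selected GPU, each with its own
    # trainer id, endpoint and FLAGS_selected_gpus in the environment.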
    for i in range(0, selected_gpus_num):
        current_env.update({
            "FLAGS_selected_gpus": "%s" % selected_gpus[i],
            "PADDLE_TRAINER_ID": "%d" % (node_id * selected_gpus_num + i),
            "PADDLE_CURRENT_ENDPOINT":
            "%s:%d" % (current_node_ip, args.started_port + i),
            "PADDLE_TRAINERS_NUM": "%d" % nranks,
            "PADDLE_TRAINER_ENDPOINTS": trainers_endpoints
        })

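        # Multi-node runs turn off synchronous nccl allreduce, presumably so
        # that communication can overlap with computation across nodes.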
        if num_nodes > 1:
            current_env.update({"FLAGS_sync_nccl_allreduce": "0"})

        cmd = [sys.executable, "-u", args.training_script
               ] + args.training_script_args

        cmds.append(cmd)

        if args.log_dir is not None:
            os.system("mkdir -p {}".format(args.log_dir))
            fn = open("%s/workerlog.%d" % (args.log_dir, i), "w")
            log_fns.append(fn)

            proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn)
        else:
            proc = subprocess.Popen(cmd, env=current_env)

        procs.append(proc)

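    # Wait for every local process; close its log file and surface a
    # CalledProcessError if any of them exited with a non-zero code.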
    for i in range(0, len(procs)):
        proc = procs[i]

        proc.wait()
        if len(log_fns) > 0:
            log_fns[i].close()

        if proc.returncode != 0:
            raise subprocess.CalledProcessError(
                returncode=procs[i].returncode, cmd=cmds[i])


def launch():
    args = _parse_args()
    if args.print_config:
        _print_arguments(args)
    start_procs(args)


if __name__ == "__main__":
    launch()