launch.py 11.4 KB
Newer Older
G
gongweibao 已提交
1
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
W
Wu Yi 已提交
2 3 4 5 6 7 8 9 10 11 12 13
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
G
gongweibao 已提交
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
"""
paddle.distributed.launch is a module that spawns multiple distributed
processes on each training node for GPU training.
Usage:
    In both single-node and multi-node training, this module
launches one process for each of the given GPU cards.
    1. for single node training with all visible gpu cards:
       python -m paddle.distributed.launch \
         your_training_py (arg1 arg2 and all others)
    
    2. for single node training with cards [0,4):
       python -m paddle.distributed.launch --selected_gpus="0,1,2,3" \
         your_training_py (arg1 arg2 and all others)
    3. for multiple node training, such as two nodes: 192.168.0.16, 192.168.0.17
        on 192.168.0.16:
            python -m paddle.distributed.launch --cluster_node_ips="192.168.0.16,192.168.0.17" \
                --node_ip=192.168.0.16 \
                your_training_py (arg1 arg2 and all others)
        on 192.168.0.17:
            python -m paddle.distributed.launch --cluster_node_ips="192.168.0.16,192.168.0.17" \
                --node_ip=192.168.0.17 \
                your_training_py (arg1 arg2 and all others)
"""
W
Wu Yi 已提交
37 38

from __future__ import print_function
39
import logging
G
gongweibao 已提交
40 41
import sys
from sys import version
W
Wu Yi 已提交
42 43
import subprocess
import os
44
import time
G
gongweibao 已提交
45 46 47 48
import six
import copy
from argparse import ArgumentParser, REMAINDER
import paddle.fluid as fluid
49

50 51 52 53
# Module-wide logging: the *root* logger is set to INFO and given a stream
# handler, so every record at INFO or above is emitted with its level,
# timestamp and source location.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
log_handler = logging.StreamHandler()
log_format = logging.Formatter(
    '%(levelname)s %(asctime)s %(filename)s:%(lineno)d] %(message)s')
log_handler.setFormatter(log_format)
logger.addHandler(log_handler)

58

G
gongweibao 已提交
59 60 61 62 63
def _print_arguments(args):
    """Print every attribute of ``args`` as ``name: value``, sorted by name.

    Args:
        args: an object whose attributes are exposed through ``vars()``;
            in this module it is the namespace returned by ``_parse_args``.
    """
    print("-----------  Configuration Arguments -----------")
    # The items are handed straight to sorted(), so a plain dict.items()
    # behaves identically on Python 2 and 3; no compatibility shim is needed.
    for arg, value in sorted(vars(args).items()):
        print("%s: %s" % (arg, value))
    print("------------------------------------------------")
W
Wu Yi 已提交
64

65

G
gongweibao 已提交
66 67 68 69 70 71
def _parse_args():
    """
    Helper function parsing the command line options.

    Reads ``sys.argv``: launcher options first, then the training script
    path, then everything that follows it (kept verbatim for the script).

    @retval argparse.Namespace holding the launcher options plus
            ``training_script`` and ``training_script_args``
    """

    def _str2bool(value):
        # ``type=bool`` would call bool() on the raw string, and any
        # non-empty string (including "False") is truthy. Parse the text
        # explicitly; argparse turns the ValueError into a usage error.
        text = value.strip().lower()
        if text in ("true", "t", "yes", "y", "on", "1"):
            return True
        if text in ("false", "f", "no", "n", "off", "0", ""):
            return False
        raise ValueError("expected a boolean value, got %r" % value)

    parser = ArgumentParser(
        description='''start paddle training using multi-process mode.
NOTE: your train program ***must*** run as distributed nccl2 mode,
see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/training/cluster_howto.html#permalink-8--nccl2-
And your train program must read environment variables below in order to let different
process init properly:
FLAGS_selected_gpus
PADDLE_TRAINER_ID
PADDLE_CURRENT_ENDPOINT
PADDLE_TRAINERS_NUM
PADDLE_TRAINER_ENDPOINTS
POD_IP (current node ip address, not needed for local training)
''')

    #Optional arguments for the launch helper
    parser.add_argument(
        "--cluster_node_ips",
        type=str,
        default="127.0.0.1",
        help="Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..")
    parser.add_argument(
        "--node_ip",
        type=str,
        default="127.0.0.1",
        help="The current node ip. ")
    parser.add_argument(
        "--use_paddlecloud",
        action='store_true',
        help="wheter to use paddlecloud platform to run your multi-process job. If false, no need to set this argument."
    )
    parser.add_argument(
        "--started_port",
        type=int,
        default=6170,
        help="The trainer's started port on a single node")

    parser.add_argument(
        "--print_config",
        type=_str2bool,
        default=True,
        help="Print the config or not")

    parser.add_argument(
        "--selected_gpus",
        type=str,
        default=None,
        help="It's for gpu trainning and the trainning process will run on the selected_gpus,"
        "each process is bound to a single GPU. And if it's not setted, this module will use all the gpu cards for training."
    )

    parser.add_argument(
        "--log_dir",
        type=str,
        help="The path for each process's log.If it's not setted, the log will printed to default pipe."
    )

    #positional
    parser.add_argument(
        "training_script",
        type=str,
        help="The full path to the single GPU training "
        "program/script to be launched in parallel, "
        "followed by all the arguments for the "
        "training script")

    #rest from the training program
    parser.add_argument('training_script_args', nargs=REMAINDER)
    return parser.parse_args()


141 142 143 144 145 146
def terminate_procs(procs):
    """Ask every process in ``procs`` that has not exited yet to terminate.

    Args:
        procs: iterable of objects offering ``poll()`` and ``terminate()``
            (``subprocess.Popen`` instances in this module).
    """
    # Lazy generator: each process is polled right before it is terminated.
    still_running = (proc for proc in procs if proc.poll() is None)
    for proc in still_running:
        proc.terminate()


G
gongweibao 已提交
147 148 149 150 151 152 153 154
def start_procs(args):
    """Spawn one training process per selected GPU on this node and wait.

    Each worker runs ``args.training_script`` with the current interpreter
    and receives FLAGS_selected_gpus, PADDLE_TRAINER_ID,
    PADDLE_CURRENT_ENDPOINT, PADDLE_TRAINERS_NUM and
    PADDLE_TRAINER_ENDPOINTS in its environment. http_proxy/https_proxy are
    removed from that environment.

    With ``args.use_paddlecloud`` set and PADDLE_TRAINERS_NUM != 1, the node
    list, this node's ip and this node's index are read from PADDLE_TRAINERS,
    POD_IP and PADDLE_TRAINER_ID instead of from the command line, and
    ``args.started_port`` may be overwritten with PADDLE_PORT.

    Args:
        args: namespace produced by ``_parse_args``.

    Raises:
        ValueError: ``args.node_ip`` is not listed in
            ``args.cluster_node_ips`` (non-paddlecloud mode).
        AssertionError: a required paddlecloud variable is missing, or a
            selected GPU is not listed in CUDA_VISIBLE_DEVICES.
        SystemExit: exit code 1 when any worker exits with a non-zero code;
            the remaining workers are terminated first.
        KeyboardInterrupt: re-raised after terminating the workers.
    """
    current_node_ip = args.node_ip
    node_ips = [x.strip() for x in args.cluster_node_ips.split(',')]

    cloud_multi_node = args.use_paddlecloud and int(
        os.getenv("PADDLE_TRAINERS_NUM", "1")) != 1
    if cloud_multi_node:
        #you can automatically get ip info while using paddlecloud multi nodes mode.
        current_node_ip = os.getenv("POD_IP")
        assert current_node_ip is not None, "POD_IP should not be None"
        node_ips = os.getenv("PADDLE_TRAINERS")
        assert node_ips is not None, "PADDLE_TRAINERS should not be None"
        node_ips = node_ips.split(",")
        node_id = os.getenv("PADDLE_TRAINER_ID")
        assert node_id is not None, "PADDLE_TRAINER_ID should not be None"
        node_id = int(node_id)

        if args.node_ip != "127.0.0.1" and current_node_ip != args.node_ip:
            logger.warning(
                ("Please NOTE: When using paddlecloud, current_node_ip is "
                 "automatically got from POD_IP. Your input node_ip: {} doesn't equals to "
                 "current_node_ip: {} from paddlecloud environment."
                 ).format(args.node_ip, current_node_ip))
        if args.cluster_node_ips != "127.0.0.1" and args.cluster_node_ips != ",".join(
                node_ips):
            logger.warning(
                ("Please NOTE: When using paddlecloud, cluster_node_ips is "
                 "automatically got from PADDLE_TRAINERS(multi nodes) or POD_IP(single node)."
                 "Your input cluster_node_ips: {} doesn't equals to IPs: {} from "
                 "paddlecloud environment."
                 ).format(args.cluster_node_ips, node_ips))
    else:
        # The command-line values are only authoritative here, so this is
        # the only place where node_ip has to appear in cluster_node_ips.
        if current_node_ip not in node_ips:
            raise ValueError(
                "node_ip %s is not in cluster_node_ips %s" %
                (current_node_ip, args.cluster_node_ips))
        node_id = node_ips.index(current_node_ip)

    num_nodes = len(node_ips)

    selected_gpus = _resolve_selected_gpus(args)
    selected_gpus_num = len(selected_gpus)

    if args.use_paddlecloud and num_nodes > 1:
        cloud_paddle_port = os.getenv("PADDLE_PORT", "")
        cloud_paddle_port_num = os.getenv("PADDLE_PORTS_NUM", "")
        if cloud_paddle_port != "" and cloud_paddle_port_num != "":
            cloud_paddle_port_num = int(cloud_paddle_port_num)
            # Only adopt the platform's port range when it is large enough
            # to give every local worker its own port.
            if cloud_paddle_port_num >= selected_gpus_num:
                args.started_port = int(cloud_paddle_port)
                logger.warning("Use Cloud specified port:{}.".format(
                    cloud_paddle_port))

    # One endpoint per (node, local worker); ports are consecutive per node.
    trainers_endpoints = ",".join("%s:%d" % (ip, args.started_port + i)
                                  for ip in node_ips
                                  for i in range(selected_gpus_num))

    nranks = num_nodes * selected_gpus_num

    if args.print_config:
        print("trainers_endpoints:", trainers_endpoints, ", node_id:", node_id,
              ", current_node_ip:", current_node_ip, ", num_nodes:", num_nodes,
              ", node_ips:", node_ips, ", nranks:", nranks)

    current_env = os.environ.copy()
    #paddle broadcast ncclUniqueId use socket, and
    #proxy maybe make trainers unreachable, so delete them.
    #if we set them to "", grpc will log error message "bad uri"
    #so just delete them.
    current_env.pop("http_proxy", None)
    current_env.pop("https_proxy", None)

    procs = []
    log_fns = []
    ranks = []
    error_rank = []
    try:
        # Create the log directory once, without going through a shell.
        if args.log_dir is not None and not os.path.isdir(args.log_dir):
            os.makedirs(args.log_dir)

        # Spawning happens inside the try block so that a failure part-way
        # through still terminates the workers already started and closes
        # their log files.
        for i in range(selected_gpus_num):
            rank = node_id * selected_gpus_num + i
            current_env.update({
                "FLAGS_selected_gpus": "%s" % selected_gpus[i],
                "PADDLE_TRAINER_ID": "%d" % rank,
                "PADDLE_CURRENT_ENDPOINT":
                "%s:%d" % (current_node_ip, args.started_port + i),
                "PADDLE_TRAINERS_NUM": "%d" % nranks,
                "PADDLE_TRAINER_ENDPOINTS": trainers_endpoints
            })

            cmd = [sys.executable, "-u", args.training_script
                   ] + args.training_script_args

            if args.log_dir is not None:
                fn = open(
                    os.path.join(args.log_dir, "workerlog.%d" % i), "w")
                log_fns.append(fn)
                proc = subprocess.Popen(
                    cmd, env=current_env, stdout=fn, stderr=fn)
            else:
                proc = subprocess.Popen(cmd, env=current_env)

            procs.append(proc)
            ranks.append(rank)

        # wait all process finish or one error
        while True:
            alive = False
            for rank, p in zip(ranks, procs):
                ret = p.poll()
                if ret is None:
                    alive = True
                elif ret != 0:
                    error_rank.append(rank)
            if error_rank or not alive:
                break
            time.sleep(1)

        if error_rank:
            # Handled by the BaseException branch below, which logs the
            # failing ranks and terminates the remaining workers.
            sys.exit(1)

    except KeyboardInterrupt:
        logger.warning("KeyboardInterrupt, exit")
        terminate_procs(procs)
        raise
    except BaseException:
        # Covers the SystemExit raised above as well as any unexpected
        # failure; the exception is always re-raised.
        logger.error(
            "ABORT!!! Out of all {} trainers, the trainer process with rank={} was aborted. Please check its log.".
            format(nranks, error_rank))
        terminate_procs(procs)
        raise
    finally:
        for fn in log_fns:
            fn.close()


def _resolve_selected_gpus(args):
    """Return the device ids passed to workers through FLAGS_selected_gpus.

    Without ``--selected_gpus`` every device reported by
    ``fluid.core.get_cuda_device_count()`` is used. With it, the ids are
    taken as given, unless CUDA_VISIBLE_DEVICES is set, in which case they
    are translated to positions within that list.
    """
    if args.selected_gpus is None:
        gpus_num = fluid.core.get_cuda_device_count()
        return [str(x) for x in range(0, gpus_num)]

    # Strip once so that validation and lookup agree on inputs like "0, 1".
    requested = [x.strip() for x in args.selected_gpus.split(',')]
    cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
    if cuda_visible_devices is None or cuda_visible_devices == "":
        return requested

    # change selected_gpus into relative values
    # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7;
    # therefore selected_gpus=0,1,2,3
    cuda_visible_devices_list = [
        x.strip() for x in cuda_visible_devices.split(',')
    ]
    for x in requested:
        assert x in cuda_visible_devices_list, "Can't find "\
            "your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
            % (x, cuda_visible_devices)
    return [cuda_visible_devices_list.index(x) for x in requested]
G
gongweibao 已提交
306

307

G
gongweibao 已提交
308 309 310 311 312
def launch():
    """Parse the command line, optionally echo the options, start workers."""
    cli_args = _parse_args()
    show_config = cli_args.print_config
    if show_config:
        _print_arguments(cli_args)
    start_procs(cli_args)
W
Wu Yi 已提交
313 314 315


# Run the launcher only when executed as a script or via ``python -m``.
if __name__ == "__main__":
    launch()