# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
paddle.distributed.launch is a module that spawns multiple distributed
processes on each training node for GPU training.
Usage:
    In both single node training and multiple node training, this module
launches a process on each of the given GPU cards.
    1. for single node training with all visible GPU cards:
       python -m paddle.distributed.launch \
         your_training_py (arg1 arg2 and all others)
    
    2. for single node training with [0,4) cards
       python -m paddle.distributed.launch --selected_gpus="0,1,2,3" \
         your_training_py (arg1 arg2 and all others)
    3. for multiple node training such as two nodes: 192.168.0.16, 192.168.0.17
        on 192.168.0.16:
            python -m paddle.distributed.launch --cluster_node_ips="192.168.0.16,192.168.0.17" \
                --node_ip=192.168.0.16 \
                your_training_py (arg1 arg2 and all others)
        on 192.168.0.17:
            python -m paddle.distributed.launch --cluster_node_ips="192.168.0.16,192.168.0.17" \
                --node_ip=192.168.0.17 \
                your_training_py (arg1 arg2 and all others)
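    4. to write each process's log to a directory instead of the default pipe,
       for example a directory named mylog (files are written as workerlog.0,
       workerlog.1, ...):
       python -m paddle.distributed.launch --log_dir=mylog \
         your_training_py (arg1 arg2 and all others)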
"""

from __future__ import print_function
import logging
import sys
from sys import version
import subprocess
import os
import time
import six
import copy
from argparse import ArgumentParser, REMAINDER
import paddle.fluid as fluid
from contextlib import closing
import socket

logger = logging.getLogger()
logger.setLevel(logging.INFO)
log_handler = logging.StreamHandler()
log_format = logging.Formatter(
    '%(levelname)s %(asctime)s %(filename)s:%(lineno)d] %(message)s')
log_handler.setFormatter(log_format)
logger.addHandler(log_handler)


def _print_arguments(args):
    print("-----------  Configuration Arguments -----------")
    for arg, value in sorted(six.iteritems(vars(args))):
        print("%s: %s" % (arg, value))
    print("------------------------------------------------")


def find_free_ports(num):
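    """Try to find `num` distinct free TCP ports by binding to port 0.

    Returns a set of port numbers on success, or None if `num` distinct
    ports could not be collected within 100 attempts (the caller then
    falls back to the statically configured started_port).
    """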
    def __free_port():
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
            s.bind(('', 0))
            return s.getsockname()[1]

    port_set = set()
    step = 0
    while True:
        port = __free_port()
        if port not in port_set:
            port_set.add(port)

        if len(port_set) >= num:
            return port_set

        step += 1
        if step > 100:
            print(
                "can't find an available port and will use the specified static port now!"
            )
            return None

    return None


def _parse_args():
    """
    Helper function parsing the command line options
    @retval ArgumentParser
    """
    parser = ArgumentParser(
        description='''start paddle training using multi-process mode.
NOTE: your train program ***must*** run in distributed nccl2 mode,
see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/training/cluster_howto.html#permalink-8--nccl2-
And your train program must read the environment variables below in order to let each
process initialize properly:
FLAGS_selected_gpus
PADDLE_TRAINER_ID
PADDLE_CURRENT_ENDPOINT
PADDLE_TRAINERS_NUM
PADDLE_TRAINER_ENDPOINTS
POD_IP (current node ip address, not needed for local training)
''')
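
    # A minimal, illustrative sketch (not part of this launcher) of how a
    # launched training script might read the environment variables listed
    # above; the exact initialization code depends on your training program:
    #
    #     import os
    #     selected_gpu = os.getenv("FLAGS_selected_gpus")              # e.g. "0"
    #     trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))        # this process's rank
    #     current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")      # e.g. "127.0.0.1:6170"
    #     trainers_num = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))    # total number of trainers
    #     trainer_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS", "").split(",")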

    #Optional arguments for the launch helper
    parser.add_argument(
        "--cluster_node_ips",
        type=str,
        default="127.0.0.1",
        help="Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..")
    parser.add_argument(
        "--node_ip",
        type=str,
        default="127.0.0.1",
        help="The current node ip. ")
    parser.add_argument(
        "--use_paddlecloud",
        action='store_true',
        help="wheter to use paddlecloud platform to run your multi-process job. If false, no need to set this argument."
    )
    parser.add_argument(
        "--started_port",
        type=int,
        default=None,
        help="The trainer's started port on a single node")

    parser.add_argument(
        "--print_config",
        type=bool,
        default=True,
        help="Print the config or not")

    parser.add_argument(
        "--selected_gpus",
        type=str,
        default=None,
        help="It's for gpu training and the training process will run on the selected_gpus,"
        "each process is bound to a single GPU. And if it's not set, this module will use all the gpu cards for training."
    )

    parser.add_argument(
        "--log_dir",
        type=str,
        help="The path for each process's log.If it's not set, the log will printed to default pipe."
    )

    #positional
    parser.add_argument(
        "training_script",
        type=str,
        help="The full path to the single GPU training "
        "program/script to be launched in parallel, "
        "followed by all the arguments for the "
        "training script")

    #rest from the training program
    parser.add_argument('training_script_args', nargs=REMAINDER)
    return parser.parse_args()


def terminate_procs(procs):
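    """Terminate every launched process that is still running."""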
    for p in procs:
        if p.poll() is None:
            p.terminate()


def start_procs(args):
    """
    """
    default_env = os.environ.copy()

    current_node_ip = args.node_ip
    node_ips = [x.strip() for x in args.cluster_node_ips.split(',')]
    node_id = node_ips.index(current_node_ip)
    if args.use_paddlecloud:
        trainer_nums = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
        if trainer_nums != 1:
            # You can automatically get IP info when using paddlecloud multi-node mode.
            current_node_ip = os.getenv("POD_IP")
            assert current_node_ip is not None, "POD_IP should not be None"
            node_ips = os.getenv("PADDLE_TRAINERS")
            assert node_ips is not None, "PADDLE_TRAINERS should not be None"
            node_ips = node_ips.split(",")
            node_id = os.getenv("PADDLE_TRAINER_ID")
            assert node_id is not None, "PADDLE_TRAINER_ID should not be None"
            node_id = int(node_id)

            if args.node_ip != "127.0.0.1" and current_node_ip != args.node_ip:
                logger.warning(
                    "Please NOTE: When using paddlecloud, current_node_ip is \
automatically got from POD_IP. Your input node_ip: {} doesn't equals to \
current_node_ip: {} from paddlecloud environment."
                    .format(args.node_ip, current_node_ip))
            if args.cluster_node_ips != "127.0.0.1" and args.cluster_node_ips != ",".join(
                    node_ips):
                logger.warning(
                    "Please NOTE: When using paddlecloud, cluster_node_ips is \
automatically got from PADDLE_TRAINERS(multi nodes) or POD_IP(single node).\
Your input cluster_node_ips: {} doesn't equals to IPs: {} from \
paddlecloud environment.".format(args.cluster_node_ips, node_ips))
    num_nodes = len(node_ips)

    if args.selected_gpus is None:
        gpus_num = fluid.core.get_cuda_device_count()
        selected_gpus = [str(x) for x in range(0, gpus_num)]
    else:
        cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
        if cuda_visible_devices is None or cuda_visible_devices == "":
            selected_gpus = [x.strip() for x in args.selected_gpus.split(',')]
        else:
            # change selected_gpus into relative values
            # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7;
            # therefore selected_gpus=0,1,2,3
            cuda_visible_devices_list = cuda_visible_devices.split(',')
            for x in args.selected_gpus.split(','):
                assert x in cuda_visible_devices_list, "Can't find "\
                "your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
                % (x, cuda_visible_devices)
            selected_gpus = [
                cuda_visible_devices_list.index(x.strip())
                for x in args.selected_gpus.split(',')
            ]
    selected_gpus_num = len(selected_gpus)

    if args.use_paddlecloud and num_nodes > 1:
        cloud_paddle_port = os.getenv("PADDLE_PORT", "")
        cloud_paddle_port_num = os.getenv("PADDLE_PORTS_NUM", "")
        if cloud_paddle_port != "" and cloud_paddle_port_num != "":
            cloud_paddle_port_num = int(cloud_paddle_port_num)
            if cloud_paddle_port_num >= selected_gpus_num:
                args.started_port = int(cloud_paddle_port)
                logger.warning("Use Cloud specified port:{}.".format(
                    cloud_paddle_port))

    free_ports = None
    if not args.use_paddlecloud and num_nodes <= 1 and args.started_port is None:
        free_ports = find_free_ports(selected_gpus_num)
        if free_ports is not None:
            free_ports = list(free_ports)
            args.started_port = free_ports[0]

    if args.started_port is None:
        args.started_port = 6170

    if free_ports is None:
        free_ports = [
            x
            for x in range(args.started_port, args.started_port +
                           selected_gpus_num)
        ]

    trainers_endpoints = ""
    for ip in node_ips:
        for i in range(0, selected_gpus_num):
            if trainers_endpoints != "":
                trainers_endpoints += ","
            trainers_endpoints += "%s:%d" % (ip, free_ports[i])

    nranks = num_nodes * selected_gpus_num

    if args.print_config:
        print("trainers_endpoints:", trainers_endpoints, ", node_id:", node_id,
              ", current_node_ip:", current_node_ip, ", num_nodes:", num_nodes,
              ", node_ips:", node_ips, ", nranks:", nranks)

    current_env = copy.copy(default_env)
    # Paddle broadcasts ncclUniqueId over sockets, and a proxy may make the
    # trainers unreachable, so delete the proxy settings. If we set them to "",
    # grpc will log the error message "bad uri", so just delete them.
    current_env.pop("http_proxy", None)
    current_env.pop("https_proxy", None)

    procs = []
    log_fns = []
    cmds = []
    ranks = []
    for i in range(0, selected_gpus_num):
        rank = (node_id * selected_gpus_num + i)
        current_env.update({
            "FLAGS_selected_gpus": "%s" % selected_gpus[i],
            "PADDLE_TRAINER_ID": "%d" % rank,
            "PADDLE_CURRENT_ENDPOINT":
            "%s:%d" % (current_node_ip, free_ports[i]),
            "PADDLE_TRAINERS_NUM": "%d" % nranks,
            "PADDLE_TRAINER_ENDPOINTS": trainers_endpoints
        })

        cmd = [sys.executable, "-u", args.training_script
               ] + args.training_script_args
        cmds.append(cmd)

        if args.log_dir is not None:
            os.system("mkdir -p {}".format(args.log_dir))
            fn = open("%s/workerlog.%d" % (args.log_dir, i), "w")
            log_fns.append(fn)
            proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn)
        else:
            proc = subprocess.Popen(cmd, env=current_env)

        procs.append(proc)
        ranks.append(rank)

    try:
        alive = True
        error = False
        error_rank = []
        # wait all process finish or one error
        while alive and not error:
            alive = False
            for rank, p in zip(ranks, procs):
                ret = p.poll()
                if ret is None:
                    alive = True
                elif ret != 0:
                    error = True
                    error_rank.append(rank)
            time.sleep(1)

        if error:
            terminate_procs(procs)
            exit(1)

    except KeyboardInterrupt:
        logger.warning("KeyboardInterrupt, exit")
        terminate_procs(procs)
        raise
    except SystemExit:
        logger.error(
            "ABORT!!! Out of all {} trainers, the trainer process with rank={} was aborted. Please check its log.".
            format(nranks, error_rank))
        terminate_procs(procs)
        raise
    except:
        logger.error(
            "ABORT!!! Out of all {} trainers, the trainer process with rank={} was aborted. Please check its log.".
            format(nranks, error_rank))
        terminate_procs(procs)
        raise
    finally:
        for fn in log_fns:
            fn.close()


def launch():
    args = _parse_args()
    if args.print_config:
        _print_arguments(args)
    start_procs(args)


if __name__ == "__main__":
    launch()