From 6a1df4699127092e7e1c23f0fcedc984c4e08e61 Mon Sep 17 00:00:00 2001
From: gongweibao
Date: Thu, 6 Jun 2019 07:08:23 +0800
Subject: [PATCH] Fine tuning launch.py (#17223)

---
 python/paddle/distributed/launch.py           | 273 +++++++++++-------
 .../fluid/tests/unittests/CMakeLists.txt      |  26 ++
 .../fluid/tests/unittests/multi_process.py    |  35 +++
 .../fluid/tests/unittests/test_launch.sh      |  30 ++
 4 files changed, 260 insertions(+), 104 deletions(-)
 create mode 100644 python/paddle/fluid/tests/unittests/multi_process.py
 create mode 100644 python/paddle/fluid/tests/unittests/test_launch.sh

diff --git a/python/paddle/distributed/launch.py b/python/paddle/distributed/launch.py
index 8374faf0863..e7b6dfa6d60 100644
--- a/python/paddle/distributed/launch.py
+++ b/python/paddle/distributed/launch.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -11,100 +11,58 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+"""
+paddle.distributed.launch is a module that spawns multiple distributed
+processes on each training node for GPU training.
+
+Usage:
+    In both single node and multiple node training, this module
+launches a process on each of the given GPU cards.
+
+    1. for single node training with all visible GPU cards:
+       python -m paddle.distributed.launch \
+         your_training_py (arg1 arg2 and all others)
+
+    2. for single node training with [0,4) cards
+       python -m paddle.distributed.launch --selected_gpus="0,1,2,3" \
+         your_training_py (arg1 arg2 and all others)
+
+    3. for multiple node training, such as two nodes: 192.168.0.16, 192.168.0.17
+        on 192.168.0.16:
+            python -m paddle.distributed.launch --cluster_node_ips="192.168.0.16,192.168.0.17" \
+                --node_ip=192.168.0.16 \
+                your_training_py (arg1 arg2 and all others)
+
+        on 192.168.0.17:
+            python -m paddle.distributed.launch --cluster_node_ips="192.168.0.16,192.168.0.17" \
+                --node_ip=192.168.0.17 \
+                your_training_py (arg1 arg2 and all others)
+"""
 
 from __future__ import print_function
-
+import sys
+from sys import version
 import subprocess
 import os
-import sys
-import time
-import argparse
-
-default_envs = {
-    "PADDLE_TRAINER_ENDPOINTS":
-    "127.0.0.1:6170,127.0.0.1:6171,127.0.0.1:6172,127.0.0.1:6173,127.0.0.1:6174,127.0.0.1:6175,127.0.0.1:6176,127.0.0.1:6177",
-    "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
-    "PATH": os.getenv("PATH"),
-    "LD_PRELOAD": os.getenv("LD_PRELOAD", ""),
-    "PADDLE_TRAINERS_NUM": "8",
-    "NCCL_DEBUG": "INFO",
-    "GLOG_v": "0",
-    "NCCL_SOCKET_IFNAME": "eth0",
-    "NCCL_IB_GID_INDEX": "3",
-    "NCCL_IB_RETRY_CNT": "0",
-    "PYTHONPATH": os.getenv("PYTHONPATH", ""),
-}
-
-GPUS = 8
-
-
-def get_gpu_ids(gpus):
-    if os.getenv("CUDA_VISIBLE_DEVICES"):
-        ids = [int(i)
-               for i in os.getenv("CUDA_VISIBLE_DEVICES").split(",")][:gpus]
-        if gpus > len(ids):
-            raise EnvironmentError(
-                "The count of env CUDA_VISIBLE_DEVICES should not greater than the passed gpus: %s"
-                % gpus)
-        return ids
-    else:
-        return [i for i in range(gpus)]
+import six
+import copy
+from argparse import ArgumentParser, REMAINDER
+import paddle.fluid as fluid
 
 
-def start_procs(gpus, entrypoint, entrypoint_args, log_dir):
-    procs = []
-    log_fns = []
-    os.system("mkdir -p %s" % log_dir)
-    # ======== update parent envs =======
-    for k, v in os.environ.items():
-        if k.startswith("FLAGS_") or k.startswith("NCCL_") or \
-            k.startswith("GLOG_"):
-            default_envs[k] = v
-
-    # ======== for dist training =======
-    node_trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
-    current_ip = os.getenv("POD_IP", "127.0.0.1")
-    trainer_ips = os.getenv("PADDLE_TRAINERS", current_ip).split(",")
-    num_nodes = len(trainer_ips)
-    all_nodes_devices_endpoints = ""
-    for n in trainer_ips:
-        for i in range(gpus):
-            if all_nodes_devices_endpoints:
-                all_nodes_devices_endpoints += ","
-            all_nodes_devices_endpoints += "%s:617%d" % (n, i)
-    nranks = num_nodes * gpus
-    # ======== for dist training =======
-    gpu_ids = get_gpu_ids(gpus)
-    for i in range(gpus):
-        curr_env = {}
-        curr_env.update(default_envs)
-        curr_env.update({
-            "FLAGS_selected_gpus": "%d" % gpu_ids[i],
-            "PADDLE_TRAINER_ID": "%d" % (node_trainer_id * gpus + i),
-            "PADDLE_CURRENT_ENDPOINT": "%s:617%d" % (current_ip, i),
-            # nranks
-            "PADDLE_TRAINERS_NUM": "%d" % nranks,
-            "PADDLE_TRAINER_ENDPOINTS": all_nodes_devices_endpoints
-        })
-
-        print("starting process ", i, entrypoint, entrypoint_args, curr_env)
-        fn = open("%s/workerlog.%d" % (log_dir, i), "w")
-        log_fns.append(fn)
-        cmd = [sys.executable, "-u", entrypoint] + entrypoint_args
-        procs.append(subprocess.Popen(cmd, stdout=fn, stderr=fn, env=curr_env))
-
-    for i in range(gpus):
-        try:
-            procs[i].communicate()
-            procs[i].terminate()
-            log_fns[i].close()
-        except:
-            pass
-
+def _print_arguments(args):
+    print("----------- Configuration Arguments -----------")
+    for arg, value in sorted(six.iteritems(vars(args))):
+        print("%s: %s" % (arg, value))
+    print("------------------------------------------------")
 
-def parse_args():
-    parser = argparse.ArgumentParser(
+def _parse_args():
+    """
+    Helper function parsing the command line options
+    @retval ArgumentParser
+    """
+    parser = ArgumentParser(
         description='''start paddle training using multi-process mode.
 NOTE: your train program ***must*** run as distributed nccl2 mode,
 see: http://www.paddlepaddle.org/documentation/docs/zh/1.2/user_guides/howto/training/cluster_howto.html#permalink-8--nccl2-
@@ -117,33 +75,140 @@ PADDLE_TRAINERS_NUM
 PADDLE_TRAINER_ENDPOINTS
 POD_IP (current node ip address, not needed for local training)
 ''')
+
+    # Optional arguments for the launch helper
     parser.add_argument(
-        '--gpus',
+        "--cluster_node_ips",
+        type=str,
+        default="127.0.0.1",
+        help="Paddle cluster node ips, such as 192.168.0.16,192.168.0.17.")
+
+    parser.add_argument(
+        "--node_ip",
+        type=str,
+        default="127.0.0.1",
+        help="The current node ip.")
+
+    parser.add_argument(
+        "--started_port",
         type=int,
-        default=8,
-        help='start number of processes for every gpu')
+        default=6170,
+        help="The starting port of the trainers on a single node")
+
+    parser.add_argument(
+        "--print_config",
+        type=bool,
+        default=True,
+        help="Print the config or not")
+
+    parser.add_argument(
+        "--selected_gpus",
+        type=str,
+        default=None,
+        help="It's for GPU training and the training process will run on the selected_gpus, "
+        "each process is bound to a single GPU. If it's not set, this module will use all the GPU cards for training."
+    )
+
     parser.add_argument(
-        '--log_dir',
+        "--log_dir",
         type=str,
-        default="mylog",
-        help='directory to put logs per process.')
+        help="The path for each process's log. If it's not set, the log will be printed to the default pipe."
+    )
+
+    # positional
     parser.add_argument(
-        'entrypoint_script',
+        "training_script",
         type=str,
-        help="The entrypoint script to be launched in parallel,"
-        "followed by all the arguments for each process,"
-        "e.g. train.py --lr 0.1")
-    parser.add_argument('entrypoint_args', nargs=argparse.REMAINDER)
+        help="The full path to the single GPU training "
+        "program/script to be launched in parallel, "
+        "followed by all the arguments for the "
+        "training script")
+
+    # rest from the training program
+    parser.add_argument('training_script_args', nargs=REMAINDER)
     return parser.parse_args()
 
 
-def main():
-    args = parse_args()
+def start_procs(args):
+    """Launch one training process per selected GPU card on this node.
+    """
+    procs = []
+    log_fns = []
+
+    default_env = os.environ.copy()
+
+    current_node_ip = args.node_ip
+    node_ips = [x.strip() for x in args.cluster_node_ips.split(',')]
+    node_id = node_ips.index(current_node_ip)
+    num_nodes = len(node_ips)
+
+    if args.selected_gpus is None:
+        gpus_num = fluid.core.get_cuda_device_count()
+        selected_gpus = [str(x) for x in range(0, gpus_num)]
+    else:
+        selected_gpus = [x.strip() for x in args.selected_gpus.split(',')]
+    selected_gpus_num = len(selected_gpus)
+
+    trainers_endpoints = ""
+    for ip in node_ips:
+        for i in range(selected_gpus_num):
+            if trainers_endpoints != "":
+                trainers_endpoints += ","
+            trainers_endpoints += "%s:%d" % (ip, args.started_port + i)
+
+    nranks = num_nodes * selected_gpus_num
+
+    if args.print_config:
+        print("trainers_endpoints:", trainers_endpoints, ", node_id:", node_id,
+              ", current_node_ip:", current_node_ip, ", num_nodes:", num_nodes,
+              ", node_ips:", node_ips, ", nranks:", nranks)
+
+    current_env = copy.copy(default_env)
+    procs = []
+    cmds = []
+    for i in range(0, selected_gpus_num):
+        current_env.update({
+            "FLAGS_selected_gpus": "%s" % selected_gpus[i],
+            "PADDLE_TRAINER_ID": "%d" % (node_id * selected_gpus_num + i),
+            "PADDLE_CURRENT_ENDPOINT":
+            "%s:%d" % (current_node_ip, args.started_port + i),
+            "PADDLE_TRAINERS_NUM": "%d" % nranks,
+            "PADDLE_TRAINER_ENDPOINTS": trainers_endpoints
+        })
+
+        cmd = [sys.executable, "-u", args.training_script
+               ] + args.training_script_args
+
+        cmds.append(cmd)
+
+        if args.log_dir is not None:
+            fn = open("%s/workerlog.%d" % (args.log_dir, i), "w")
+            log_fns.append(fn)
+
+            proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn)
+        else:
+            proc = subprocess.Popen(cmd, env=current_env)
+
+        procs.append(proc)
+
+    for i in range(0, len(procs)):
+        proc = procs[i]
+
+        proc.wait()
+        if len(log_fns) > 0:
+            log_fns[i].close()
+
+        if proc.returncode != 0:
+            raise subprocess.CalledProcessError(
+                returncode=procs[i].returncode, cmd=cmds[i])
+
 
-    # launch multiple training process
-    start_procs(args.gpus, args.entrypoint_script, args.entrypoint_args,
-                args.log_dir)
+def launch():
+    args = _parse_args()
+    if args.print_config:
+        _print_arguments(args)
+    start_procs(args)
 
 
 if __name__ == "__main__":
-    main()
+    launch()
diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index f6672d3afbe..5d43bc9d7fb 100644
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -19,6 +19,8 @@ if(NOT WITH_DISTRIBUTE)
     LIST(REMOVE_ITEM TEST_OPS test_hsigmoid_remote_table_op)
 endif(NOT WITH_DISTRIBUTE)
 
+LIST(REMOVE_ITEM TEST_OPS test_launch)
+
 if (NOT ${WITH_GPU})
     LIST(REMOVE_ITEM TEST_OPS test_conv2d_fusion_op)
     LIST(REMOVE_ITEM TEST_OPS test_parallel_dygraph_mnist)  # TODO(Yancey1989): parallel dygraph support CPU device in future
@@ -66,6 +68,29 @@ function(py_test_modules TARGET_NAME)
         set_tests_properties(${TARGET_NAME} PROPERTIES TIMEOUT 350)
     endif()
 endfunction()
+
+function(bash_test_modules TARGET_NAME)
+    if(NOT WITH_TESTING)
+        return()
+    endif()
+
+    
set(options SERIAL) + set(oneValueArgs "") + set(multiValueArgs MODULES DEPS ENVS) + cmake_parse_arguments(bash_test_modules "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN}) + + message(STATUS "CMAKE_CURRENT_BINARY_DIR:" ${CMAKE_CURRENT_BINARY_DIR}) + + add_test(NAME ${TARGET_NAME} + COMMAND ${CMAKE_COMMAND} -E env PYTHONPATH=${PADDLE_BINARY_DIR}/python ${bash_test_modules_ENVS} + bash ${CMAKE_CURRENT_BINARY_DIR}/${bash_test_modules_MODULES} + WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) + if (bash_test_modules_SERIAL) + set_property(TEST ${TARGET_NAME} PROPERTY RUN_SERIAL 1) + endif() + set_tests_properties(${TARGET_NAME} PROPERTIES TIMEOUT 600) +endfunction() + list(REMOVE_ITEM TEST_OPS test_warpctc_op) list(REMOVE_ITEM TEST_OPS test_dist_train) list(REMOVE_ITEM TEST_OPS test_dist_transpiler) @@ -154,6 +179,7 @@ if(WITH_DISTRIBUTE) set_tests_properties(test_dist_word2vec PROPERTIES TIMEOUT 200) py_test_modules(test_dist_se_resnext MODULES test_dist_se_resnext) py_test_modules(test_dist_se_resnext_nccl MODULES test_dist_se_resnext_nccl) + bash_test_modules(test_launch MODULES test_launch.sh) # FIXME(typhoonzero): add these tests back # py_test_modules(test_dist_transformer MODULES test_dist_transformer) # set_tests_properties(test_dist_transformer PROPERTIES TIMEOUT 1000) diff --git a/python/paddle/fluid/tests/unittests/multi_process.py b/python/paddle/fluid/tests/unittests/multi_process.py new file mode 100644 index 00000000000..176439626fe --- /dev/null +++ b/python/paddle/fluid/tests/unittests/multi_process.py @@ -0,0 +1,35 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import os
+
+
+def train():
+    selected_gpus = os.getenv("FLAGS_selected_gpus")
+    trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
+    worker_endpoints_env = os.getenv("PADDLE_TRAINER_ENDPOINTS")
+    current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")
+    worker_endpoints = worker_endpoints_env.split(",")
+    trainers_num = len(worker_endpoints)
+
+    name = "selected_gpus:{} worker_endpoints:{} trainers_num:{} current_endpoint:{} trainer_id:{}"\
+        .format(selected_gpus, worker_endpoints, trainers_num, current_endpoint, trainer_id)
+
+    print(name)
+    with open("multi_process.check.log", "w") as f:
+        f.write(name)
+
+
+if __name__ == '__main__':
+    train()
diff --git a/python/paddle/fluid/tests/unittests/test_launch.sh b/python/paddle/fluid/tests/unittests/test_launch.sh
new file mode 100644
index 00000000000..7b849d022d2
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_launch.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+set -e
+
+# use default values
+python -m paddle.distributed.launch multi_process.py
+
+# use specified values
+cluster_node_ips="127.0.0.1"
+node_ip="127.0.0.1"
+
+distributed_args="--cluster_node_ips ${cluster_node_ips} --node_ip ${node_ip} --selected_gpus=0,1"
+python -m paddle.distributed.launch ${distributed_args} multi_process.py
+
+str1="selected_gpus:0 worker_endpoints:['127.0.0.1:6170', '127.0.0.1:6171'] trainers_num:2 current_endpoint:127.0.0.1:6170 trainer_id:0"
+str2="selected_gpus:1 worker_endpoints:['127.0.0.1:6170', '127.0.0.1:6171'] trainers_num:2 current_endpoint:127.0.0.1:6171 trainer_id:1"
+file="multi_process.check.log"
+
+if grep -q "$str1" "$file"; then
+    echo "find trainer 0"
+else
+    echo "not find trainer 0"
+    exit -1
+fi
+
+if grep -q "$str2" "$file"; then
+    echo "find trainer 1"
+else
+    echo "not find trainer 1"
+    exit -1
+fi
-- 
GitLab
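
For reference, below is a minimal sketch (not part of the patch above) of a training script that consumes the environment variables the new launch.py exports to each worker process: FLAGS_selected_gpus, PADDLE_TRAINER_ID, PADDLE_TRAINERS_NUM, PADDLE_CURRENT_ENDPOINT and PADDLE_TRAINER_ENDPOINTS. The file name my_train.py and the printed format are illustrative assumptions; the script simply mirrors what multi_process.py checks.

    # my_train.py -- illustrative sketch only, not part of this patch.
    import os

    def main():
        # launch.py sets these variables for every process it spawns.
        selected_gpu = os.getenv("FLAGS_selected_gpus", "")
        trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
        trainers_num = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
        current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT", "")
        endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS", "").split(",")

        print("trainer %d of %d on gpu %s at %s, all endpoints: %s" %
              (trainer_id, trainers_num, selected_gpu, current_endpoint,
               ",".join(endpoints)))

    if __name__ == "__main__":
        main()

It would be launched on a single node, for example, with:

    python -m paddle.distributed.launch --selected_gpus="0,1" my_train.py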