Unverified commit 6a1df469, authored by gongweibao, committed by GitHub

Fine tuning launch.py (#17223)

Parent 841553e1
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
paddle.distributed.launch is a module that spawns multiple distributed
process on each trainning node for gpu trainning.
Usage:
In both of single node training or multiple node training, this module
launch a process on each of the given gpu card.
1. for single node trainning with all visible gpu cards:
python -m paddle.distributed.launch \
your_training_py (arg1 arg2 and all others)
2. for single node trainning with [0,4) cards
python -m paddle.distributed.launch --selected_gpus="0,1,2,3" \
your_training_py (arg1 arg2 and all others)
3. for mulitple node training such as two node:192.168.0.16, 192.168.0.17
on 192.168.0.16:
python -m paddle.distributed.launch --cluster_node_ips="192.168.0.16,192.168.0.17" \
--node_ip=192.168.0.16 \
your_training_py (arg1 arg2 and all others)
on 192.168.0.17:
python -m paddle.distributed.launch --cluster_node_ips="192.168.0.16,192.168.0.17" \
--node_ip=192.168.0.17 \
your_training_py (arg1 arg2 and all others)
"""
from __future__ import print_function
import sys
from sys import version
import subprocess
import os
import six
import copy
from argparse import ArgumentParser, REMAINDER
import paddle.fluid as fluid
def _print_arguments(args):
    print("----------- Configuration Arguments -----------")
    for arg, value in sorted(six.iteritems(vars(args))):
        print("%s: %s" % (arg, value))
    print("------------------------------------------------")
def _parse_args():
    """
    Helper function parsing the command-line options.
    @retval: the parsed arguments, as an argparse.Namespace
    """
    parser = ArgumentParser(
        description='''start paddle training using multi-process mode.
NOTE: your train program ***must*** run as distributed nccl2 mode,
see: http://www.paddlepaddle.org/documentation/docs/zh/1.2/user_guides/howto/training/cluster_howto.html#permalink-8--nccl2-
And your train program must read environment variables below in order to let different
process init properly:
FLAGS_selected_gpus
PADDLE_TRAINER_ID
PADDLE_CURRENT_ENDPOINT
PADDLE_TRAINERS_NUM
PADDLE_TRAINER_ENDPOINTS
POD_IP (current node ip address, not needed for local training)
''')
    # Optional arguments for the launch helper
    parser.add_argument(
        "--cluster_node_ips",
        type=str,
        default="127.0.0.1",
        help="Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..")
    parser.add_argument(
        "--node_ip",
        type=str,
        default="127.0.0.1",
        help="The current node ip.")
    parser.add_argument(
        "--started_port",
        type=int,
        default=6170,
        help="The trainer's started port on a single node")
    parser.add_argument(
        "--print_config",
        type=bool,
        default=True,
        help="Print the config or not")
    parser.add_argument(
        "--selected_gpus",
        type=str,
        default=None,
        help="It's for gpu training and the training process will run on the selected_gpus, "
        "each process is bound to a single GPU. And if it's not set, this module will use all the gpu cards for training."
    )
    parser.add_argument(
        "--log_dir",
        type=str,
        help="The path for each process's log. If it's not set, the log will be printed to the default pipe."
    )
    # positional
    parser.add_argument(
        "training_script",
        type=str,
        help="The full path to the single GPU training "
        "program/script to be launched in parallel, "
        "followed by all the arguments for the "
        "training script")
    # rest from the training program
    parser.add_argument('training_script_args', nargs=REMAINDER)
    return parser.parse_args()
def start_procs(args):
    """
    Spawn one training process per selected gpu and wait for all of them.
    """
    procs = []
    log_fns = []
    cmds = []
    default_env = os.environ.copy()
    current_node_ip = args.node_ip
    node_ips = [x.strip() for x in args.cluster_node_ips.split(',')]
    node_id = node_ips.index(current_node_ip)
    num_nodes = len(node_ips)
    if args.selected_gpus is None:
        gpus_num = fluid.core.get_cuda_device_count()
        selected_gpus = [str(x) for x in range(0, gpus_num)]
    else:
        selected_gpus = [x.strip() for x in args.selected_gpus.split(',')]
    selected_gpus_num = len(selected_gpus)
    # Build the global endpoint list: one endpoint per selected gpu on every
    # node. Use args.started_port so the list stays consistent with the
    # PADDLE_CURRENT_ENDPOINT assigned below.
    trainers_endpoints = ""
    for ip in node_ips:
        for i in range(selected_gpus_num):
            if trainers_endpoints != "":
                trainers_endpoints += ","
            trainers_endpoints += "%s:%d" % (ip, args.started_port + i)
    nranks = num_nodes * selected_gpus_num
    if args.print_config:
        print("trainers_endpoints:", trainers_endpoints, ", node_id:", node_id,
              ", current_node_ip:", current_node_ip, ", num_nodes:", num_nodes,
              ", node_ips:", node_ips, ", nranks:", nranks)
    current_env = copy.copy(default_env)
    for i in range(0, selected_gpus_num):
        current_env.update({
            "FLAGS_selected_gpus": "%s" % selected_gpus[i],
            "PADDLE_TRAINER_ID": "%d" % (node_id * selected_gpus_num + i),
            "PADDLE_CURRENT_ENDPOINT":
            "%s:%d" % (current_node_ip, args.started_port + i),
            "PADDLE_TRAINERS_NUM": "%d" % nranks,
            "PADDLE_TRAINER_ENDPOINTS": trainers_endpoints
        })
        cmd = [sys.executable, "-u", args.training_script
               ] + args.training_script_args
        cmds.append(cmd)
        if args.log_dir is not None:
            # Create the log directory up front; the old implementation
            # shelled out to "mkdir -p" for the same purpose.
            if not os.path.exists(args.log_dir):
                os.makedirs(args.log_dir)
            fn = open("%s/workerlog.%d" % (args.log_dir, i), "w")
            log_fns.append(fn)
            proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn)
        else:
            proc = subprocess.Popen(cmd, env=current_env)
        procs.append(proc)
    for i in range(0, len(procs)):
        proc = procs[i]
        proc.wait()
        if len(log_fns) > 0:
            log_fns[i].close()
        if proc.returncode != 0:
            raise subprocess.CalledProcessError(
                returncode=procs[i].returncode, cmd=cmds[i])
def launch():
    args = _parse_args()
    if args.print_config:
        _print_arguments(args)
    start_procs(args)


if __name__ == "__main__":
    launch()
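For an end-to-end smoke test of the new interface, the launcher can also be driven programmatically. The snippet below is a minimal sketch, not part of this commit; train.py is a hypothetical stand-in for any user training script:

import subprocess
import sys

# Launch two local workers through the module's CLI entry point,
# mirroring the docstring's single-node case.
cmd = [
    sys.executable, "-m", "paddle.distributed.launch",
    "--selected_gpus=0,1", "--log_dir", "mylog",
    "train.py", "--lr", "0.1"
]
subprocess.check_call(cmd)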
@@ -19,6 +19,8 @@ if(NOT WITH_DISTRIBUTE)
LIST(REMOVE_ITEM TEST_OPS test_hsigmoid_remote_table_op)
endif(NOT WITH_DISTRIBUTE)
LIST(REMOVE_ITEM TEST_OPS test_launch)
if (NOT ${WITH_GPU})
LIST(REMOVE_ITEM TEST_OPS test_conv2d_fusion_op)
LIST(REMOVE_ITEM TEST_OPS test_parallel_dygraph_mnist) # TODO(Yancey1989): parallel dygraph support CPU device in future
@@ -66,6 +68,29 @@ function(py_test_modules TARGET_NAME)
set_tests_properties(${TARGET_NAME} PROPERTIES TIMEOUT 350)
endif()
endfunction()
function(bash_test_modules TARGET_NAME)
if(NOT WITH_TESTING)
return()
endif()
set(options SERIAL)
set(oneValueArgs "")
set(multiValueArgs MODULES DEPS ENVS)
cmake_parse_arguments(bash_test_modules "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})
message(STATUS "CMAKE_CURRENT_BINARY_DIR:" ${CMAKE_CURRENT_BINARY_DIR})
add_test(NAME ${TARGET_NAME}
COMMAND ${CMAKE_COMMAND} -E env PYTHONPATH=${PADDLE_BINARY_DIR}/python ${bash_test_modules_ENVS}
bash ${CMAKE_CURRENT_BINARY_DIR}/${bash_test_modules_MODULES}
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
if (bash_test_modules_SERIAL)
set_property(TEST ${TARGET_NAME} PROPERTY RUN_SERIAL 1)
endif()
set_tests_properties(${TARGET_NAME} PROPERTIES TIMEOUT 600)
endfunction()
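# For reference, an illustrative call to the helper defined above; the
# WITH_DISTRIBUTE block further below registers the launcher test this way:
#   bash_test_modules(test_launch MODULES test_launch.sh)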
list(REMOVE_ITEM TEST_OPS test_warpctc_op)
list(REMOVE_ITEM TEST_OPS test_dist_train)
list(REMOVE_ITEM TEST_OPS test_dist_transpiler)
@@ -154,6 +179,7 @@ if(WITH_DISTRIBUTE)
set_tests_properties(test_dist_word2vec PROPERTIES TIMEOUT 200)
py_test_modules(test_dist_se_resnext MODULES test_dist_se_resnext)
py_test_modules(test_dist_se_resnext_nccl MODULES test_dist_se_resnext_nccl)
bash_test_modules(test_launch MODULES test_launch.sh)
# FIXME(typhoonzero): add these tests back
# py_test_modules(test_dist_transformer MODULES test_dist_transformer)
# set_tests_properties(test_dist_transformer PROPERTIES TIMEOUT 1000)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os


def train():
    selected_gpus = os.getenv("FLAGS_selected_gpus")
    trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
    worker_endpoints_env = os.getenv("PADDLE_TRAINER_ENDPOINTS")
    current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")
    worker_endpoints = worker_endpoints_env.split(",")
    trainers_num = len(worker_endpoints)

    name = "selected_gpus:{} worker_endpoints:{} trainers_num:{} current_endpoint:{} trainer_id:{}"\
        .format(selected_gpus, worker_endpoints, trainers_num,
                current_endpoint, trainer_id)
    print(name)
    # Append (rather than overwrite) so that every trainer's record survives
    # when all workers run in the same working directory.
    with open("multi_process.check.log", "a") as f:
        f.write(name + "\n")


if __name__ == '__main__':
    train()
#!/bin/bash
set -e
# use default values
python -m paddle.distributed.launch multi_process.py
# use specified values
cluster_node_ips="127.0.0.1"
node_ip="127.0.0.1"
distributed_args="--cluster_node_ips ${cluster_node_ips} --node_ip ${node_ip} --selected_gpus=0,1"
python -m paddle.distributed.launch ${distributed_args} multi_process.py
str1="selected_gpus:0 worker_endpoints:['127.0.0.1:6170', '127.0.0.1:6171'] trainers_num:2 current_endpoint:127.0.0.1:6170 trainer_id:0"
str2="selected_gpus:1 worker_endpoints:['127.0.0.1:6170', '127.0.0.1:6171'] trainers_num:2 current_endpoint:127.0.0.1:6171 trainer_id:1"
file="multi_process.check.log"
if ! grep -q "$str1" "$file"; then
echo "find trainer 0"
else
echo "not find trainer 0"
exit -1
fi
if ! grep -q "$str2" "$file"; then
echo "find trainer 1"
else
echo "not find trainer 0"
exit -1
fi
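# Expected outcome: with --selected_gpus=0,1 and the default started_port
# 6170, both trainer records are present, so a passing run prints
# "find trainer 0" and "find trainer 1" and exits 0.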