Commit ad3547c0 authored by chengduozh

add multi process implementation

Parent 3a543ca8
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import paddle.fluid as fluid


def nccl2_prepare(trainer_id, startup_prog, main_prog):
    config = fluid.DistributeTranspilerConfig()
    config.mode = "nccl2"
    t = fluid.DistributeTranspiler(config=config)
    t.transpile(trainer_id,
                trainers=os.environ.get('PADDLE_TRAINER_ENDPOINTS'),
                current_endpoint=os.environ.get('PADDLE_CURRENT_ENDPOINT'),
                startup_program=startup_prog,
                program=main_prog)


def prepare_for_multi_process(exe, build_strategy, train_prog, startup_prog):
    # Prepare for multi-process training.
    trainer_id = int(os.environ.get('PADDLE_TRAINER_ID', 0))
    num_trainers = int(os.environ.get('PADDLE_TRAINERS_NUM', 1))
    print("PADDLE_TRAINERS_NUM", num_trainers)
    print("PADDLE_TRAINER_ID", trainer_id)
    build_strategy.num_trainers = num_trainers
    build_strategy.trainer_id = trainer_id
    # NOTE(zcd): use multiple processes to train the model,
    # and each process uses one GPU card.
    if num_trainers > 1:
        nccl2_prepare(trainer_id, startup_prog, train_prog)
        # The startup_prog is run twice here, but that doesn't matter.
        exe.run(startup_prog)
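
For orientation (not part of the commit): a minimal sketch of how a per-process trainer script might drive these helpers, assuming the launcher exports PADDLE_TRAINER_ID, PADDLE_TRAINERS_NUM, PADDLE_TRAINER_ENDPOINTS, PADDLE_CURRENT_ENDPOINT and FLAGS_selected_gpus for each process; the function and variable names below are illustrative only.

import os
import paddle.fluid as fluid
import dist_utils

def build_train_exe(train_prog, startup_prog, loss):
    # Each process drives exactly one GPU, selected by the launcher.
    place = fluid.CUDAPlace(int(os.getenv('FLAGS_selected_gpus', '0')))
    exe = fluid.Executor(place)

    build_strategy = fluid.BuildStrategy()
    # Fills in build_strategy.num_trainers / trainer_id and, when more than
    # one trainer is configured, transpiles the programs for NCCL2 and runs
    # startup_prog once.
    dist_utils.prepare_for_multi_process(exe, build_strategy, train_prog,
                                         startup_prog)
    exe.run(startup_prog)

    return fluid.ParallelExecutor(use_cuda=True,
                                  loss_name=loss.name,
                                  main_program=train_prog,
                                  build_strategy=build_strategy)

With a single process and none of the PADDLE_* variables set, the defaults (trainer_id 0, num_trainers 1) skip the NCCL2 transpilation, so single-card behaviour is unchanged.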
@@ -32,6 +32,8 @@ from model.classifier import create_model
from optimization import optimization
from utils.args import ArgumentGroup, print_arguments
from utils.init import init_pretraining_params, init_checkpoint
import dist_utils

# yapf: disable
parser = argparse.ArgumentParser(__doc__)
@@ -107,6 +109,21 @@ def evaluate(exe, test_program, test_pyreader, fetch_list, eval_phase):
          (eval_phase, np.sum(total_cost) / np.sum(total_num_seqs),
           np.sum(total_acc) / np.sum(total_num_seqs), time_end - time_begin))
def get_device_num():
    visible_device = os.getenv('CUDA_VISIBLE_DEVICES')
    # NOTE(zcd): use multiple processes to train the model,
    # and each process uses one GPU card.
    num_trainers = int(os.environ.get('PADDLE_TRAINERS_NUM', 1))
    if num_trainers > 1:
        return 1
    if visible_device:
        device_num = len(visible_device.split(','))
    else:
        device_num = subprocess.check_output(
            ['nvidia-smi', '-L']).decode().count('\n')
    return device_num

def update_lr(args):
    num_trainers = int(os.environ.get('PADDLE_TRAINERS_NUM', 1))
    args.learning_rate = args.learning_rate / num_trainers
def main(args):
    bert_config = BertConfig(args.bert_config_path)
@@ -114,12 +131,14 @@ def main(args):
    if args.use_cuda:
        place = fluid.CUDAPlace(int(os.getenv('FLAGS_selected_gpus', '0')))
        dev_count = get_device_num()  # fluid.core.get_cuda_device_count()
    else:
        place = fluid.CPUPlace()
        dev_count = int(os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
    exe = fluid.Executor(place)

    update_lr(args)

    task_name = args.task_name.lower()
    processors = {
        'xnli': reader.XnliProcessor,
@@ -251,6 +270,8 @@ def main(args):
        exec_strategy.num_iteration_per_drop_scope = args.num_iteration_per_drop_scope
        build_strategy = fluid.BuildStrategy()
        dist_utils.prepare_for_multi_process(exe, build_strategy, train_program, startup_prog)
        train_exe = fluid.ParallelExecutor(
            use_cuda=args.use_cuda,
            loss_name=loss.name,
......
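
As a quick reference for the two helpers added to the training script above, an illustrative check of their behaviour under the multi-process environment variables (the values below are arbitrary examples, and get_device_num / update_lr are assumed to be in scope):

import os

os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3'

os.environ['PADDLE_TRAINERS_NUM'] = '1'
assert get_device_num() == 4   # single process: count the visible GPUs

os.environ['PADDLE_TRAINERS_NUM'] = '4'
assert get_device_num() == 1   # multi-process: each process uses one GPU

class Args(object):            # stand-in for the script's parsed arguments
    learning_rate = 5e-5
args = Args()
update_lr(args)
assert abs(args.learning_rate - 5e-5 / 4) < 1e-12   # LR divided by trainer count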