Commit 7803d896 authored by chengduozh

support multi-process for bert
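For context, multi-process training here uses one Python process per GPU card, with each process reading its rank and world size from environment variables. A minimal sketch of that per-process contract, assuming a launcher such as `python -m paddle.distributed.launch` sets these variables (the launcher itself is not part of this diff):

    import os

    # Each worker process is expected to see these variables, set by the launcher:
    trainer_id = int(os.environ.get('PADDLE_TRAINER_ID', 0))      # this process's rank
    num_trainers = int(os.environ.get('PADDLE_TRAINERS_NUM', 1))  # total number of processes
    gpu_id = int(os.environ.get('FLAGS_selected_gpus', '0'))      # the single GPU this process drives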

Parent 37867372
......@@ -28,18 +28,18 @@ def nccl2_prepare(trainer_id, startup_prog, main_prog):
         startup_program=startup_prog,
         program=main_prog)
 
-def prepare_for_multi_process(exe, build_strategy, train_prog, startup_prog):
+def prepare_for_multi_process(exe, build_strategy, train_prog):
     # prepare for multi-process
     trainer_id = int(os.environ.get('PADDLE_TRAINER_ID', 0))
     num_trainers = int(os.environ.get('PADDLE_TRAINERS_NUM', 1))
+    if num_trainers < 2: return
+
     print("PADDLE_TRAINERS_NUM", num_trainers)
     print("PADDLE_TRAINER_ID", trainer_id)
     build_strategy.num_trainers = num_trainers
     build_strategy.trainer_id = trainer_id
-    # NOTE(zcd): use multi processes to train the model,
-    # and each process use one GPU card.
-    if num_trainers > 1:
-        nccl2_prepare(trainer_id,
-                      startup_prog, train_prog)
-        # the startup_prog are run two times, but it doesn't matter.
-        exe.run(startup_prog)
+
+    startup_prog = fluid.Program()
+    nccl2_prepare(trainer_id, startup_prog, train_prog)
+    # the startup_prog is run twice, but it doesn't matter.
+    exe.run(startup_prog)
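The refactor above changes the helper's contract: it now returns early when PADDLE_TRAINERS_NUM is unset or 1, and it builds a fresh fluid.Program to hold the NCCL2 initialization ops appended by nccl2_prepare, presumably so that the exe.run here executes only those ops rather than re-running the caller's full startup program. Callers therefore drop the startup_prog argument. A minimal sketch of the new call pattern (the caller objects below are stand-ins; the real ones come from run_classifier.py later in this diff):

    import paddle.fluid as fluid
    import dist_utils

    # Stand-in caller objects for illustration only.
    train_program = fluid.Program()
    exe = fluid.Executor(fluid.CUDAPlace(0))
    build_strategy = fluid.BuildStrategy()

    # Safe to call unconditionally: the helper is a no-op when
    # PADDLE_TRAINERS_NUM is unset or 1.
    dist_utils.prepare_for_multi_process(exe, build_strategy, train_program)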
......@@ -34,6 +34,7 @@ from utils.args import ArgumentGroup, print_arguments
 from utils.init import init_pretraining_params, init_checkpoint
 import dist_utils
 
+num_trainers = int(os.environ.get('PADDLE_TRAINERS_NUM', 1))
 
 # yapf: disable
 parser = argparse.ArgumentParser(__doc__)
......@@ -110,35 +111,27 @@ def evaluate(exe, test_program, test_pyreader, fetch_list, eval_phase):
           np.sum(total_acc) / np.sum(total_num_seqs), time_end - time_begin))
 
 def get_device_num():
-    visible_device = os.getenv('CUDA_VISIBLE_DEVICES')
-    # NOTE(zcd): use multi processes to train the model,
-    # and each process use one GPU card.
-    num_trainers = int(os.environ.get('PADDLE_TRAINERS_NUM', 1))
+    # NOTE(zcd): for multi-process training, each process uses one GPU card.
     if num_trainers > 1 : return 1
+    visible_device = os.environ.get('CUDA_VISIBLE_DEVICES', None)
     if visible_device:
         device_num = len(visible_device.split(','))
     else:
         device_num = subprocess.check_output(['nvidia-smi','-L']).decode().count('\n')
     return device_num
 
-def update_lr(args):
-    num_trainers = int(os.environ.get('PADDLE_TRAINERS_NUM', 1))
-    args.learning_rate = args.learning_rate / num_trainers
-
 def main(args):
     bert_config = BertConfig(args.bert_config_path)
     bert_config.print_config()
 
     if args.use_cuda:
         place = fluid.CUDAPlace(int(os.getenv('FLAGS_selected_gpus', '0')))
-        dev_count = get_device_num() # fluid.core.get_cuda_device_count()
+        dev_count = get_device_num()
     else:
         place = fluid.CPUPlace()
         dev_count = int(os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
 
     exe = fluid.Executor(place)
-    update_lr(args)
 
     task_name = args.task_name.lower()
     processors = {
         'xnli': reader.XnliProcessor,
......@@ -166,6 +159,9 @@ def main(args):
         train_program.random_seed = args.random_seed
 
     if args.do_train:
+        # NOTE: do not shuffle dataset when using multi-process training.
+        if num_trainers > 1:
+            args.shuffle = False
         train_data_generator = processor.data_generator(
             batch_size=args.batch_size,
             phase='train',
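Shuffling is disabled for multi-process runs because the index-based sharding introduced later in this diff assumes every process enumerates batches in the same order; if each process shuffled with its own seed, the shards would overlap and some samples would be seen twice or not at all. A small illustration of the failure mode (hypothetical 2-trainer run):

    # With a shared, deterministic order, index-based sharding partitions the data:
    batches = ['b0', 'b1', 'b2', 'b3']
    trainer0 = [b for i, b in enumerate(batches) if i % 2 == 0]  # ['b0', 'b2']
    trainer1 = [b for i, b in enumerate(batches) if i % 2 == 1]  # ['b1', 'b3']

    # If each process shuffled independently, the two enumeration orders would
    # diverge, so the "shards" would no longer be disjoint.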
......@@ -271,7 +267,10 @@ def main(args):
         exec_strategy.num_iteration_per_drop_scope = args.num_iteration_per_drop_scope
 
         build_strategy = fluid.BuildStrategy()
-        dist_utils.prepare_for_multi_process(exe, build_strategy, train_program, startup_prog)
+        if args.use_cuda and num_trainers > 1:
+            dist_utils.prepare_for_multi_process(exe, build_strategy, train_program)
+            train_data_generator = fluid.contrib.reader.distributed_batch_reader(
+                train_data_generator)
 
         train_exe = fluid.ParallelExecutor(
             use_cuda=args.use_cuda,
......@@ -279,10 +278,6 @@ def main(args):
             exec_strategy=exec_strategy,
             build_strategy=build_strategy,
             main_program=train_program)
-        # num_trainers = int(os.environ.get('PADDLE_TRAINERS_NUM', 1))
-        # if num_trainers > 1:
-        #     train_data_generator = fluid.contrib.reader.distributed_batch_reader(
-        #         train_data_generator)
 
         train_pyreader.decorate_tensor_provider(train_data_generator)
     else:
......
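The key data-path change is fluid.contrib.reader.distributed_batch_reader, which shards the batch stream across trainer processes so that each process trains on a disjoint slice of the data. A minimal sketch of the sharding idea (an illustration of the behavior, not Paddle's actual implementation):

    def sharded_batch_reader(batch_reader, trainer_id, num_trainers):
        # Keep every num_trainers-th batch, offset by this trainer's rank,
        # so no two processes consume the same batch.
        def _reader():
            for i, batch in enumerate(batch_reader()):
                if i % num_trainers == trainer_id:
                    yield batch
        return _reader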