diff --git a/PaddleCV/rcnn/dist_utils.py b/PaddleCV/rcnn/dist_utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..49df856d950a689951a6e070b1c1810be196f758
--- /dev/null
+++ b/PaddleCV/rcnn/dist_utils.py
@@ -0,0 +1,48 @@
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+import os
+import paddle.fluid as fluid
+
+
+def nccl2_prepare(trainer_id, startup_prog, main_prog):
+    config = fluid.DistributeTranspilerConfig()
+    config.mode = "nccl2"
+    t = fluid.DistributeTranspiler(config=config)
+    t.transpile(
+        trainer_id,
+        trainers=os.environ.get('PADDLE_TRAINER_ENDPOINTS'),
+        current_endpoint=os.environ.get('PADDLE_CURRENT_ENDPOINT'),
+        startup_program=startup_prog,
+        program=main_prog)
+
+
+def prepare_for_multi_process(exe, build_strategy, train_prog):
+    # prepare for multi-process training
+    trainer_id = int(os.environ.get('PADDLE_TRAINER_ID', 0))
+    num_trainers = int(os.environ.get('PADDLE_TRAINERS_NUM', 1))
+    if num_trainers < 2: return
+    print("PADDLE_TRAINERS_NUM", num_trainers)
+    print("PADDLE_TRAINER_ID", trainer_id)
+    build_strategy.num_trainers = num_trainers
+    build_strategy.trainer_id = trainer_id
+    # NOTE(zcd): use multiple processes to train the model,
+    # and each process uses one GPU card.
+    startup_prog = fluid.Program()
+    nccl2_prepare(trainer_id, startup_prog, train_prog)
+    # The startup_prog is run twice, but it doesn't matter.
+    exe.run(startup_prog)
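The new helpers are driven entirely by environment variables. A minimal sketch of how `nccl2_prepare` could be exercised on its own; the endpoint values below are illustrative, since a launcher (for example `paddle.distributed.launch`) normally exports them, one process per GPU:

```python
import os
import paddle.fluid as fluid
import dist_utils

# Illustrative values only; in a real job the launcher sets these per process.
os.environ.setdefault('PADDLE_TRAINER_ID', '0')
os.environ.setdefault('PADDLE_TRAINERS_NUM', '2')
os.environ.setdefault('PADDLE_TRAINER_ENDPOINTS',
                      '127.0.0.1:6170,127.0.0.1:6171')
os.environ.setdefault('PADDLE_CURRENT_ENDPOINT', '127.0.0.1:6170')

startup_prog = fluid.Program()
main_prog = fluid.Program()
# Rewrites both programs for NCCL2 mode; the NCCL setup itself only runs
# once each trainer process executes its startup program.
dist_utils.nccl2_prepare(
    int(os.environ['PADDLE_TRAINER_ID']), startup_prog, main_prog)
```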
diff --git a/PaddleCV/rcnn/reader.py b/PaddleCV/rcnn/reader.py
index d2d5abbca8f0175fb1d1ba50ffcd62b2b5ed31da..7dded0ab4444e67d6251d4eb99622af85a325e8f 100644
--- a/PaddleCV/rcnn/reader.py
+++ b/PaddleCV/rcnn/reader.py
@@ -26,6 +26,7 @@ from roidbs import JsonDataset
 import data_utils
 from config import cfg
 import segm_utils
+num_trainers = int(os.environ.get('PADDLE_TRAINERS_NUM', 1))
 
 
 def roidb_reader(roidb, mode):
@@ -71,7 +72,8 @@ def coco(mode,
          batch_size=None,
          total_batch_size=None,
          padding_total=False,
-         shuffle=False):
+         shuffle=False,
+         shuffle_seed=None):
     total_batch_size = total_batch_size if total_batch_size else batch_size
     assert total_batch_size % batch_size == 0
     json_dataset = JsonDataset(mode)
@@ -97,6 +99,8 @@ def coco(mode,
     def reader():
         if mode == "train":
             if shuffle:
+                if shuffle_seed is not None:
+                    np.random.seed(shuffle_seed)
                 roidb_perm = deque(np.random.permutation(roidbs))
             else:
                 roidb_perm = deque(roidbs)
@@ -140,7 +144,7 @@ def coco(mode,
                         sub_batch_out = []
                     batch_out = []
                 iter_id = count // device_num
-                if iter_id >= cfg.max_iter:
+                if iter_id >= cfg.max_iter * num_trainers:
                     return
         elif mode == "val":
             batch_out = []
@@ -156,9 +160,18 @@ def coco(mode,
     return reader
 
 
-def train(batch_size, total_batch_size=None, padding_total=False, shuffle=True):
+def train(batch_size,
+          total_batch_size=None,
+          padding_total=False,
+          shuffle=True,
+          shuffle_seed=None):
     return coco(
-        'train', batch_size, total_batch_size, padding_total, shuffle=shuffle)
+        'train',
+        batch_size,
+        total_batch_size,
+        padding_total,
+        shuffle=shuffle,
+        shuffle_seed=shuffle_seed)
 
 
 def test(batch_size, total_batch_size=None, padding_total=False):
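The fixed seed exists so that every trainer process walks the roidb list in the same order; the distributed batch reader applied in `train.py` below assumes identical batch order across processes. A quick sketch of that property, with a stand-in `roidbs` list:

```python
import numpy as np
from collections import deque

roidbs = list(range(8))  # stand-in for the real roidb list


def permuted(shuffle_seed):
    # Mirrors reader(): seed first, then permute.
    if shuffle_seed is not None:
        np.random.seed(shuffle_seed)
    return deque(np.random.permutation(roidbs))


# With the same seed every process produces the same permutation, so
# batches can be split across trainers consistently.
assert list(permuted(1)) == list(permuted(1))
```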
diff --git a/PaddleCV/rcnn/train.py b/PaddleCV/rcnn/train.py
index e5755bdd457147cad80b62332d72819e440d48f3..92e07f1aebf7da377faa8afac4b50b3cccfb9ac6 100644
--- a/PaddleCV/rcnn/train.py
+++ b/PaddleCV/rcnn/train.py
@@ -17,18 +17,20 @@ from __future__ import division
 from __future__ import print_function
 import os
 
+
 def set_paddle_flags(flags):
     for key, value in flags.items():
         if os.environ.get(key, None) is None:
             os.environ[key] = str(value)
 
+
 set_paddle_flags({
     'FLAGS_conv_workspace_size_limit': 500,
-    'FLAGS_eager_delete_tensor_gb': 0, # enable gc
+    'FLAGS_eager_delete_tensor_gb': 0,  # enable gc
     'FLAGS_memory_fraction_of_eager_deletion': 1,
     'FLAGS_fraction_of_gpu_memory_to_use': 0.98
 })
-
+
 import sys
 import numpy as np
 import time
@@ -43,6 +45,21 @@ import models.model_builder as model_builder
 import models.resnet as resnet
 from learning_rate import exponential_with_warmup_decay
 from config import cfg
+import dist_utils
+
+num_trainers = int(os.environ.get('PADDLE_TRAINERS_NUM', 1))
+
+
+def get_device_num():
+    # NOTE(zcd): for multi-process training, each process uses one GPU card.
+    if num_trainers > 1: return 1
+    visible_device = os.environ.get('CUDA_VISIBLE_DEVICES', None)
+    if visible_device:
+        device_num = len(visible_device.split(','))
+    else:
+        device_num = subprocess.check_output(
+            ['nvidia-smi', '-L']).decode().count('\n')
+    return device_num
 
 
 def train():
@@ -56,8 +73,7 @@ def train():
     random.seed(0)
     np.random.seed(0)
 
-    devices = os.getenv("CUDA_VISIBLE_DEVICES") or ""
-    devices_num = len(devices.split(","))
+    devices_num = get_device_num()
     total_batch_size = devices_num * cfg.TRAIN.im_per_batch
 
     use_random = True
@@ -94,7 +110,9 @@ def train():
     for var in fetch_list:
         var.persistable = True
 
-    place = fluid.CUDAPlace(0) if cfg.use_gpu else fluid.CPUPlace()
+    #fluid.memory_optimize(fluid.default_main_program(), skip_opt_set=set(fetch_list))
+    gpu_id = int(os.environ.get('FLAGS_selected_gpus', 0))
+    place = fluid.CUDAPlace(gpu_id) if cfg.use_gpu else fluid.CPUPlace()
     exe = fluid.Executor(place)
     exe.run(fluid.default_startup_program())
 
@@ -109,29 +127,52 @@ def train():
         build_strategy = fluid.BuildStrategy()
         build_strategy.memory_optimize = False
         build_strategy.enable_inplace = True
-
         exec_strategy = fluid.ExecutionStrategy()
-        exec_strategy.use_experimental_executor = True
         exec_strategy.num_iteration_per_drop_scope = 10
-        train_exe = fluid.ParallelExecutor(use_cuda=bool(cfg.use_gpu),
-                                           loss_name=loss.name,
-                                           build_strategy=build_strategy,
-                                           exec_strategy=exec_strategy)
+
+        if num_trainers > 1 and cfg.use_gpu:
+            dist_utils.prepare_for_multi_process(exe, build_strategy,
+                                                 fluid.default_main_program())
+            # NOTE: training is faster when num_threads is 1
+            # for multi-process training.
+            exec_strategy.num_threads = 1
+
+        train_exe = fluid.ParallelExecutor(
+            use_cuda=bool(cfg.use_gpu),
+            loss_name=loss.name,
+            build_strategy=build_strategy,
+            exec_strategy=exec_strategy)
     else:
         train_exe = exe
 
     shuffle = True
     if cfg.enable_ce:
         shuffle = False
+    # NOTE: for multi-process training, fix the shuffle seed so that
+    # every process shuffles the dataset in the same order.
+    shuffle_seed = None
+    if num_trainers > 1:
+        shuffle_seed = 1
     if cfg.use_pyreader:
         train_reader = reader.train(
             batch_size=cfg.TRAIN.im_per_batch,
             total_batch_size=total_batch_size,
             padding_total=cfg.TRAIN.padding_minibatch,
-            shuffle=shuffle)
+            shuffle=shuffle,
+            shuffle_seed=shuffle_seed)
+        if num_trainers > 1:
+            assert shuffle_seed is not None, \
+                "If num_trainers > 1, the shuffle_seed must be set, because " \
+                "the order of batch data generated by reader " \
+                "must be the same in the respective processes."
+        # NOTE: the order of batch data generated by batch_reader
+        # must be the same in the respective processes.
+        if num_trainers > 1:
+            train_reader = fluid.contrib.reader.distributed_batch_reader(
+                train_reader)
         py_reader = model.py_reader
         py_reader.decorate_paddle_reader(train_reader)
     else:
+        if num_trainers > 1: shuffle = False
         train_reader = reader.train(
             batch_size=total_batch_size, shuffle=shuffle)
         feeder = fluid.DataFeeder(place=place, feed_list=model.feeds())
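Two of the changes above work together: `train.py` wraps the shared reader with `fluid.contrib.reader.distributed_batch_reader`, and `reader.py` stops at `cfg.max_iter * num_trainers` instead of `cfg.max_iter`. A rough sketch of the interaction, using a hand-written round-robin split rather than Paddle's actual implementation:

```python
def round_robin_split(batch_reader, trainer_id, num_trainers):
    # Stand-in for the distributed batch reader: each trainer keeps every
    # num_trainers-th batch of the shared stream.
    def reader():
        for i, batch in enumerate(batch_reader()):
            if i % num_trainers == trainer_id:
                yield batch
    return reader


def toy_batches(max_iter, num_trainers):
    # Mirrors reader.py: the shared reader must emit max_iter * num_trainers
    # batches so that each trainer still sees max_iter of them.
    def reader():
        for i in range(max_iter * num_trainers):
            yield [i]
    return reader


max_iter, num_trainers = 5, 2
for tid in range(num_trainers):
    split = round_robin_split(
        toy_batches(max_iter, num_trainers), tid, num_trainers)
    assert len(list(split())) == max_iter
```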