未验证 提交 4553b9e1 编写于 作者: W wangchaochaohu 提交者: GitHub

code migration for multi process (mask rcnn model) (#2587)

* rcnn config

* code migration for multiple process
上级 3d75911a
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
#Licensed under the Apache License, Version 2.0 (the "License");
#you may not use this file except in compliance with the License.
#You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import paddle.fluid as fluid
def nccl2_prepare(trainer_id, startup_prog, main_prog):
    """Transpile the given programs with a transpiler in "nccl2" mode.

    A ``fluid.DistributeTranspiler`` is created from a config whose ``mode``
    is set to ``"nccl2"`` and then applied to ``main_prog`` and
    ``startup_prog``. The trainer endpoints and the endpoint of the current
    process are taken from the ``PADDLE_TRAINER_ENDPOINTS`` and
    ``PADDLE_CURRENT_ENDPOINT`` environment variables; when a variable is
    unset, ``None`` is forwarded to ``transpile``.

    Args:
        trainer_id: Id of the current trainer, passed through to
            ``transpile`` as its first positional argument.
        startup_prog: Program passed as ``startup_program``.
        main_prog: Program passed as ``program``.
    """
    transpiler_config = fluid.DistributeTranspilerConfig()
    transpiler_config.mode = "nccl2"
    transpiler = fluid.DistributeTranspiler(config=transpiler_config)

    # Both values come straight from the environment of the current process.
    all_endpoints = os.environ.get('PADDLE_TRAINER_ENDPOINTS')
    this_endpoint = os.environ.get('PADDLE_CURRENT_ENDPOINT')

    transpiler.transpile(
        trainer_id,
        trainers=all_endpoints,
        current_endpoint=this_endpoint,
        startup_program=startup_prog,
        program=main_prog)
def prepare_for_multi_process(exe, build_strategy, train_prog):
    """Configure a build strategy and program for multi-process training.

    The trainer id and trainer count are read from the ``PADDLE_TRAINER_ID``
    (default 0) and ``PADDLE_TRAINERS_NUM`` (default 1) environment
    variables. With fewer than two trainers the function returns immediately
    and touches nothing. Otherwise it prints both values, stores them on
    ``build_strategy``, transpiles ``train_prog`` through ``nccl2_prepare``
    using a freshly created startup program, and runs that startup program
    on ``exe``.

    Args:
        exe: Executor used to run the newly created startup program.
        build_strategy: Object whose ``num_trainers`` and ``trainer_id``
            attributes are assigned.
        train_prog: Main program handed to ``nccl2_prepare``.
    """
    # prepare for multi-process
    current_id = int(os.environ.get('PADDLE_TRAINER_ID', 0))
    trainer_count = int(os.environ.get('PADDLE_TRAINERS_NUM', 1))

    # Single-process run: nothing to set up.
    if trainer_count <= 1:
        return

    print("PADDLE_TRAINERS_NUM", trainer_count)
    print("PADDLE_TRAINER_ID", current_id)

    build_strategy.num_trainers = trainer_count
    build_strategy.trainer_id = current_id

    # NOTE(zcd): use multiple processes to train the model,
    # and each process uses one GPU card.
    dist_startup_prog = fluid.Program()
    nccl2_prepare(current_id, dist_startup_prog, train_prog)

    # The startup program ends up being run twice, but that does not matter.
    exe.run(dist_startup_prog)
...@@ -26,6 +26,7 @@ from roidbs import JsonDataset ...@@ -26,6 +26,7 @@ from roidbs import JsonDataset
import data_utils import data_utils
from config import cfg from config import cfg
import segm_utils import segm_utils
num_trainers = int(os.environ.get('PADDLE_TRAINERS_NUM', 1))
def roidb_reader(roidb, mode): def roidb_reader(roidb, mode):
...@@ -71,7 +72,8 @@ def coco(mode, ...@@ -71,7 +72,8 @@ def coco(mode,
batch_size=None, batch_size=None,
total_batch_size=None, total_batch_size=None,
padding_total=False, padding_total=False,
shuffle=False): shuffle=False,
shuffle_seed=None):
total_batch_size = total_batch_size if total_batch_size else batch_size total_batch_size = total_batch_size if total_batch_size else batch_size
assert total_batch_size % batch_size == 0 assert total_batch_size % batch_size == 0
json_dataset = JsonDataset(mode) json_dataset = JsonDataset(mode)
...@@ -97,6 +99,8 @@ def coco(mode, ...@@ -97,6 +99,8 @@ def coco(mode,
def reader(): def reader():
if mode == "train": if mode == "train":
if shuffle: if shuffle:
if shuffle_seed is not None:
np.random.seed(shuffle_seed)
roidb_perm = deque(np.random.permutation(roidbs)) roidb_perm = deque(np.random.permutation(roidbs))
else: else:
roidb_perm = deque(roidbs) roidb_perm = deque(roidbs)
...@@ -140,7 +144,7 @@ def coco(mode, ...@@ -140,7 +144,7 @@ def coco(mode,
sub_batch_out = [] sub_batch_out = []
batch_out = [] batch_out = []
iter_id = count // device_num iter_id = count // device_num
if iter_id >= cfg.max_iter: if iter_id >= cfg.max_iter * num_trainers:
return return
elif mode == "val": elif mode == "val":
batch_out = [] batch_out = []
...@@ -156,9 +160,18 @@ def coco(mode, ...@@ -156,9 +160,18 @@ def coco(mode,
return reader return reader
def train(batch_size, total_batch_size=None, padding_total=False, shuffle=True): def train(batch_size,
total_batch_size=None,
padding_total=False,
shuffle=True,
shuffle_seed=None):
return coco( return coco(
'train', batch_size, total_batch_size, padding_total, shuffle=shuffle) 'train',
batch_size,
total_batch_size,
padding_total,
shuffle=shuffle,
shuffle_seed=shuffle_seed)
def test(batch_size, total_batch_size=None, padding_total=False): def test(batch_size, total_batch_size=None, padding_total=False):
......
...@@ -17,11 +17,13 @@ from __future__ import division ...@@ -17,11 +17,13 @@ from __future__ import division
from __future__ import print_function from __future__ import print_function
import os import os
def set_paddle_flags(flags): def set_paddle_flags(flags):
for key, value in flags.items(): for key, value in flags.items():
if os.environ.get(key, None) is None: if os.environ.get(key, None) is None:
os.environ[key] = str(value) os.environ[key] = str(value)
set_paddle_flags({ set_paddle_flags({
'FLAGS_conv_workspace_size_limit': 500, 'FLAGS_conv_workspace_size_limit': 500,
'FLAGS_eager_delete_tensor_gb': 0, # enable gc 'FLAGS_eager_delete_tensor_gb': 0, # enable gc
...@@ -43,6 +45,21 @@ import models.model_builder as model_builder ...@@ -43,6 +45,21 @@ import models.model_builder as model_builder
import models.resnet as resnet import models.resnet as resnet
from learning_rate import exponential_with_warmup_decay from learning_rate import exponential_with_warmup_decay
from config import cfg from config import cfg
import dist_utils
num_trainers = int(os.environ.get('PADDLE_TRAINERS_NUM', 1))
def get_device_num():
    """Return the number of devices used by this training process.

    Resolution order:

    1. When the module-level ``num_trainers`` is greater than 1, return 1.
    2. Otherwise, when ``CUDA_VISIBLE_DEVICES`` is set to a non-empty
       string, return the number of comma-separated entries in it.
    3. Otherwise, run ``nvidia-smi -L`` and return the number of newline
       characters in its decoded output.

    Returns:
        int: The device count determined as described above.

    Raises:
        OSError / subprocess.CalledProcessError: Propagated from
            ``subprocess.check_output`` when ``nvidia-smi`` cannot be run or
            exits with a non-zero status.
    """
    # NOTE(zcd): for multi-process training, each process uses one GPU card.
    if num_trainers > 1:
        return 1

    visible_device = os.environ.get('CUDA_VISIBLE_DEVICES', None)
    if visible_device:
        return len(visible_device.split(','))

    # Imported at function scope so this fallback does not depend on the
    # enclosing module importing ``subprocess`` at top level.
    import subprocess

    listing = subprocess.check_output(['nvidia-smi', '-L']).decode()
    return listing.count('\n')
def train(): def train():
...@@ -56,8 +73,7 @@ def train(): ...@@ -56,8 +73,7 @@ def train():
random.seed(0) random.seed(0)
np.random.seed(0) np.random.seed(0)
devices = os.getenv("CUDA_VISIBLE_DEVICES") or "" devices_num = get_device_num()
devices_num = len(devices.split(","))
total_batch_size = devices_num * cfg.TRAIN.im_per_batch total_batch_size = devices_num * cfg.TRAIN.im_per_batch
use_random = True use_random = True
...@@ -94,7 +110,9 @@ def train(): ...@@ -94,7 +110,9 @@ def train():
for var in fetch_list: for var in fetch_list:
var.persistable = True var.persistable = True
place = fluid.CUDAPlace(0) if cfg.use_gpu else fluid.CPUPlace() #fluid.memory_optimize(fluid.default_main_program(), skip_opt_set=set(fetch_list))
gpu_id = int(os.environ.get('FLAGS_selected_gpus', 0))
place = fluid.CUDAPlace(gpu_id) if cfg.use_gpu else fluid.CPUPlace()
exe = fluid.Executor(place) exe = fluid.Executor(place)
exe.run(fluid.default_startup_program()) exe.run(fluid.default_startup_program())
...@@ -109,11 +127,18 @@ def train(): ...@@ -109,11 +127,18 @@ def train():
build_strategy = fluid.BuildStrategy() build_strategy = fluid.BuildStrategy()
build_strategy.memory_optimize = False build_strategy.memory_optimize = False
build_strategy.enable_inplace = True build_strategy.enable_inplace = True
exec_strategy = fluid.ExecutionStrategy() exec_strategy = fluid.ExecutionStrategy()
exec_strategy.use_experimental_executor = True
exec_strategy.num_iteration_per_drop_scope = 10 exec_strategy.num_iteration_per_drop_scope = 10
train_exe = fluid.ParallelExecutor(use_cuda=bool(cfg.use_gpu),
if num_trainers > 1 and cfg.use_gpu:
dist_utils.prepare_for_multi_process(exe, build_strategy,
fluid.default_main_program())
# NOTE: the process is fast when num_threads is 1
# for multi-process training.
exec_strategy.num_threads = 1
train_exe = fluid.ParallelExecutor(
use_cuda=bool(cfg.use_gpu),
loss_name=loss.name, loss_name=loss.name,
build_strategy=build_strategy, build_strategy=build_strategy,
exec_strategy=exec_strategy) exec_strategy=exec_strategy)
...@@ -123,15 +148,31 @@ def train(): ...@@ -123,15 +148,31 @@ def train():
shuffle = True shuffle = True
if cfg.enable_ce: if cfg.enable_ce:
shuffle = False shuffle = False
# NOTE: do not shuffle dataset when using multi-process training
shuffle_seed = None
if num_trainers > 1:
shuffle_seed = 1
if cfg.use_pyreader: if cfg.use_pyreader:
train_reader = reader.train( train_reader = reader.train(
batch_size=cfg.TRAIN.im_per_batch, batch_size=cfg.TRAIN.im_per_batch,
total_batch_size=total_batch_size, total_batch_size=total_batch_size,
padding_total=cfg.TRAIN.padding_minibatch, padding_total=cfg.TRAIN.padding_minibatch,
shuffle=shuffle) shuffle=shuffle,
shuffle_seed=shuffle_seed)
if num_trainers > 1:
assert shuffle_seed is not None, \
"If num_trainers > 1, the shuffle_seed must be set, because " \
"the order of batch data generated by reader " \
"must be the same in the respective processes."
# NOTE: the order of batch data generated by batch_reader
# must be the same in the respective processes.
if num_trainers > 1:
train_reader = fluid.contrib.reader.distributed_batch_reader(
train_reader)
py_reader = model.py_reader py_reader = model.py_reader
py_reader.decorate_paddle_reader(train_reader) py_reader.decorate_paddle_reader(train_reader)
else: else:
if num_trainers > 1: shuffle = False
train_reader = reader.train( train_reader = reader.train(
batch_size=total_batch_size, shuffle=shuffle) batch_size=total_batch_size, shuffle=shuffle)
feeder = fluid.DataFeeder(place=place, feed_list=model.feeds()) feeder = fluid.DataFeeder(place=place, feed_list=model.feeds())
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册