未验证 提交 4d1a6f70 编写于 作者: Z zhaoyuchen2018 提交者: GitHub

Transfer resnext50 multi-process multi-gpu from benchmark to models (#2604)

* Transfer resnext50 multi-process multi-gpu changes from benchmark to models
Signed-off-by: Nzhaoyuchen <zhaoyuchen01@baidu.com>

* refine code
Signed-off-by: Nzhaoyuchen <zhaoyuchen01@baidu.com>
上级 59aef8d8
...@@ -12,6 +12,9 @@ ...@@ -12,6 +12,9 @@
#See the License for the specific language governing permissions and #See the License for the specific language governing permissions and
#limitations under the License. #limitations under the License.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os import os
import paddle.fluid as fluid import paddle.fluid as fluid
...@@ -23,7 +26,8 @@ def nccl2_prepare(args, startup_prog, main_prog): ...@@ -23,7 +26,8 @@ def nccl2_prepare(args, startup_prog, main_prog):
envs = args.dist_env envs = args.dist_env
t.transpile(envs["trainer_id"], t.transpile(
envs["trainer_id"],
trainers=','.join(envs["trainer_endpoints"]), trainers=','.join(envs["trainer_endpoints"]),
current_endpoint=envs["current_endpoint"], current_endpoint=envs["current_endpoint"],
startup_program=startup_prog, startup_program=startup_prog,
...@@ -47,7 +51,9 @@ def pserver_prepare(args, train_prog, startup_prog): ...@@ -47,7 +51,9 @@ def pserver_prepare(args, train_prog, startup_prog):
if training_role == "PSERVER": if training_role == "PSERVER":
pserver_program = t.get_pserver_program(envs["current_endpoint"]) pserver_program = t.get_pserver_program(envs["current_endpoint"])
pserver_startup_program = t.get_startup_program( pserver_startup_program = t.get_startup_program(
envs["current_endpoint"], pserver_program, startup_program=startup_prog) envs["current_endpoint"],
pserver_program,
startup_program=startup_prog)
return pserver_program, pserver_startup_program return pserver_program, pserver_startup_program
elif training_role == "TRAINER": elif training_role == "TRAINER":
train_program = t.get_trainer_program() train_program = t.get_trainer_program()
...@@ -56,3 +62,32 @@ def pserver_prepare(args, train_prog, startup_prog): ...@@ -56,3 +62,32 @@ def pserver_prepare(args, train_prog, startup_prog):
raise ValueError( raise ValueError(
'PADDLE_TRAINING_ROLE environment variable must be either TRAINER or PSERVER' 'PADDLE_TRAINING_ROLE environment variable must be either TRAINER or PSERVER'
) )
def nccl2_prepare_paddle(trainer_id, startup_prog, main_prog):
    """Transpile the given programs for NCCL2 multi-process training.

    Trainer endpoints are taken from the ``PADDLE_TRAINER_ENDPOINTS`` and
    ``PADDLE_CURRENT_ENDPOINT`` environment variables (set by the Paddle
    distributed launcher).

    Args:
        trainer_id: integer id of this trainer process.
        startup_prog: fluid startup Program to transpile in place.
        main_prog: fluid main (training) Program to transpile in place.
    """
    transpiler_config = fluid.DistributeTranspilerConfig()
    transpiler_config.mode = "nccl2"
    transpiler = fluid.DistributeTranspiler(config=transpiler_config)
    transpiler.transpile(
        trainer_id,
        trainers=os.environ.get('PADDLE_TRAINER_ENDPOINTS'),
        current_endpoint=os.environ.get('PADDLE_CURRENT_ENDPOINT'),
        startup_program=startup_prog,
        program=main_prog)
def prepare_for_multi_process(exe, build_strategy, train_prog):
    """Set up NCCL2 multi-process training when more than one trainer runs.

    Reads the trainer id/count from the environment, records them on
    ``build_strategy``, transpiles a fresh startup program for NCCL2 and
    runs it on ``exe``. A no-op when fewer than two trainers are configured.

    Args:
        exe: fluid Executor used to run the NCCL2 startup program.
        build_strategy: fluid BuildStrategy mutated with trainer info.
        train_prog: the main training Program to transpile for NCCL2.
    """
    env = os.environ
    trainer_id = int(env.get('PADDLE_TRAINER_ID', 0))
    num_trainers = int(env.get('PADDLE_TRAINERS_NUM', 1))
    # Single-process run: nothing to configure.
    if num_trainers < 2:
        return
    for label, value in (("PADDLE_TRAINERS_NUM", num_trainers),
                         ("PADDLE_TRAINER_ID", trainer_id)):
        print(label, value)
    build_strategy.num_trainers = num_trainers
    build_strategy.trainer_id = trainer_id
    # NOTE(zcd): use multi processes to train the model,
    # and each process use one GPU card.
    startup_prog = fluid.Program()
    nccl2_prepare_paddle(trainer_id, startup_prog, train_prog)
    # the startup_prog are run two times, but it doesn't matter.
    exe.run(startup_prog)
...@@ -140,75 +140,80 @@ def process_image(sample, mode, color_jitter, rotate): ...@@ -140,75 +140,80 @@ def process_image(sample, mode, color_jitter, rotate):
return [img] return [img]
def process_batch_data(input_data, mode, color_jitter, rotate):
    """Apply ``process_image`` to every sample of a batch.

    Args:
        input_data: iterable of samples accepted by ``process_image``.
        mode: dataset mode ('train'/'val'/'test'), forwarded unchanged.
        color_jitter: whether to apply color jitter, forwarded unchanged.
        rotate: whether to apply rotation, forwarded unchanged.

    Returns:
        list of processed samples, in input order.
    """
    # Idiomatic comprehension replaces the manual append loop.
    return [
        process_image(sample, mode, color_jitter, rotate)
        for sample in input_data
    ]
def _reader_creator(file_list, def _reader_creator(file_list,
batch_size,
mode, mode,
shuffle=False, shuffle=False,
color_jitter=False, color_jitter=False,
rotate=False, rotate=False,
data_dir=DATA_DIR, data_dir=DATA_DIR,
pass_id_as_seed=1, shuffle_seed=0,
infinite=False): infinite=False):
def reader(): def reader():
with open(file_list) as flist: def read_file_list():
full_lines = [line.strip() for line in flist] with open(file_list) as flist:
pass_id_as_seed_counter = pass_id_as_seed full_lines = [line.strip() for line in flist]
while True:
if shuffle: if shuffle:
if pass_id_as_seed_counter: if shuffle_seed is not None:
np.random.seed(pass_id_as_seed_counter) np.random.seed(shuffle_seed)
np.random.shuffle(full_lines) np.random.shuffle(full_lines)
if mode == 'train' and os.getenv('PADDLE_TRAINING_ROLE'): batch_data = []
# distributed mode if the env var `PADDLE_TRAINING_ROLE` exits for line in full_lines:
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) img_path, label = line.split()
trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1")) img_path = os.path.join(data_dir, img_path)
per_node_lines = len(full_lines) // trainer_count batch_data.append([img_path, int(label)])
lines = full_lines[trainer_id * per_node_lines:( if len(batch_data) == batch_size:
trainer_id + 1) * per_node_lines]
print(
"read images from %d, length: %d, lines length: %d, total: %d"
% (trainer_id * per_node_lines, per_node_lines,
len(lines), len(full_lines)))
else:
lines = full_lines
for line in lines:
if mode == 'train' or mode == 'val': if mode == 'train' or mode == 'val':
img_path, label = line.split() yield batch_data
img_path = os.path.join(data_dir, img_path)
yield img_path, int(label)
elif mode == 'test': elif mode == 'test':
img_path, label = line.split() yield [sample[0] for sample in batch_data]
img_path = os.path.join(data_dir, img_path) batch_data = []
yield [img_path]
if not infinite: return read_file_list
break
pass_id_as_seed_counter += 1 data_reader = reader()
print("passid ++, current: ", pass_id_as_seed_counter) num_trainers = int(os.environ.get('PADDLE_TRAINERS_NUM', 1))
if mode == 'train' and num_trainers > 1:
assert shuffle_seed is not None, \
"If num_trainers > 1, the shuffle_seed must be set, because " \
"the order of batch data generated by reader " \
"must be the same in the respective processes."
data_reader = fluid.contrib.reader.distributed_batch_reader(data_reader)
mapper = functools.partial( mapper = functools.partial(
process_image, mode=mode, color_jitter=color_jitter, rotate=rotate) process_batch_data, mode=mode, color_jitter=color_jitter, rotate=rotate)
return paddle.reader.xmap_readers(mapper, reader, THREAD, BUF_SIZE) return paddle.reader.xmap_readers(mapper, data_reader, THREAD, BUF_SIZE)
def train(data_dir=DATA_DIR, pass_id_as_seed=1, infinite=False): def train(batch_size, data_dir=DATA_DIR, shuffle_seed=0, infinite=False):
file_list = os.path.join(data_dir, 'train_list.txt') file_list = os.path.join(data_dir, 'train_list.txt')
return _reader_creator( return _reader_creator(
file_list, file_list,
batch_size,
'train', 'train',
shuffle=True, shuffle=True,
color_jitter=False, color_jitter=False,
rotate=False, rotate=False,
data_dir=data_dir, data_dir=data_dir,
pass_id_as_seed=pass_id_as_seed, shuffle_seed=shuffle_seed,
infinite=infinite) infinite=infinite)
def val(data_dir=DATA_DIR): def val(batch_size, data_dir=DATA_DIR):
file_list = os.path.join(data_dir, 'val_list.txt') file_list = os.path.join(data_dir, 'val_list.txt')
return _reader_creator(file_list, 'val', shuffle=False, data_dir=data_dir) return _reader_creator(
file_list, batch_size, 'val', shuffle=False, data_dir=data_dir)
def test(data_dir=DATA_DIR): def test(batch_size, data_dir=DATA_DIR):
file_list = os.path.join(data_dir, 'val_list.txt') file_list = os.path.join(data_dir, 'val_list.txt')
return _reader_creator(file_list, 'test', shuffle=False, data_dir=data_dir) return _reader_creator(
file_list, batch_size, 'test', shuffle=False, data_dir=data_dir)
...@@ -21,6 +21,7 @@ import cv2 ...@@ -21,6 +21,7 @@ import cv2
import io import io
import paddle import paddle
import paddle.fluid as fluid
random.seed(0) random.seed(0)
np.random.seed(0) np.random.seed(0)
...@@ -150,7 +151,8 @@ def create_mixup_reader(settings, rd): ...@@ -150,7 +151,8 @@ def create_mixup_reader(settings, rd):
def mixup_reader(): def mixup_reader():
for context.tmp_mix, context.tmp_l1, context.tmp_l2, context.tmp_lam in mixup_data(): for context.tmp_mix, context.tmp_l1, context.tmp_l2, context.tmp_lam in mixup_data(
):
for i in range(len(context.tmp_mix)): for i in range(len(context.tmp_mix)):
mixed_l = context.tmp_mix[i] mixed_l = context.tmp_mix[i]
l1 = context.tmp_l1[i] l1 = context.tmp_l1[i]
...@@ -210,82 +212,99 @@ def image_mapper(**kwargs): ...@@ -210,82 +212,99 @@ def image_mapper(**kwargs):
return functools.partial(process_image, **kwargs) return functools.partial(process_image, **kwargs)
def process_batch_data(input_data, settings, mode, color_jitter, rotate):
    """Apply ``process_image`` to every sample of a batch.

    Args:
        input_data: iterable of samples accepted by ``process_image``.
        settings: training settings object, forwarded unchanged.
        mode: dataset mode ('train'/'val'/'test'), forwarded unchanged.
        color_jitter: whether to apply color jitter, forwarded unchanged.
        rotate: whether to apply rotation, forwarded unchanged.

    Returns:
        list of processed samples, in input order.
    """
    # Idiomatic comprehension replaces the manual append loop.
    return [
        process_image(sample, settings, mode, color_jitter, rotate)
        for sample in input_data
    ]
def _reader_creator(settings, def _reader_creator(settings,
file_list, file_list,
batch_size,
mode, mode,
shuffle=False, shuffle=False,
color_jitter=False, color_jitter=False,
rotate=False, rotate=False,
data_dir=DATA_DIR, data_dir=DATA_DIR,
pass_id_as_seed=0): shuffle_seed=0):
def reader(): def reader():
with open(file_list) as flist: def read_file_list():
full_lines = [line.strip() for line in flist] with open(file_list) as flist:
if shuffle: full_lines = [line.strip() for line in flist]
if pass_id_as_seed: if shuffle:
np.random.seed(pass_id_as_seed) if shuffle_seed is not None:
np.random.shuffle(full_lines) np.random.seed(shuffle_seed)
if mode == 'train' and os.getenv('PADDLE_TRAINING_ROLE'): np.random.shuffle(full_lines)
# distributed mode if the env var `PADDLE_TRAINING_ROLE` exits batch_data = []
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) for line in full_lines:
trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1")) img_path, label = line.split()
per_node_lines = len(full_lines) // trainer_count img_path = os.path.join(data_dir, img_path)
lines = full_lines[trainer_id * per_node_lines:(trainer_id + 1) batch_data.append([img_path, int(label)])
* per_node_lines] if len(batch_data) == batch_size:
print( if mode == 'train' or mode == 'val':
"read images from %d, length: %d, lines length: %d, total: %d" yield batch_data
% (trainer_id * per_node_lines, per_node_lines, len(lines), elif mode == 'test':
len(full_lines))) yield [sample[0] for sample in batch_data]
else: batch_data = []
lines = full_lines
return read_file_list
for line in lines:
if mode == 'train' or mode == 'val': data_reader = reader()
img_path, label = line.split() num_trainers = int(os.environ.get('PADDLE_TRAINERS_NUM', 1))
img_path = os.path.join(data_dir, img_path) if mode == 'train' and num_trainers > 1:
yield img_path, int(label) assert shuffle_seed is not None, \
elif mode == 'test': "If num_trainers > 1, the shuffle_seed must be set, because " \
img_path, label = line.split() "the order of batch data generated by reader " \
img_path = os.path.join(data_dir, img_path) "must be the same in the respective processes."
data_reader = fluid.contrib.reader.distributed_batch_reader(data_reader)
yield [img_path]
mapper = functools.partial(
crop_size = int(settings.image_shape.split(",")[2]) process_batch_data,
image_mapper = functools.partial(
process_image,
settings=settings, settings=settings,
mode=mode, mode=mode,
color_jitter=color_jitter, color_jitter=color_jitter,
rotate=rotate, rotate=rotate)
crop_size=crop_size)
reader = paddle.reader.xmap_readers( return paddle.reader.xmap_readers(
image_mapper, reader, THREAD, BUF_SIZE, order=False) mapper, data_reader, THREAD, BUF_SIZE, order=False)
return reader
def train(settings, data_dir=DATA_DIR, pass_id_as_seed=0): def train(settings, batch_size, data_dir=DATA_DIR, shuffle_seed=0):
file_list = os.path.join(data_dir, 'train_list.txt') file_list = os.path.join(data_dir, 'train_list.txt')
reader = _reader_creator( reader = _reader_creator(
settings, settings,
file_list, file_list,
batch_size,
'train', 'train',
shuffle=True, shuffle=True,
color_jitter=False, color_jitter=False,
rotate=False, rotate=False,
data_dir=data_dir, data_dir=data_dir,
pass_id_as_seed=pass_id_as_seed, ) shuffle_seed=shuffle_seed)
if settings.use_mixup == True: if settings.use_mixup == True:
reader = create_mixup_reader(settings, reader) reader = create_mixup_reader(settings, reader)
return reader return reader
def val(settings, data_dir=DATA_DIR): def val(settings, batch_size, data_dir=DATA_DIR):
file_list = os.path.join(data_dir, 'val_list.txt') file_list = os.path.join(data_dir, 'val_list.txt')
return _reader_creator( return _reader_creator(
settings, file_list, 'val', shuffle=False, data_dir=data_dir) settings,
file_list,
batch_size,
'val',
shuffle=False,
data_dir=data_dir)
def test(settings, data_dir=DATA_DIR): def test(settings, batch_size, data_dir=DATA_DIR):
file_list = os.path.join(data_dir, 'val_list.txt') file_list = os.path.join(data_dir, 'val_list.txt')
return _reader_creator( return _reader_creator(
settings, file_list, 'test', shuffle=False, data_dir=data_dir) settings,
file_list,
batch_size,
'test',
shuffle=False,
data_dir=data_dir)
...@@ -49,8 +49,10 @@ import models ...@@ -49,8 +49,10 @@ import models
from utils.fp16_utils import create_master_params_grads, master_param_to_train_param from utils.fp16_utils import create_master_params_grads, master_param_to_train_param
from utils.utility import add_arguments, print_arguments from utils.utility import add_arguments, print_arguments
from utils.learning_rate import cosine_decay_with_warmup from utils.learning_rate import cosine_decay_with_warmup
from dist_train import dist_utils
IMAGENET1000 = 1281167 IMAGENET1000 = 1281167
num_trainers = int(os.environ.get('PADDLE_TRAINERS_NUM', 1))
parser = argparse.ArgumentParser(description=__doc__) parser = argparse.ArgumentParser(description=__doc__)
add_arg = functools.partial(add_arguments, argparser=parser) add_arg = functools.partial(add_arguments, argparser=parser)
...@@ -335,7 +337,9 @@ def build_program(is_train, main_prog, startup_prog, args): ...@@ -335,7 +337,9 @@ def build_program(is_train, main_prog, startup_prog, args):
return build_program_out return build_program_out
def get_device_num(): def get_device_num():
visible_device = os.getenv('CUDA_VISIBLE_DEVICES') # NOTE(zcd): for multi-processe training, each process use one GPU card.
if num_trainers > 1 : return 1
visible_device = os.environ.get('CUDA_VISIBLE_DEVICES', None)
if visible_device: if visible_device:
device_num = len(visible_device.split(',')) device_num = len(visible_device.split(','))
else: else:
...@@ -365,11 +369,19 @@ def train(args): ...@@ -365,11 +369,19 @@ def train(args):
args=args) args=args)
if use_mixup: if use_mixup:
train_py_reader, train_cost, global_lr = b_out[0], b_out[1], b_out[2] train_py_reader, train_cost, global_lr = b_out[0], b_out[1], b_out[2]
train_fetch_list = [train_cost.name, global_lr.name] train_fetch_vars = [train_cost, global_lr]
train_fetch_list = []
for var in train_fetch_vars:
var.persistable=True
train_fetch_list.append(var.name)
else: else:
train_py_reader, train_cost, train_acc1, train_acc5, global_lr = b_out[0],b_out[1],b_out[2],b_out[3],b_out[4] train_py_reader, train_cost, train_acc1, train_acc5, global_lr = b_out[0],b_out[1],b_out[2],b_out[3],b_out[4]
train_fetch_list = [train_cost.name, train_acc1.name, train_acc5.name, global_lr.name] train_fetch_vars = [train_cost, train_acc1, train_acc5, global_lr]
train_fetch_list = []
for var in train_fetch_vars:
var.persistable=True
train_fetch_list.append(var.name)
b_out_test = build_program( b_out_test = build_program(
is_train=False, is_train=False,
...@@ -383,7 +395,8 @@ def train(args): ...@@ -383,7 +395,8 @@ def train(args):
fluid.memory_optimize(train_prog) fluid.memory_optimize(train_prog)
fluid.memory_optimize(test_prog) fluid.memory_optimize(test_prog)
place = fluid.CUDAPlace(0) if args.use_gpu else fluid.CPUPlace() gpu_id = int(os.environ.get('FLAGS_selected_gpus', 0))
place = fluid.CUDAPlace(gpu_id) if args.use_gpu else fluid.CPUPlace()
exe = fluid.Executor(place) exe = fluid.Executor(place)
exe.run(startup_prog) exe.run(startup_prog)
...@@ -406,9 +419,11 @@ def train(args): ...@@ -406,9 +419,11 @@ def train(args):
test_batch_size = 16 test_batch_size = 16
if not args.enable_ce: if not args.enable_ce:
train_reader = paddle.batch( # NOTE: the order of batch data generated by batch_reader
reader.train(settings=args), batch_size=train_batch_size, drop_last=True) # must be the same in the respective processes.
test_reader = paddle.batch(reader.val(settings=args), batch_size=test_batch_size) shuffle_seed = 1 if num_trainers > 1 else None
train_reader = reader.train(settings=args, batch_size=train_batch_size, shuffle_seed=shuffle_seed)
test_reader = reader.val(settings=args, batch_size=test_batch_size)
else: else:
# use flowers dataset for CE and set use_xmap False to avoid disorder data # use flowers dataset for CE and set use_xmap False to avoid disorder data
# but it is time consuming. For faster speed, need another dataset. # but it is time consuming. For faster speed, need another dataset.
...@@ -419,12 +434,21 @@ def train(args): ...@@ -419,12 +434,21 @@ def train(args):
flowers.train(use_xmap=False), flowers.train(use_xmap=False),
batch_size=train_batch_size, batch_size=train_batch_size,
drop_last=True) drop_last=True)
if num_trainers > 1:
train_reader = fluid.contrib.reader.distributed_batch_reader(train_reader)
test_reader = paddle.batch( test_reader = paddle.batch(
flowers.test(use_xmap=False), batch_size=test_batch_size) flowers.test(use_xmap=False), batch_size=test_batch_size)
train_py_reader.decorate_paddle_reader(train_reader) train_py_reader.decorate_paddle_reader(train_reader)
test_py_reader.decorate_paddle_reader(test_reader) test_py_reader.decorate_paddle_reader(test_reader)
test_fetch_vars = [test_cost, test_acc1, test_acc5]
test_fetch_list = []
for var in test_fetch_vars:
var.persistable=True
test_fetch_list.append(var.name)
# use_ngraph is for CPU only, please refer to README_ngraph.md for details # use_ngraph is for CPU only, please refer to README_ngraph.md for details
use_ngraph = os.getenv('FLAGS_use_ngraph') use_ngraph = os.getenv('FLAGS_use_ngraph')
if not use_ngraph: if not use_ngraph:
...@@ -435,7 +459,13 @@ def train(args): ...@@ -435,7 +459,13 @@ def train(args):
#build_strategy.fuse_all_reduce_ops=1 #build_strategy.fuse_all_reduce_ops=1
exec_strategy = fluid.ExecutionStrategy() exec_strategy = fluid.ExecutionStrategy()
exec_strategy.num_threads = device_num
exec_strategy.num_iteration_per_drop_scope = 10 exec_strategy.num_iteration_per_drop_scope = 10
if num_trainers > 1 and args.use_gpu:
dist_utils.prepare_for_multi_process(exe, build_strategy, train_prog)
# NOTE: the process is fast when num_threads is 1
# for multi-process training.
exec_strategy.num_threads = 1
train_exe = fluid.ParallelExecutor( train_exe = fluid.ParallelExecutor(
main_program=train_prog, main_program=train_prog,
...@@ -446,8 +476,6 @@ def train(args): ...@@ -446,8 +476,6 @@ def train(args):
else: else:
train_exe = exe train_exe = exe
test_fetch_list = [test_cost.name, test_acc1.name, test_acc5.name]
params = models.__dict__[args.model]().params params = models.__dict__[args.model]().params
for pass_id in range(params["num_epochs"]): for pass_id in range(params["num_epochs"]):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册