提交 9e23a870 编写于 作者: T typhoonzero

commit for test

上级 ad9d219a
...@@ -86,8 +86,12 @@ def get_device_num(): ...@@ -86,8 +86,12 @@ def get_device_num():
return device_num return device_num
def prepare_reader(is_train, pyreader, args, pass_id=0): def prepare_reader(is_train, pyreader, args, pass_id=0):
# NOTE: allways set reader infinite when nccl2 mode to balance data
# between ranks
is_infinite = (args.update_method == "nccl2")
if is_train: if is_train:
reader = train(data_dir=args.data_dir, pass_id_as_seed=pass_id) reader = train(data_dir=args.data_dir, pass_id_as_seed=pass_id,
infinite=is_infinite)
else: else:
reader = val(data_dir=args.data_dir) reader = val(data_dir=args.data_dir)
if is_train: if is_train:
...@@ -138,7 +142,10 @@ def build_program(is_train, main_prog, startup_prog, args): ...@@ -138,7 +142,10 @@ def build_program(is_train, main_prog, startup_prog, args):
end_lr /= device_num_per_worker end_lr /= device_num_per_worker
total_images = args.total_images / trainer_count total_images = args.total_images / trainer_count
step = int(total_images / (args.batch_size * args.multi_batch_repeat) + 1) if os.getenv("FLAGS_selected_gpus"):
step = int(total_images / (args.batch_size / device_num_per_worker * args.multi_batch_repeat) + 1)
else:
step = int(total_images / (args.batch_size * args.multi_batch_repeat) + 1)
warmup_steps = step * 5 # warmup 5 passes warmup_steps = step * 5 # warmup 5 passes
epochs = [30, 60, 80] epochs = [30, 60, 80]
bd = [step * e for e in epochs] bd = [step * e for e in epochs]
...@@ -262,9 +269,9 @@ def train_parallel(args): ...@@ -262,9 +269,9 @@ def train_parallel(args):
strategy = fluid.ExecutionStrategy() strategy = fluid.ExecutionStrategy()
strategy.num_threads = args.num_threads strategy.num_threads = args.num_threads
# num_iteration_per_drop_scope indicates how # num_iteration_per_drop_scope indicates how
# many iterations to clean up the temp variables which # many iterations to clean up the temp variables which
# is generated during execution. It may make the execution faster, # is generated during execution. It may make the execution faster,
# because the temp variable's shape maybe the same between two iterations # because the temp variable's shape are the same between two iterations.
strategy.num_iteration_per_drop_scope = 30 strategy.num_iteration_per_drop_scope = 30
build_strategy = fluid.BuildStrategy() build_strategy = fluid.BuildStrategy()
...@@ -317,7 +324,13 @@ def train_parallel(args): ...@@ -317,7 +324,13 @@ def train_parallel(args):
over_all_start = time.time() over_all_start = time.time()
fetch_list = [train_cost.name, train_acc1.name, train_acc5.name] fetch_list = [train_cost.name, train_acc1.name, train_acc5.name]
steps_per_pass = args.total_images / args.batch_size / args.dist_env["num_trainers"] # 1. MP mode, batch size for current process should be args.batch_size / GPUs
# 2. SP/PG mode, batch size for each process should be original args.batch_size
if os.getenv("FLAGS_selected_gpus"):
steps_per_pass = args.total_images / (args.batch_size / get_device_num()) / args.dist_env["num_trainers"]
else:
steps_per_pass = args.total_images / args.batch_size / args.dist_env["num_trainers"]
for pass_id in range(args.num_epochs): for pass_id in range(args.num_epochs):
num_samples = 0 num_samples = 0
start_time = time.time() start_time = time.time()
...@@ -342,7 +355,7 @@ def train_parallel(args): ...@@ -342,7 +355,7 @@ def train_parallel(args):
break break
num_samples += args.batch_size num_samples += args.batch_size
batch_id += 1 batch_id += 1
if args.skip_unbalanced_data and batch_id >= steps_per_pass: if (args.skip_unbalanced_data or args.update_method == "nccl2") and batch_id >= steps_per_pass:
break break
print_train_time(start_time, time.time(), num_samples) print_train_time(start_time, time.time(), num_samples)
......
...@@ -131,38 +131,43 @@ def _reader_creator(file_list, ...@@ -131,38 +131,43 @@ def _reader_creator(file_list,
color_jitter=False, color_jitter=False,
rotate=False, rotate=False,
data_dir=DATA_DIR, data_dir=DATA_DIR,
pass_id_as_seed=0): pass_id_as_seed=0,
infinite=False):
def reader(): def reader():
with open(file_list) as flist: with open(file_list) as flist:
full_lines = [line.strip() for line in flist] full_lines = [line.strip() for line in flist]
if shuffle: while True:
if pass_id_as_seed: if shuffle:
np.random.seed(pass_id_as_seed) if pass_id_as_seed:
np.random.shuffle(full_lines) np.random.seed(pass_id_as_seed)
if mode == 'train' and os.getenv('PADDLE_TRAINING_ROLE'): np.random.shuffle(full_lines)
# distributed mode if the env var `PADDLE_TRAINING_ROLE` exits if mode == 'train' and os.getenv('PADDLE_TRAINING_ROLE'):
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) # distributed mode if the env var `PADDLE_TRAINING_ROLE` exits
trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1")) trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
per_node_lines = len(full_lines) // trainer_count trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
lines = full_lines[trainer_id * per_node_lines:(trainer_id + 1) per_node_lines = len(full_lines) // trainer_count
* per_node_lines] lines = full_lines[trainer_id * per_node_lines:(trainer_id + 1)
print( * per_node_lines]
"read images from %d, length: %d, lines length: %d, total: %d" print(
% (trainer_id * per_node_lines, per_node_lines, len(lines), "read images from %d, length: %d, lines length: %d, total: %d"
len(full_lines))) % (trainer_id * per_node_lines, per_node_lines, len(lines),
else: len(full_lines)))
lines = full_lines else:
lines = full_lines
for line in lines:
if mode == 'train' or mode == 'val': for line in lines:
img_path, label = line.split() if mode == 'train' or mode == 'val':
img_path = os.path.join(data_dir, img_path) img_path, label = line.split()
yield img_path, int(label) img_path = os.path.join(data_dir, img_path)
elif mode == 'test': yield img_path, int(label)
img_path, label = line.split() elif mode == 'test':
img_path = os.path.join(data_dir, img_path) img_path, label = line.split()
img_path = os.path.join(data_dir, img_path)
yield [img_path] yield [img_path]
if not infinite:
break
pass_id_as_seed += 1
print("passid ++, current: ", pass_id_as_seed)
mapper = functools.partial( mapper = functools.partial(
process_image, mode=mode, color_jitter=color_jitter, rotate=rotate) process_image, mode=mode, color_jitter=color_jitter, rotate=rotate)
...@@ -170,7 +175,7 @@ def _reader_creator(file_list, ...@@ -170,7 +175,7 @@ def _reader_creator(file_list,
return paddle.reader.xmap_readers(mapper, reader, THREAD, BUF_SIZE) return paddle.reader.xmap_readers(mapper, reader, THREAD, BUF_SIZE)
def train(data_dir=DATA_DIR, pass_id_as_seed=0): def train(data_dir=DATA_DIR, pass_id_as_seed=0, infinite=False):
file_list = os.path.join(data_dir, 'train_list.txt') file_list = os.path.join(data_dir, 'train_list.txt')
return _reader_creator( return _reader_creator(
file_list, file_list,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册