diff --git a/PaddleCV/image_classification/dist_train/dist_train.py b/PaddleCV/image_classification/dist_train/dist_train.py index 4946434e1a4eb3b0219ae7d3b284610063646e02..dfa4237af4bf0eb5c4034143e90c12ab994365f5 100644 --- a/PaddleCV/image_classification/dist_train/dist_train.py +++ b/PaddleCV/image_classification/dist_train/dist_train.py @@ -86,8 +86,12 @@ def get_device_num(): return device_num def prepare_reader(is_train, pyreader, args, pass_id=0): + # NOTE: always set reader infinite in nccl2 mode to balance data + # between ranks + is_infinite = (args.update_method == "nccl2") if is_train: - reader = train(data_dir=args.data_dir, pass_id_as_seed=pass_id) + reader = train(data_dir=args.data_dir, pass_id_as_seed=pass_id, + infinite=is_infinite) else: reader = val(data_dir=args.data_dir) if is_train: @@ -138,7 +142,10 @@ def build_program(is_train, main_prog, startup_prog, args): end_lr /= device_num_per_worker total_images = args.total_images / trainer_count - step = int(total_images / (args.batch_size * args.multi_batch_repeat) + 1) + if os.getenv("FLAGS_selected_gpus"): + step = int(total_images / (args.batch_size / device_num_per_worker * args.multi_batch_repeat) + 1) + else: + step = int(total_images / (args.batch_size * args.multi_batch_repeat) + 1) warmup_steps = step * 5 # warmup 5 passes epochs = [30, 60, 80] bd = [step * e for e in epochs] @@ -262,9 +269,9 @@ def train_parallel(args): strategy = fluid.ExecutionStrategy() strategy.num_threads = args.num_threads # num_iteration_per_drop_scope indicates how - # many iterations to clean up the temp variables which - # is generated during execution. It may make the execution faster, - # because the temp variable's shape maybe the same between two iterations + # many iterations to clean up the temp variables which + # is generated during execution. It may make the execution faster, + # because the temp variable's shape is the same between two iterations. 
strategy.num_iteration_per_drop_scope = 30 build_strategy = fluid.BuildStrategy() @@ -317,7 +324,13 @@ def train_parallel(args): over_all_start = time.time() fetch_list = [train_cost.name, train_acc1.name, train_acc5.name] - steps_per_pass = args.total_images / args.batch_size / args.dist_env["num_trainers"] + # 1. MP mode, batch size for current process should be args.batch_size / GPUs + # 2. SP/PG mode, batch size for each process should be original args.batch_size + if os.getenv("FLAGS_selected_gpus"): + steps_per_pass = args.total_images / (args.batch_size / get_device_num()) / args.dist_env["num_trainers"] + else: + steps_per_pass = args.total_images / args.batch_size / args.dist_env["num_trainers"] + for pass_id in range(args.num_epochs): num_samples = 0 start_time = time.time() @@ -342,7 +355,7 @@ def train_parallel(args): break num_samples += args.batch_size batch_id += 1 - if args.skip_unbalanced_data and batch_id >= steps_per_pass: + if (args.skip_unbalanced_data or args.update_method == "nccl2") and batch_id >= steps_per_pass: break print_train_time(start_time, time.time(), num_samples) diff --git a/PaddleCV/image_classification/reader.py b/PaddleCV/image_classification/reader.py index d8aa9da49b9e0caf28f72261965814c5cdca914d..b55c75f9fe12767bfcec958756f26528a1609570 100644 --- a/PaddleCV/image_classification/reader.py +++ b/PaddleCV/image_classification/reader.py @@ -131,38 +131,43 @@ def _reader_creator(file_list, color_jitter=False, rotate=False, data_dir=DATA_DIR, - pass_id_as_seed=0): + pass_id_as_seed=0, + infinite=False): def reader(): with open(file_list) as flist: full_lines = [line.strip() for line in flist] - if shuffle: - if pass_id_as_seed: - np.random.seed(pass_id_as_seed) - np.random.shuffle(full_lines) - if mode == 'train' and os.getenv('PADDLE_TRAINING_ROLE'): - # distributed mode if the env var `PADDLE_TRAINING_ROLE` exits - trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) - trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", 
"1")) - per_node_lines = len(full_lines) // trainer_count - lines = full_lines[trainer_id * per_node_lines:(trainer_id + 1) - * per_node_lines] - print( - "read images from %d, length: %d, lines length: %d, total: %d" - % (trainer_id * per_node_lines, per_node_lines, len(lines), - len(full_lines))) - else: - lines = full_lines - - for line in lines: - if mode == 'train' or mode == 'val': - img_path, label = line.split() - img_path = os.path.join(data_dir, img_path) - yield img_path, int(label) - elif mode == 'test': - img_path, label = line.split() - img_path = os.path.join(data_dir, img_path) - - yield [img_path] + while True: + if shuffle: + if pass_id_as_seed: + np.random.seed(pass_id_as_seed) + np.random.shuffle(full_lines) + if mode == 'train' and os.getenv('PADDLE_TRAINING_ROLE'): + # distributed mode if the env var `PADDLE_TRAINING_ROLE` exists + trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) + trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1")) + per_node_lines = len(full_lines) // trainer_count + lines = full_lines[trainer_id * per_node_lines:(trainer_id + 1) + * per_node_lines] + print( + "read images from %d, length: %d, lines length: %d, total: %d" + % (trainer_id * per_node_lines, per_node_lines, len(lines), + len(full_lines))) + else: + lines = full_lines + + for line in lines: + if mode == 'train' or mode == 'val': + img_path, label = line.split() + img_path = os.path.join(data_dir, img_path) + yield img_path, int(label) + elif mode == 'test': + img_path, label = line.split() + img_path = os.path.join(data_dir, img_path) + yield [img_path] + if not infinite: + break + pass_id_as_seed += 1 + print("passid ++, current: ", pass_id_as_seed) mapper = functools.partial( process_image, mode=mode, color_jitter=color_jitter, rotate=rotate) @@ -170,7 +175,7 @@ def _reader_creator(file_list, return paddle.reader.xmap_readers(mapper, reader, THREAD, BUF_SIZE) -def train(data_dir=DATA_DIR, pass_id_as_seed=0): +def train(data_dir=DATA_DIR, 
pass_id_as_seed=0, infinite=False): file_list = os.path.join(data_dir, 'train_list.txt') return _reader_creator( file_list,