From 3984c24bd9ead8234cf71d6586ed57addfa3f250 Mon Sep 17 00:00:00 2001 From: Bai Yifan Date: Sun, 2 Sep 2018 16:11:51 +0800 Subject: [PATCH] Revert reader (#1216) * revert reader --- fluid/object_detection/data_util.py | 151 ------------------ fluid/object_detection/reader.py | 79 ++-------- fluid/object_detection/train.py | 233 ++++++++++++---------------- 3 files changed, 112 insertions(+), 351 deletions(-) delete mode 100644 fluid/object_detection/data_util.py diff --git a/fluid/object_detection/data_util.py b/fluid/object_detection/data_util.py deleted file mode 100644 index ac022593..00000000 --- a/fluid/object_detection/data_util.py +++ /dev/null @@ -1,151 +0,0 @@ -""" -This code is based on https://github.com/fchollet/keras/blob/master/keras/utils/data_utils.py -""" - -import time -import numpy as np -import threading -import multiprocessing -try: - import queue -except ImportError: - import Queue as queue - - -class GeneratorEnqueuer(object): - """ - Builds a queue out of a data generator. - - Args: - generator: a generator function which endlessly yields data - use_multiprocessing (bool): use multiprocessing if True, - otherwise use threading. - wait_time (float): time to sleep in-between calls to `put()`. - random_seed (int): Initial seed for workers, - will be incremented by one for each workers. - """ - - def __init__(self, - generator, - use_multiprocessing=False, - wait_time=0.05, - random_seed=None): - self.wait_time = wait_time - self._generator = generator - self._use_multiprocessing = use_multiprocessing - self._threads = [] - self._stop_event = None - self.queue = None - self._manager = None - self.seed = random_seed - - def start(self, workers=1, max_queue_size=10): - """ - Start worker threads which add data from the generator into the queue. - - Args: - workers (int): number of worker threads - max_queue_size (int): queue size - (when full, threads could block on `put()`) - """ - - def data_generator_task(): - """ - Data generator task. - """ - - def task(): - if (self.queue is not None and - self.queue.qsize() < max_queue_size): - generator_output = next(self._generator) - self.queue.put((generator_output)) - else: - time.sleep(self.wait_time) - - if not self._use_multiprocessing: - while not self._stop_event.is_set(): - with self.genlock: - try: - task() - except Exception: - self._stop_event.set() - break - else: - while not self._stop_event.is_set(): - try: - task() - except Exception: - self._stop_event.set() - break - - try: - if self._use_multiprocessing: - self._manager = multiprocessing.Manager() - self.queue = self._manager.Queue(maxsize=max_queue_size) - self._stop_event = multiprocessing.Event() - else: - self.genlock = threading.Lock() - self.queue = queue.Queue() - self._stop_event = threading.Event() - for _ in range(workers): - if self._use_multiprocessing: - # Reset random seed else all children processes - # share the same seed - np.random.seed(self.seed) - thread = multiprocessing.Process(target=data_generator_task) - thread.daemon = True - if self.seed is not None: - self.seed += 1 - else: - thread = threading.Thread(target=data_generator_task) - self._threads.append(thread) - thread.start() - except: - self.stop() - raise - - def is_running(self): - """ - Returns: - bool: Whether the worker theads are running. - """ - return self._stop_event is not None and not self._stop_event.is_set() - - def stop(self, timeout=None): - """ - Stops running threads and wait for them to exit, if necessary. 
- Should be called by the same thread which called `start()`. - - Args: - timeout(int|None): maximum time to wait on `thread.join()`. - """ - if self.is_running(): - self._stop_event.set() - for thread in self._threads: - if self._use_multiprocessing: - if thread.is_alive(): - thread.terminate() - else: - thread.join(timeout) - if self._manager: - self._manager.shutdown() - - self._threads = [] - self._stop_event = None - self.queue = None - - def get(self): - """ - Creates a generator to extract data from the queue. - Skip the data if it is `None`. - - # Yields - tuple of data in the queue. - """ - while self.is_running(): - if not self.queue.empty(): - inputs = self.queue.get() - if inputs is not None: - yield inputs - else: - time.sleep(self.wait_time) diff --git a/fluid/object_detection/reader.py b/fluid/object_detection/reader.py index 60c108d0..82dad83f 100644 --- a/fluid/object_detection/reader.py +++ b/fluid/object_detection/reader.py @@ -22,7 +22,6 @@ import os import time import copy import six -from data_util import GeneratorEnqueuer class Settings(object): @@ -168,7 +167,7 @@ def preprocess(img, bbox_labels, mode, settings): return img, sampled_labels -def coco(settings, file_list, mode, batch_size, shuffle): +def coco(settings, file_list, mode, shuffle): # cocoapi from pycocotools.coco import COCO from pycocotools.cocoeval import COCOeval @@ -183,10 +182,9 @@ def coco(settings, file_list, mode, batch_size, shuffle): images = images[:settings.toy] if len(images) > settings.toy else images print("{} on {} with {} images".format(mode, settings.dataset, len(images))) - while True: - if mode == "train" and shuffle: + def reader(): + if mode == 'train' and shuffle: np.random.shuffle(images) - batch_out = [] for image in images: image_name = image['file_name'] image_path = os.path.join(settings.data_dir, image_name) @@ -223,28 +221,25 @@ def coco(settings, file_list, mode, batch_size, shuffle): boxes = sample_labels[:, 1:5] lbls = sample_labels[:, 0].astype('int32') iscrowd = sample_labels[:, -1].astype('int32') - if 'cocoMAP' in settings.ap_version: - batch_out.append((im, boxes, lbls, iscrowd, - [im_id, im_width, im_height])) + yield im, boxes, lbls, iscrowd, \ + [im_id, im_width, im_height] else: - batch_out.append((im, boxes, lbls, iscrowd)) - if len(batch_out) == batch_size: - yield batch_out - batch_out = [] + yield im, boxes, lbls, iscrowd + + return reader -def pascalvoc(settings, file_list, mode, batch_size, shuffle): +def pascalvoc(settings, file_list, mode, shuffle): flist = open(file_list) images = [line.strip() for line in flist] if not settings.toy == 0: images = images[:settings.toy] if len(images) > settings.toy else images print("{} on {} with {} images".format(mode, settings.dataset, len(images))) - while True: - if mode == "train" and shuffle: + def reader(): + if mode == 'train' and shuffle: np.random.shuffle(images) - batch_out = [] for image in images: image_path, label_path = image.split() image_path = os.path.join(settings.data_dir, image_path) @@ -278,51 +273,7 @@ def pascalvoc(settings, file_list, mode, batch_size, shuffle): boxes = sample_labels[:, 1:5] lbls = sample_labels[:, 0].astype('int32') difficults = sample_labels[:, -1].astype('int32') - batch_out.append((im, boxes, lbls, difficults)) - if len(batch_out) == batch_size: - yield batch_out - batch_out = [] - - -def batch_reader(settings, - file_list, - batch_size, - mode, - shuffle=True, - num_workers=8): - file_list = os.path.join(settings.data_dir, file_list) - if 'coco' in settings.dataset: - 
train_settings = copy.copy(settings) - if '2014' in file_list: - sub_dir = "train2014" - elif '2017' in file_list: - sub_dir = "train2017" - train_settings.data_dir = os.path.join(settings.data_dir, sub_dir) - - def reader(): - try: - if 'coco' in settings.dataset: - enqueuer = GeneratorEnqueuer( - coco(train_settings, file_list, mode, batch_size, shuffle), - use_multiprocessing=False) - else: - enqueuer = GeneratorEnqueuer( - pascalvoc(settings, file_list, mode, batch_size, shuffle), - use_multiprocessing=False) - enqueuer.start(max_queue_size=24, workers=num_workers) - generator_output = None - while True: - while enqueuer.is_running(): - if not enqueuer.queue.empty(): - generator_output = enqueuer.queue.get() - break - else: - time.sleep(0.01) - yield generator_output - generator_output = None - finally: - if enqueuer is not None: - enqueuer.stop() + yield im, boxes, lbls, difficults return reader @@ -341,7 +292,7 @@ def train(settings, file_list, shuffle=True): return pascalvoc(settings, file_list, 'train', shuffle) -def test(settings, file_list, batch_size): +def test(settings, file_list): file_list = os.path.join(settings.data_dir, file_list) if 'coco' in settings.dataset: test_settings = copy.copy(settings) @@ -350,9 +301,9 @@ def test(settings, file_list, batch_size): elif '2017' in file_list: sub_dir = "val2017" test_settings.data_dir = os.path.join(settings.data_dir, sub_dir) - return coco(test_settings, file_list, 'test', batch_size, False) + return coco(test_settings, file_list, 'test', False) else: - return pascalvoc(settings, file_list, 'test', batch_size, False) + return pascalvoc(settings, file_list, 'test', False) def infer(settings, image_path): diff --git a/fluid/object_detection/train.py b/fluid/object_detection/train.py index b40da930..5c185ae0 100644 --- a/fluid/object_detection/train.py +++ b/fluid/object_detection/train.py @@ -15,7 +15,7 @@ parser = argparse.ArgumentParser(description=__doc__) add_arg = functools.partial(add_arguments, argparser=parser) # yapf: disable add_arg('learning_rate', float, 0.001, "Learning rate.") -add_arg('batch_size', int, 16, "Minibatch size.") +add_arg('batch_size', int, 64, "Minibatch size.") add_arg('num_passes', int, 120, "Epoch number.") add_arg('use_gpu', bool, True, "Whether use GPU.") add_arg('parallel', bool, True, "Parallel.") @@ -36,56 +36,6 @@ add_arg('data_dir', str, 'data/pascalvoc', "data directory") add_arg('enable_ce', bool, False, "Whether use CE to evaluate the model") #yapf: enable -def build_program(is_train, main_prog, startup_prog, args, data_args, - boundaries=None, values=None, train_file_list=None): - image_shape = [3, data_args.resize_h, data_args.resize_w] - if 'coco' in data_args.dataset: - num_classes = 91 - elif 'pascalvoc' in data_args.dataset: - num_classes = 21 - - def get_optimizer(): - if not args.enable_ce: - optimizer = fluid.optimizer.RMSProp( - learning_rate=fluid.layers.piecewise_decay(boundaries, values), - regularization=fluid.regularizer.L2Decay(0.00005), ) - else: - optimizer = fluid.optimizer.RMSProp(learning_rate=0.001) - return optimizer - - with fluid.program_guard(main_prog, startup_prog): - py_reader = fluid.layers.py_reader( - capacity=64, - shapes=[[-1] + image_shape, [-1, 4], [-1, 1], [-1, 1]], - lod_levels=[0, 1, 1, 1], - dtypes=["float32", "float32", "int32", "int32"], - use_double_buffer=True) - with fluid.unique_name.guard(): - image, gt_box, gt_label, difficult = fluid.layers.read_file(py_reader) - locs, confs, box, box_var = mobile_net(num_classes, image, image_shape) - if 
is_train: - loss = fluid.layers.ssd_loss(locs, confs, gt_box, gt_label, box, - box_var) - loss = fluid.layers.reduce_sum(loss) - optimizer = get_optimizer() - optimizer.minimize(loss) - else: - nmsed_out = fluid.layers.detection_output( - locs, confs, box, box_var, nms_threshold=args.nms_threshold) - with fluid.program_guard(main_prog): - loss = fluid.evaluator.DetectionMAP( - nmsed_out, - gt_label, - gt_box, - difficult, - num_classes, - overlap_threshold=0.5, - evaluate_difficult=False, - ap_version=args.ap_version) - if not is_train: - main_prog = main_prog.clone(for_test=True) - return py_reader, loss - def train(args, train_file_list, val_file_list, @@ -95,140 +45,150 @@ def train(args, num_passes, model_save_dir, pretrained_model=None): + if args.enable_ce: + fluid.framework.default_startup_program().random_seed = 111 - startup_prog = fluid.Program() - train_prog = fluid.Program() - test_prog = fluid.Program() + image_shape = [3, data_args.resize_h, data_args.resize_w] + if 'coco' in data_args.dataset: + num_classes = 91 + elif 'pascalvoc' in data_args.dataset: + num_classes = 21 devices = os.getenv("CUDA_VISIBLE_DEVICES") or "" devices_num = len(devices.split(",")) + + image = fluid.layers.data(name='image', shape=image_shape, dtype='float32') + gt_box = fluid.layers.data( + name='gt_box', shape=[4], dtype='float32', lod_level=1) + gt_label = fluid.layers.data( + name='gt_label', shape=[1], dtype='int32', lod_level=1) + difficult = fluid.layers.data( + name='gt_difficult', shape=[1], dtype='int32', lod_level=1) + locs, confs, box, box_var = mobile_net(num_classes, image, image_shape) + nmsed_out = fluid.layers.detection_output( + locs, confs, box, box_var, nms_threshold=args.nms_threshold) + loss = fluid.layers.ssd_loss(locs, confs, gt_box, gt_label, box, + box_var) + loss = fluid.layers.reduce_sum(loss) + + test_program = fluid.default_main_program().clone(for_test=True) + with fluid.program_guard(test_program): + map_eval = fluid.evaluator.DetectionMAP( + nmsed_out, + gt_label, + gt_box, + difficult, + num_classes, + overlap_threshold=0.5, + evaluate_difficult=False, + ap_version=args.ap_version) + if 'coco' in data_args.dataset: # learning rate decay in 12, 19 pass, respectively if '2014' in train_file_list: - epocs = 82783 // batch_size // devices_num - test_epocs = 40504 // batch_size + epocs = 82783 // batch_size boundaries = [epocs * 12, epocs * 19] elif '2017' in train_file_list: - epocs = 118287 // batch_size // devices_num - test_epocs = 5000 // batch_size + epocs = 118287 // batch_size boundaries = [epocs * 12, epocs * 19] - values = [learning_rate, learning_rate * 0.5, - learning_rate * 0.25] + values = [ + learning_rate, learning_rate * 0.5, learning_rate * 0.25 + ] elif 'pascalvoc' in data_args.dataset: - epocs = 19200 // batch_size // devices_num - test_epocs = 4952 // batch_size + epocs = 19200 // batch_size boundaries = [epocs * 40, epocs * 60, epocs * 80, epocs * 100] values = [ learning_rate, learning_rate * 0.5, learning_rate * 0.25, - learning_rate * 0.1, learning_rate * 0.01] + learning_rate * 0.1, learning_rate * 0.01 + ] + optimizer = fluid.optimizer.RMSProp( + learning_rate=fluid.layers.piecewise_decay(boundaries, values), + regularization=fluid.regularizer.L2Decay(0.00005), ) - if args.enable_ce: - startup_prog.random_seed = 111 - train_prog.random_seed = 111 - test_prog.random_seed = 111 + optimizer.minimize(loss) - train_py_reader, loss = build_program( - is_train=True, - main_prog=train_prog, - startup_prog=startup_prog, - args=args, - 
data_args=data_args, - values = values, - boundaries = boundaries, - train_file_list=train_file_list) - test_py_reader, map_eval = build_program( - is_train=False, - main_prog=test_prog, - startup_prog=startup_prog, - args=args, - data_args=data_args) place = fluid.CUDAPlace(0) if args.use_gpu else fluid.CPUPlace() exe = fluid.Executor(place) - exe.run(startup_prog) + exe.run(fluid.default_startup_program()) if pretrained_model: def if_exist(var): return os.path.exists(os.path.join(pretrained_model, var.name)) - fluid.io.load_vars(exe, pretrained_model, main_program=train_prog, predicate=if_exist) + fluid.io.load_vars(exe, pretrained_model, predicate=if_exist) if args.parallel: - train_exe = fluid.ParallelExecutor(main_program=train_prog, + train_exe = fluid.ParallelExecutor( use_cuda=args.use_gpu, loss_name=loss.name) - test_exe = fluid.ParallelExecutor(main_program=test_prog, - use_cuda=args.use_gpu, share_vars_from=train_exe) + if not args.enable_ce: - train_reader = reader.batch_reader(data_args, train_file_list, batch_size, "train") + train_reader = paddle.batch( + reader.train(data_args, train_file_list), batch_size=batch_size) else: import random random.seed(0) np.random.seed(0) - train_reader = reader.batch_reader(data_args, train_file_list, batch_size, "train", shuffle=False) - test_reader = reader.batch_reader(data_args, val_file_list, batch_size, "test") - train_py_reader.decorate_paddle_reader(train_reader) - test_py_reader.decorate_paddle_reader(test_reader) - - def save_model(postfix, main_prog): + train_reader = paddle.batch( + reader.train(data_args, train_file_list, False), batch_size=batch_size) + test_reader = paddle.batch( + reader.test(data_args, val_file_list), batch_size=batch_size) + feeder = fluid.DataFeeder( + place=place, feed_list=[image, gt_box, gt_label, difficult]) + + def save_model(postfix): model_path = os.path.join(model_save_dir, postfix) if os.path.isdir(model_path): shutil.rmtree(model_path) print('save models to %s' % (model_path)) - fluid.io.save_persistables(exe, model_path, main_program=main_prog) + fluid.io.save_persistables(exe, model_path) best_map = 0. 
+ def test(pass_id, best_map): _, accum_map = map_eval.get_map_var() map_eval.reset(exe) every_pass_map=[] - test_py_reader.start() - batch_id = 0 - try: - while True: - test_map, = exe.run(test_prog, - fetch_list=[accum_map]) - if batch_id % 20 == 0: - every_pass_map.append(test_map) - print("Batch {0}, map {1}".format(batch_id, test_map)) - batch_id += 1 - if batch_id > test_epocs: - break - except fluid.core.EOFException: - test_py_reader.reset() + for batch_id, data in enumerate(test_reader()): + test_map, = exe.run(test_program, + feed=feeder.feed(data), + fetch_list=[accum_map]) + if batch_id % 20 == 0: + every_pass_map.append(test_map) + print("Batch {0}, map {1}".format(batch_id, test_map)) mean_map = np.mean(every_pass_map) if test_map[0] > best_map: best_map = test_map[0] - save_model('best_model', test_prog) + save_model('best_model') print("Pass {0}, test map {1}".format(pass_id, test_map)) return best_map, mean_map for pass_id in range(num_passes): batch_begin = time.time() start_time = time.time() - train_py_reader.start() + prev_start_time = start_time every_pass_loss = [] - batch_id = 0 - try: - while True: - prev_start_time = start_time - start_time = time.time() - - if args.parallel: - loss_v, = train_exe.run(fetch_list=[loss.name]) - else: - loss_v, = exe.run(train_prog, fetch_list=[loss]) - loss_v = np.mean(np.array(loss_v)) - every_pass_loss.append(loss_v) - if batch_id % 20 == 0: - print("Pass {0}, batch {1}, loss {2}, time {3}".format( - pass_id, batch_id, loss_v, start_time - prev_start_time)) - batch_id += 1 - if batch_id > epocs: - break - except fluid.core.EOFException: - train_py_reader.reset() - batch_end = time.time() + for batch_id, data in enumerate(train_reader()): + prev_start_time = start_time + start_time = time.time() + if len(data) < (devices_num * 2): + print("There are too few data to train on all devices.") + continue + if args.parallel: + loss_v, = train_exe.run(fetch_list=[loss.name], + feed=feeder.feed(data)) + else: + loss_v, = exe.run(fluid.default_main_program(), + feed=feeder.feed(data), + fetch_list=[loss]) + loss_v = np.mean(np.array(loss_v)) + every_pass_loss.append(loss_v) + if batch_id % 20 == 0: + print("Pass {0}, batch {1}, loss {2}, time {3}".format( + pass_id, batch_id, loss_v, start_time - prev_start_time)) + + end_time = time.time() best_map, mean_map = test(pass_id, best_map) - if args.enable_ce and pass_id == num_passes - 1: - total_time = batch_end - batch_begin + if args.enable_ce and pass_id == args.num_passes - 1: + total_time = end_time - start_time train_avg_loss = np.mean(every_pass_loss) if devices_num == 1: print("kpis train_cost %s" % train_avg_loss) @@ -240,10 +200,11 @@ def train(args, print("kpis test_acc_card%s %s" % (devices_num, mean_map)) print("kpis train_speed_card%s %f" % - (devices_num, test_epocs / total_time)) + (devices_num, epocs / total_time)) + if pass_id % 10 == 0 or pass_id == num_passes - 1: - save_model(str(pass_id), train_prog) + save_model(str(pass_id)) print("Best test map {0}".format(best_map)) if __name__ == '__main__': -- GitLab
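For reference, a minimal sketch of how the reverted reader interface is consumed after this patch: reader.train()/reader.test() return plain per-sample generator factories, batching moves back to paddle.batch, and feeding goes through fluid.DataFeeder instead of py_reader. Only the calls shown in the diff are taken from the patch; data_args (a configured reader.Settings), the 'trainval.txt' file list, the 300x300 input shape, and batch_size=32 are illustrative assumptions.

import paddle
import paddle.fluid as fluid
import reader  # fluid/object_detection/reader.py as of this patch

# Feed variables, mirroring the fluid.layers.data declarations added to train.py.
image = fluid.layers.data(name='image', shape=[3, 300, 300], dtype='float32')
gt_box = fluid.layers.data(name='gt_box', shape=[4], dtype='float32', lod_level=1)
gt_label = fluid.layers.data(name='gt_label', shape=[1], dtype='int32', lod_level=1)
difficult = fluid.layers.data(name='gt_difficult', shape=[1], dtype='int32', lod_level=1)

# reader.train() now yields one (im, boxes, lbls, difficults) sample at a time;
# batching is done on the Python side. `data_args` is assumed to be an already
# configured reader.Settings instance pointing at the PASCAL VOC data_dir.
train_reader = paddle.batch(
    reader.train(data_args, 'trainval.txt'), batch_size=32)

place = fluid.CPUPlace()
feeder = fluid.DataFeeder(place=place,
                          feed_list=[image, gt_box, gt_label, difficult])

for data in train_reader():
    feed_dict = feeder.feed(data)  # list of sample tuples -> {name: LoDTensor}
    # exe.run(fluid.default_main_program(), feed=feed_dict, fetch_list=[loss])

The same pattern applies to evaluation: reader.test(data_args, val_file_list) is wrapped in paddle.batch and fed through the same DataFeeder while running the cloned test_program with the DetectionMAP accumulator, as in the new test() helper in train.py.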