From 092914524cdfced300cbd5d5dcf1db998f99c7dc Mon Sep 17 00:00:00 2001 From: Bai Yifan Date: Mon, 27 Aug 2018 11:08:59 +0800 Subject: [PATCH] Object_detection reader accelerate (#1177) * object_detection accelerate --- fluid/object_detection/data_util.py | 151 +++++++++++++++++++ fluid/object_detection/reader.py | 79 ++++++++-- fluid/object_detection/train.py | 221 ++++++++++++++++------------ 3 files changed, 344 insertions(+), 107 deletions(-) create mode 100644 fluid/object_detection/data_util.py diff --git a/fluid/object_detection/data_util.py b/fluid/object_detection/data_util.py new file mode 100644 index 00000000..ac022593 --- /dev/null +++ b/fluid/object_detection/data_util.py @@ -0,0 +1,151 @@ +""" +This code is based on https://github.com/fchollet/keras/blob/master/keras/utils/data_utils.py +""" + +import time +import numpy as np +import threading +import multiprocessing +try: + import queue +except ImportError: + import Queue as queue + + +class GeneratorEnqueuer(object): + """ + Builds a queue out of a data generator. + + Args: + generator: a generator function which endlessly yields data + use_multiprocessing (bool): use multiprocessing if True, + otherwise use threading. + wait_time (float): time to sleep in-between calls to `put()`. + random_seed (int): Initial seed for workers, + will be incremented by one for each workers. + """ + + def __init__(self, + generator, + use_multiprocessing=False, + wait_time=0.05, + random_seed=None): + self.wait_time = wait_time + self._generator = generator + self._use_multiprocessing = use_multiprocessing + self._threads = [] + self._stop_event = None + self.queue = None + self._manager = None + self.seed = random_seed + + def start(self, workers=1, max_queue_size=10): + """ + Start worker threads which add data from the generator into the queue. + + Args: + workers (int): number of worker threads + max_queue_size (int): queue size + (when full, threads could block on `put()`) + """ + + def data_generator_task(): + """ + Data generator task. + """ + + def task(): + if (self.queue is not None and + self.queue.qsize() < max_queue_size): + generator_output = next(self._generator) + self.queue.put((generator_output)) + else: + time.sleep(self.wait_time) + + if not self._use_multiprocessing: + while not self._stop_event.is_set(): + with self.genlock: + try: + task() + except Exception: + self._stop_event.set() + break + else: + while not self._stop_event.is_set(): + try: + task() + except Exception: + self._stop_event.set() + break + + try: + if self._use_multiprocessing: + self._manager = multiprocessing.Manager() + self.queue = self._manager.Queue(maxsize=max_queue_size) + self._stop_event = multiprocessing.Event() + else: + self.genlock = threading.Lock() + self.queue = queue.Queue() + self._stop_event = threading.Event() + for _ in range(workers): + if self._use_multiprocessing: + # Reset random seed else all children processes + # share the same seed + np.random.seed(self.seed) + thread = multiprocessing.Process(target=data_generator_task) + thread.daemon = True + if self.seed is not None: + self.seed += 1 + else: + thread = threading.Thread(target=data_generator_task) + self._threads.append(thread) + thread.start() + except: + self.stop() + raise + + def is_running(self): + """ + Returns: + bool: Whether the worker theads are running. + """ + return self._stop_event is not None and not self._stop_event.is_set() + + def stop(self, timeout=None): + """ + Stops running threads and wait for them to exit, if necessary. + Should be called by the same thread which called `start()`. + + Args: + timeout(int|None): maximum time to wait on `thread.join()`. + """ + if self.is_running(): + self._stop_event.set() + for thread in self._threads: + if self._use_multiprocessing: + if thread.is_alive(): + thread.terminate() + else: + thread.join(timeout) + if self._manager: + self._manager.shutdown() + + self._threads = [] + self._stop_event = None + self.queue = None + + def get(self): + """ + Creates a generator to extract data from the queue. + Skip the data if it is `None`. + + # Yields + tuple of data in the queue. + """ + while self.is_running(): + if not self.queue.empty(): + inputs = self.queue.get() + if inputs is not None: + yield inputs + else: + time.sleep(self.wait_time) diff --git a/fluid/object_detection/reader.py b/fluid/object_detection/reader.py index da0aa788..09dba378 100644 --- a/fluid/object_detection/reader.py +++ b/fluid/object_detection/reader.py @@ -23,6 +23,7 @@ import os import time import copy import six +from data_util import GeneratorEnqueuer class Settings(object): @@ -168,7 +169,7 @@ def preprocess(img, bbox_labels, mode, settings): return img, sampled_labels -def coco(settings, file_list, mode, shuffle): +def coco(settings, file_list, mode, batch_size, shuffle): # cocoapi from pycocotools.coco import COCO from pycocotools.cocoeval import COCOeval @@ -183,9 +184,10 @@ def coco(settings, file_list, mode, shuffle): images = images[:settings.toy] if len(images) > settings.toy else images print("{} on {} with {} images".format(mode, settings.dataset, len(images))) - def reader(): - if mode == 'train' and shuffle: + while True: + if mode == "train" and shuffle: random.shuffle(images) + batch_out = [] for image in images: image_name = image['file_name'] image_path = os.path.join(settings.data_dir, image_name) @@ -222,25 +224,28 @@ def coco(settings, file_list, mode, shuffle): boxes = sample_labels[:, 1:5] lbls = sample_labels[:, 0].astype('int32') iscrowd = sample_labels[:, -1].astype('int32') + if 'cocoMAP' in settings.ap_version: - yield im, boxes, lbls, iscrowd, \ - [im_id, im_width, im_height] + batch_out.append((im, boxes, lbls, iscrowd, + [im_id, im_width, im_height])) else: - yield im, boxes, lbls, iscrowd - - return reader + batch_out.append((im, boxes, lbls, iscrowd)) + if len(batch_out) == batch_size: + yield batch_out + batch_out = [] -def pascalvoc(settings, file_list, mode, shuffle): +def pascalvoc(settings, file_list, mode, batch_size, shuffle): flist = open(file_list) images = [line.strip() for line in flist] if not settings.toy == 0: images = images[:settings.toy] if len(images) > settings.toy else images print("{} on {} with {} images".format(mode, settings.dataset, len(images))) - def reader(): - if mode == 'train' and shuffle: + while True: + if mode == "train" and shuffle: random.shuffle(images) + batch_out = [] for image in images: image_path, label_path = image.split() image_path = os.path.join(settings.data_dir, image_path) @@ -274,7 +279,51 @@ def pascalvoc(settings, file_list, mode, shuffle): boxes = sample_labels[:, 1:5] lbls = sample_labels[:, 0].astype('int32') difficults = sample_labels[:, -1].astype('int32') - yield im, boxes, lbls, difficults + batch_out.append((im, boxes, lbls, difficults)) + if len(batch_out) == batch_size: + yield batch_out + batch_out = [] + + +def batch_reader(settings, + file_list, + batch_size, + mode, + shuffle=True, + num_workers=8): + file_list = os.path.join(settings.data_dir, file_list) + if 'coco' in settings.dataset: + train_settings = copy.copy(settings) + if '2014' in file_list: + sub_dir = "train2014" + elif '2017' in file_list: + sub_dir = "train2017" + train_settings.data_dir = os.path.join(settings.data_dir, sub_dir) + + def reader(): + try: + if 'coco' in settings.dataset: + enqueuer = GeneratorEnqueuer( + coco(train_settings, file_list, mode, batch_size, shuffle), + use_multiprocessing=False) + else: + enqueuer = GeneratorEnqueuer( + pascalvoc(settings, file_list, mode, batch_size, shuffle), + use_multiprocessing=False) + enqueuer.start(max_queue_size=24, workers=num_workers) + generator_output = None + while True: + while enqueuer.is_running(): + if not enqueuer.queue.empty(): + generator_output = enqueuer.queue.get() + break + else: + time.sleep(0.01) + yield generator_output + generator_output = None + finally: + if enqueuer is not None: + enqueuer.stop() return reader @@ -293,7 +342,7 @@ def train(settings, file_list, shuffle=True): return pascalvoc(settings, file_list, 'train', shuffle) -def test(settings, file_list): +def test(settings, file_list, batch_size): file_list = os.path.join(settings.data_dir, file_list) if 'coco' in settings.dataset: test_settings = copy.copy(settings) @@ -302,9 +351,9 @@ def test(settings, file_list): elif '2017' in file_list: sub_dir = "val2017" test_settings.data_dir = os.path.join(settings.data_dir, sub_dir) - return coco(test_settings, file_list, 'test', False) + return coco(test_settings, file_list, 'test', batch_size, False) else: - return pascalvoc(settings, file_list, 'test', False) + return pascalvoc(settings, file_list, 'test', batch_size, False) def infer(settings, image_path): diff --git a/fluid/object_detection/train.py b/fluid/object_detection/train.py index 706a33be..78d8c9e7 100644 --- a/fluid/object_detection/train.py +++ b/fluid/object_detection/train.py @@ -15,7 +15,7 @@ parser = argparse.ArgumentParser(description=__doc__) add_arg = functools.partial(add_arguments, argparser=parser) # yapf: disable add_arg('learning_rate', float, 0.001, "Learning rate.") -add_arg('batch_size', int, 64, "Minibatch size.") +add_arg('batch_size', int, 16, "Minibatch size.") add_arg('num_passes', int, 120, "Epoch number.") add_arg('use_gpu', bool, True, "Whether use GPU.") add_arg('parallel', bool, True, "Parallel.") @@ -36,6 +36,52 @@ add_arg('data_dir', str, 'data/pascalvoc', "data directory") add_arg('enable_ce', bool, False, "Whether use CE to evaluate the model") #yapf: enable +def build_program(is_train, main_prog, startup_prog, args, data_args, + boundaries=None, values=None, train_file_list=None): + image_shape = [3, data_args.resize_h, data_args.resize_w] + if 'coco' in data_args.dataset: + num_classes = 91 + elif 'pascalvoc' in data_args.dataset: + num_classes = 21 + + def get_optimizer(): + optimizer = fluid.optimizer.RMSProp( + learning_rate=fluid.layers.piecewise_decay(boundaries, values), + regularization=fluid.regularizer.L2Decay(0.00005), ) + return optimizer + + with fluid.program_guard(main_prog, startup_prog): + py_reader = fluid.layers.py_reader( + capacity=64, + shapes=[[-1] + image_shape, [-1, 4], [-1, 1], [-1, 1]], + lod_levels=[0, 1, 1, 1], + dtypes=["float32", "float32", "int32", "int32"], + use_double_buffer=True) + with fluid.unique_name.guard(): + image, gt_box, gt_label, difficult = fluid.layers.read_file(py_reader) + locs, confs, box, box_var = mobile_net(num_classes, image, image_shape) + if is_train: + loss = fluid.layers.ssd_loss(locs, confs, gt_box, gt_label, box, + box_var) + loss = fluid.layers.reduce_sum(loss) + optimizer = get_optimizer() + optimizer.minimize(loss) + else: + nmsed_out = fluid.layers.detection_output( + locs, confs, box, box_var, nms_threshold=args.nms_threshold) + with fluid.program_guard(main_prog): + loss = fluid.evaluator.DetectionMAP( + nmsed_out, + gt_label, + gt_box, + difficult, + num_classes, + overlap_threshold=0.5, + evaluate_difficult=False, + ap_version=args.ap_version) + if not is_train: + main_prog = main_prog.clone(for_test=True) + return py_reader, loss def train(args, train_file_list, @@ -46,119 +92,108 @@ def train(args, num_passes, model_save_dir, pretrained_model=None): - if args.enable_ce: - fluid.framework.default_startup_program().random_seed = 111 - image_shape = [3, data_args.resize_h, data_args.resize_w] - if 'coco' in data_args.dataset: - num_classes = 91 - elif 'pascalvoc' in data_args.dataset: - num_classes = 21 + startup_prog = fluid.Program() + train_prog = fluid.Program() + test_prog = fluid.Program() devices = os.getenv("CUDA_VISIBLE_DEVICES") or "" devices_num = len(devices.split(",")) - - image = fluid.layers.data(name='image', shape=image_shape, dtype='float32') - gt_box = fluid.layers.data( - name='gt_box', shape=[4], dtype='float32', lod_level=1) - gt_label = fluid.layers.data( - name='gt_label', shape=[1], dtype='int32', lod_level=1) - difficult = fluid.layers.data( - name='gt_difficult', shape=[1], dtype='int32', lod_level=1) - locs, confs, box, box_var = mobile_net(num_classes, image, image_shape) - nmsed_out = fluid.layers.detection_output( - locs, confs, box, box_var, nms_threshold=args.nms_threshold) - loss = fluid.layers.ssd_loss(locs, confs, gt_box, gt_label, box, - box_var) - loss = fluid.layers.reduce_sum(loss) - - test_program = fluid.default_main_program().clone(for_test=True) - with fluid.program_guard(test_program): - map_eval = fluid.evaluator.DetectionMAP( - nmsed_out, - gt_label, - gt_box, - difficult, - num_classes, - overlap_threshold=0.5, - evaluate_difficult=False, - ap_version=args.ap_version) - if 'coco' in data_args.dataset: # learning rate decay in 12, 19 pass, respectively if '2014' in train_file_list: - epocs = 82783 // batch_size + epocs = 82783 // batch_size // devices_num + test_epocs = 40504 // batch_size boundaries = [epocs * 12, epocs * 19] elif '2017' in train_file_list: - epocs = 118287 // batch_size + epocs = 118287 // batch_size // devices_num + test_epocs = 5000 // batch_size boundaries = [epocs * 12, epocs * 19] - values = [ - learning_rate, learning_rate * 0.5, learning_rate * 0.25 - ] + values = [learning_rate, learning_rate * 0.5, + learning_rate * 0.25] elif 'pascalvoc' in data_args.dataset: - epocs = 19200 // batch_size + epocs = 19200 // batch_size // devices_num + test_epocs = 4952 // batch_size boundaries = [epocs * 40, epocs * 60, epocs * 80, epocs * 100] values = [ learning_rate, learning_rate * 0.5, learning_rate * 0.25, - learning_rate * 0.1, learning_rate * 0.01 - ] - optimizer = fluid.optimizer.RMSProp( - learning_rate=fluid.layers.piecewise_decay(boundaries, values), - regularization=fluid.regularizer.L2Decay(0.00005), ) + learning_rate * 0.1, learning_rate * 0.01] - optimizer.minimize(loss) + if args.enable_ce: + startup_prog.random_seed = 111 + train_prog.random_seed = 111 + test_prog.random_seed = 111 + train_py_reader, loss = build_program( + is_train=True, + main_prog=train_prog, + startup_prog=startup_prog, + args=args, + data_args=data_args, + values = values, + boundaries = boundaries, + train_file_list=train_file_list) + test_py_reader, map_eval = build_program( + is_train=False, + main_prog=test_prog, + startup_prog=startup_prog, + args=args, + data_args=data_args) place = fluid.CUDAPlace(0) if args.use_gpu else fluid.CPUPlace() exe = fluid.Executor(place) - exe.run(fluid.default_startup_program()) + exe.run(startup_prog) if pretrained_model: def if_exist(var): return os.path.exists(os.path.join(pretrained_model, var.name)) - fluid.io.load_vars(exe, pretrained_model, predicate=if_exist) + fluid.io.load_vars(exe, pretrained_model, main_program=train_prog, predicate=if_exist) if args.parallel: - train_exe = fluid.ParallelExecutor( + train_exe = fluid.ParallelExecutor(main_program=train_prog, use_cuda=args.use_gpu, loss_name=loss.name) - + test_exe = fluid.ParallelExecutor(main_program=test_prog, + use_cuda=args.use_gpu, share_vars_from=train_exe) if not args.enable_ce: - train_reader = paddle.batch( - reader.train(data_args, train_file_list), batch_size=batch_size) + train_reader = reader.batch_reader(data_args, train_file_list, batch_size, "train") else: import random random.seed(0) np.random.seed(0) - train_reader = paddle.batch( - reader.train(data_args, train_file_list, False), batch_size=batch_size) - test_reader = paddle.batch( - reader.test(data_args, val_file_list), batch_size=batch_size) - feeder = fluid.DataFeeder( - place=place, feed_list=[image, gt_box, gt_label, difficult]) - - def save_model(postfix): + train_reader = reader.batch_reader(data_args, train_file_list, batch_size, "train", shuffle=False) + test_reader = reader.batch_reader(data_args, val_file_list, batch_size, "test") + train_py_reader.decorate_paddle_reader(train_reader) + test_py_reader.decorate_paddle_reader(test_reader) + + def save_model(postfix, main_prog): model_path = os.path.join(model_save_dir, postfix) if os.path.isdir(model_path): shutil.rmtree(model_path) print('save models to %s' % (model_path)) - fluid.io.save_persistables(exe, model_path) + fluid.io.save_persistables(exe, model_path, main_program=main_prog) best_map = 0. - def test(pass_id, best_map): _, accum_map = map_eval.get_map_var() map_eval.reset(exe) every_pass_map=[] - for batch_id, data in enumerate(test_reader()): - test_map, = exe.run(test_program, - feed=feeder.feed(data), - fetch_list=[accum_map]) - if batch_id % 20 == 0: - every_pass_map.append(test_map) - print("Batch {0}, map {1}".format(batch_id, test_map)) + test_py_reader.start() + batch_id = 0 + try: + while True: + test_map, = exe.run(test_prog, + fetch_list=[accum_map]) + if batch_id % 20 == 0: + every_pass_map.append(test_map) + print("Batch {0}, map {1}".format(batch_id, test_map)) + batch_id += 1 + if batch_id > test_epocs: + break + except fluid.core.EOFException: + test_py_reader.reset() mean_map = np.mean(every_pass_map) if test_map[0] > best_map: best_map = test_map[0] - save_model('best_model') + save_model('best_model', test_prog) print("Pass {0}, test map {1}".format(pass_id, test_map)) return best_map, mean_map @@ -166,30 +201,33 @@ def train(args, for pass_id in range(num_passes): epoch_idx = pass_id + 1 start_time = time.time() + train_py_reader.start() prev_start_time = start_time every_pass_loss = [] - for batch_id, data in enumerate(train_reader()): - prev_start_time = start_time - start_time = time.time() - if len(data) < (devices_num * 2): - print("There are too few data to train on all devices.") - continue - if args.parallel: - loss_v, = train_exe.run(fetch_list=[loss.name], - feed=feeder.feed(data)) - else: - loss_v, = exe.run(fluid.default_main_program(), - feed=feeder.feed(data), - fetch_list=[loss]) - loss_v = np.mean(np.array(loss_v)) - every_pass_loss.append(loss_v) - if batch_id % 20 == 0: - print("Pass {0}, batch {1}, loss {2}, time {3}".format( - pass_id, batch_id, loss_v, start_time - prev_start_time)) + batch_id = 0 + try: + while True: + prev_start_time = start_time + start_time = time.time() + + if args.parallel: + loss_v, = train_exe.run(fetch_list=[loss.name]) + else: + loss_v, = exe.run(train_prog, fetch_list=[loss]) + loss_v = np.mean(np.array(loss_v)) + every_pass_loss.append(loss_v) + if batch_id % 20 == 0: + print("Pass {0}, batch {1}, loss {2}, time {3}".format( + pass_id, batch_id, loss_v, start_time - prev_start_time)) + batch_id += 1 + if batch_id > epocs: + break + except fluid.core.EOFException: + train_py_reader.reset() end_time = time.time() best_map, mean_map = test(pass_id, best_map) - if args.enable_ce and pass_id == 1: + if args.enable_ce and pass_id == num_passes - 1: total_time += end_time - start_time train_avg_loss = np.mean(every_pass_loss) if devices_num == 1: @@ -204,9 +242,8 @@ def train(args, print("kpis train_speed_card%s %f" % (devices_num, total_time / epoch_idx)) - if pass_id % 10 == 0 or pass_id == num_passes - 1: - save_model(str(pass_id)) + save_model(str(pass_id), train_prog) print("Best test map {0}".format(best_map)) if __name__ == '__main__': -- GitLab