From f90c7865f0ababccd89332604e7e181c85cf9b60 Mon Sep 17 00:00:00 2001 From: Wu Yi Date: Sat, 8 Sep 2018 16:05:56 +0800 Subject: [PATCH] Benchmark tool for imgnet (#12305) * support test using executor without reader * run imgnet * update fluid benchmark * wip * update * update all models * support pyreader * update * clean up * make profile batches contollable * update API.spec * update scripts * clean dockerfile * update * clean comments * add scope argument docstring * use num_trainers to determine nccl init comms --- benchmark/fluid/Dockerfile | 4 +- benchmark/fluid/args.py | 16 +- benchmark/fluid/fluid_benchmark.py | 274 +++++++------- benchmark/fluid/imagenet_reader.py | 344 ++++++++++++++++++ benchmark/fluid/kube_gen_job.py | 13 + benchmark/fluid/models/__init__.py | 3 +- benchmark/fluid/models/machine_translation.py | 48 ++- benchmark/fluid/models/mnist.py | 95 +++-- benchmark/fluid/models/resnet.py | 197 ++++++---- .../fluid/models/resnet_with_preprocess.py | 268 ++++++++++++++ benchmark/fluid/models/se_resnext.py | 286 +++++++++++++++ .../fluid/models/stacked_dynamic_lstm.py | 75 ++-- benchmark/fluid/models/vgg.py | 101 +++-- paddle/fluid/API.spec | 2 +- paddle/fluid/platform/nccl_helper.h | 7 +- python/paddle/fluid/parallel_executor.py | 7 +- 16 files changed, 1342 insertions(+), 398 deletions(-) create mode 100644 benchmark/fluid/imagenet_reader.py create mode 100644 benchmark/fluid/models/resnet_with_preprocess.py create mode 100644 benchmark/fluid/models/se_resnext.py diff --git a/benchmark/fluid/Dockerfile b/benchmark/fluid/Dockerfile index 707fadb1f..2e1e0d376 100644 --- a/benchmark/fluid/Dockerfile +++ b/benchmark/fluid/Dockerfile @@ -11,6 +11,7 @@ RUN ln -s /usr/lib/x86_64-linux-gnu/libcudnn.so.7 /usr/lib/libcudnn.so && ln -s # Add "ENV http_proxy=http://ip:port" if your download is slow, and don't forget to unset it at runtime. # exmaple: unset http_proxy && unset https_proxy && python fluid_benchmark.py ... + RUN pip install -U pip RUN pip install -U kubernetes paddlepaddle @@ -27,5 +28,6 @@ ADD *.whl / RUN pip install /*.whl && rm -f /*.whl ENV LD_LIBRARY_PATH=/usr/local/lib -ADD fluid_benchmark.py recordio_converter.py args.py recordio_converter.py run.sh run_fluid_benchmark.sh /workspace/ +ADD fluid_benchmark.py recordio_converter.py args.py recordio_converter.py run.sh run_fluid_benchmark.sh imagenet_reader.py /workspace/ ADD models/ /workspace/models/ + diff --git a/benchmark/fluid/args.py b/benchmark/fluid/args.py index a79f25ccc..ed696e82f 100644 --- a/benchmark/fluid/args.py +++ b/benchmark/fluid/args.py @@ -17,7 +17,8 @@ import argparse __all__ = ['parse_args', ] BENCHMARK_MODELS = [ - "machine_translation", "resnet", "vgg", "mnist", "stacked_dynamic_lstm" + "machine_translation", "resnet", "se_resnext", "vgg", "mnist", + "stacked_dynamic_lstm", "resnet_with_preprocess" ] @@ -67,12 +68,12 @@ def parse_args(): '--cpus', type=int, default=1, - help='If cpus > 1, will use ParallelDo to run, else use Executor.') + help='If cpus > 1, will set ParallelExecutor to use multiple threads.') parser.add_argument( '--data_set', type=str, default='flowers', - choices=['cifar10', 'flowers'], + choices=['cifar10', 'flowers', 'imagenet'], help='Optional dataset for benchmark.') parser.add_argument( '--infer_only', action='store_true', help='If set, run forward only.') @@ -122,6 +123,11 @@ def parse_args(): type=str, default="", help='Directory that contains all the training recordio files.') + parser.add_argument( + '--test_data_path', + type=str, + default="", + help='Directory that contains all the test data (NOT recordio).') parser.add_argument( '--use_inference_transpiler', action='store_true', @@ -130,5 +136,9 @@ def parse_args(): '--no_random', action='store_true', help='If set, keep the random seed and do not shuffle the data.') + parser.add_argument( + '--use_lars', + action='store_true', + help='If set, use lars for optimizers, ONLY support resnet module.') args = parser.parse_args() return args diff --git a/benchmark/fluid/fluid_benchmark.py b/benchmark/fluid/fluid_benchmark.py index 53d010434..11bd75e1d 100644 --- a/benchmark/fluid/fluid_benchmark.py +++ b/benchmark/fluid/fluid_benchmark.py @@ -16,6 +16,7 @@ import argparse import cProfile import time import os +import traceback import numpy as np @@ -27,7 +28,7 @@ import paddle.fluid.transpiler.distribute_transpiler as distribute_transpiler from args import * -def append_nccl2_prepare(trainer_id): +def append_nccl2_prepare(trainer_id, startup_prog): if trainer_id >= 0: # append gen_nccl_id at the end of startup program trainer_id = int(os.getenv("PADDLE_TRAINER_ID")) @@ -40,11 +41,11 @@ def append_nccl2_prepare(trainer_id): current_endpoint = os.getenv("PADDLE_CURRENT_IP") + ":" + port worker_endpoints.remove(current_endpoint) - nccl_id_var = fluid.default_startup_program().global_block().create_var( + nccl_id_var = startup_prog.global_block().create_var( name="NCCLID", persistable=True, type=fluid.core.VarDesc.VarType.RAW) - fluid.default_startup_program().global_block().append_op( + startup_prog.global_block().append_op( type="gen_nccl_id", inputs={}, outputs={"NCCLID": nccl_id_var}, @@ -59,7 +60,7 @@ def append_nccl2_prepare(trainer_id): "nccl-based dist train.") -def dist_transpile(trainer_id, args): +def dist_transpile(trainer_id, args, train_prog, startup_prog): if trainer_id < 0: return None, None @@ -80,133 +81,69 @@ def dist_transpile(trainer_id, args): # the role, should be either PSERVER or TRAINER training_role = os.getenv("PADDLE_TRAINING_ROLE") - t = distribute_transpiler.DistributeTranspiler() + config = distribute_transpiler.DistributeTranspilerConfig() + config.slice_var_up = not args.no_split_var + t = distribute_transpiler.DistributeTranspiler(config=config) t.transpile( trainer_id, + # NOTE: *MUST* use train_prog, for we are using with guard to + # generate different program for train and test. + program=train_prog, pservers=pserver_endpoints, trainers=trainers, sync_mode=not args.async_mode) if training_role == "PSERVER": pserver_program = t.get_pserver_program(current_endpoint) - pserver_startup_program = t.get_startup_program(current_endpoint, - pserver_program) + pserver_startup_program = t.get_startup_program( + current_endpoint, pserver_program, startup_program=startup_prog) return pserver_program, pserver_startup_program elif training_role == "TRAINER": train_program = t.get_trainer_program() - return train_program, fluid.default_startup_program() + return train_program, startup_prog else: raise ValueError( 'PADDLE_TRAINING_ROLE environment variable must be either TRAINER or PSERVER' ) -def test(exe, inference_program, test_reader, feeder, batch_acc): - accuracy_evaluator = fluid.metrics.Accuracy() - for batch_id, data in enumerate(test_reader()): - acc = exe.run(inference_program, - feed=feeder.feed(data), - fetch_list=[batch_acc]) - accuracy_evaluator.update(value=np.array(acc), weight=len(data)) +def test_parallel(exe, test_args, args, test_prog, feeder): + acc_evaluators = [] + for i in xrange(len(test_args[2])): + acc_evaluators.append(fluid.metrics.Accuracy()) - return accuracy_evaluator.eval() - - -# TODO(wuyi): replace train, train_parallel, test functions with new trainer -# API once it is ready. -def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, - args, train_prog, startup_prog): - if os.getenv("PADDLE_TRAINING_ROLE") == "PSERVER": - place = core.CPUPlace() - exe = fluid.Executor(place) - exe.run(startup_prog) - exe.run(train_prog) - return - - if args.use_fake_data: - raise Exception( - "fake data is not supported in single GPU test for now.") - - place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0) - exe = fluid.Executor(place) - exe.run(startup_prog) - - # Use inference_transpiler to speedup - if not args.use_reader_op: - feed_var_list = [ - var for var in train_prog.global_block().vars.itervalues() - if var.is_data - ] - feeder = fluid.DataFeeder(feed_var_list, place) - - iters, num_samples, start_time = 0, 0, time.time() - for pass_id in range(args.pass_num): - train_losses = [] - if not args.use_reader_op: - reader_generator = train_reader() - batch_id = 0 - data = None + to_fetch = [v.name for v in test_args[2]] + if args.use_reader_op: + test_args[4].start() while True: - if not args.use_reader_op: - data = next(reader_generator, None) - if data == None: - break - if iters == args.iterations: - reader_generator.close() + try: + acc_rets = exe.run(fetch_list=to_fetch) + for i, e in enumerate(acc_evaluators): + e.update( + value=np.array(acc_rets[i]), weight=args.batch_size) + except fluid.core.EOFException as eof: + test_args[4].reset() break - if iters == args.skip_batch_num: - start_time = time.time() - num_samples = 0 + else: + for batch_id, data in enumerate(test_args[3]()): + acc_rets = exe.run(feed=feeder.feed(data), fetch_list=to_fetch) + for i, e in enumerate(acc_evaluators): + e.update(value=np.array(acc_rets[i]), weight=len(data)) - if args.use_reader_op: - try: - loss = exe.run(train_prog, fetch_list=[avg_loss]) - except fluid.core.EnforceNotMet as ex: - break - else: - loss = exe.run(train_prog, - feed=feeder.feed(data), - fetch_list=[avg_loss]) - iters += 1 - batch_id += 1 - # FIXME(wuyi): For use_reader_op, if the current - # pass is not the last, the last batch of this pass - # is also equal to args.batch_size. - if args.use_reader_op: - num_samples += args.batch_size * args.gpus - else: - num_samples += len(data) - train_losses.append(loss) - print("Pass: %d, Iter: %d, Loss: %f\n" % - (pass_id, iters, np.mean(train_losses))) - print_train_time(start_time, time.time(), num_samples) - print("Pass: %d, Loss: %f" % (pass_id, np.mean(train_losses))), - # evaluation - if not args.no_test and batch_acc and not args.use_reader_op: - if args.use_inference_transpiler: - t = fluid.InferenceTranspiler() - t.transpile(infer_prog, place) - - pass_test_acc = test(exe, infer_prog, test_reader, feeder, - batch_acc) - print(", Test Accuracy: %f" % pass_test_acc) - print("\n") - # TODO(wuyi): add warmup passes to get better perf data. - exit(0) + return [e.eval() for e in acc_evaluators] -# TODO(wuyi): replace train, train_parallel, test functions with new trainer -# API once it is ready. -def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, - batch_acc, args, train_prog, startup_prog, nccl_id_var, - num_trainers, trainer_id): +# NOTE: only need to benchmark using parallelexe +def train_parallel(train_args, test_args, args, train_prog, test_prog, + startup_prog, nccl_id_var, num_trainers, trainer_id): + over_all_start = time.time() place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0) + feeder = None if not args.use_reader_op: feed_var_list = [ var for var in train_prog.global_block().vars.itervalues() if var.is_data ] feeder = fluid.DataFeeder(feed_var_list, place) - # generate fake: if args.use_fake_data: for var in feed_var_list: @@ -230,63 +167,110 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, startup_exe = fluid.Executor(place) startup_exe.run(startup_prog) strategy = fluid.ExecutionStrategy() - strategy.num_threads = 1 + strategy.num_threads = args.cpus strategy.allow_op_delay = False + avg_loss = train_args[0] + + if args.update_method == "pserver": + # parameter server mode distributed training, merge + # gradients on local server, do not initialize + # ParallelExecutor with multi server all-reduce mode. + num_trainers = 1 + trainer_id = 0 + exe = fluid.ParallelExecutor( True, avg_loss.name, + main_program=train_prog, exec_strategy=strategy, num_trainers=num_trainers, trainer_id=trainer_id) + if not args.no_test: + if args.update_method == "pserver": + test_scope = None + else: + # NOTE: use an empty scope to avoid test exe using NCCLID + test_scope = fluid.Scope() + test_exe = fluid.ParallelExecutor( + True, main_program=test_prog, share_vars_from=exe) + for pass_id in range(args.pass_num): num_samples = 0 iters = 0 start_time = time.time() if not args.use_reader_op: - reader_generator = train_reader() + reader_generator = train_args[3]() #train_reader batch_id = 0 data = None + if args.use_reader_op: + train_args[4].start() while True: if not args.use_reader_op: data = next(reader_generator, None) if data == None: break + if args.profile and batch_id == 5: + profiler.start_profiler("All") + profiler.reset_profiler() + elif args.profile and batch_id == 10: + print("profiling total time: ", time.time() - start_time) + profiler.stop_profiler("total", "/tmp/profile_%d_pass%d" % + (trainer_id, pass_id)) if iters == args.iterations: reader_generator.close() break - if args.profile and pass_id == 0 and batch_id == 5: - profiler.start_profiler("All") - elif args.profile and pass_id == 0 and batch_id == 10: - profiler.stop_profiler("total", "/tmp/profile_%d" % trainer_id) if iters == args.skip_batch_num: start_time = time.time() num_samples = 0 + fetch_list = [avg_loss.name] + acc_name_list = [v.name for v in train_args[2]] + fetch_list.extend(acc_name_list) + if args.use_fake_data or args.use_reader_op: try: - loss, = exe.run([avg_loss.name]) + + fetch_ret = exe.run(fetch_list) + except fluid.core.EOFException as eof: + break except fluid.core.EnforceNotMet as ex: + traceback.print_exc() break else: - loss, = exe.run([avg_loss.name], feed=feeder.feed(data)) + fetch_ret = exe.run(fetch_list, feed=feeder.feed(data)) if args.use_reader_op: num_samples += args.batch_size * args.gpus else: num_samples += len(data) + iters += 1 if batch_id % 1 == 0: - print("Pass %d, batch %d, loss %s" % - (pass_id, batch_id, np.array(loss))) + fetched_data = [np.mean(np.array(d)) for d in fetch_ret] + print("Pass %d, batch %d, loss %s, accucacys: %s" % + (pass_id, batch_id, fetched_data[0], fetched_data[1:])) batch_id += 1 print_train_time(start_time, time.time(), num_samples) - if not args.no_test and batch_acc and not args.use_reader_op: - # we have not implement record io for test - # skip test when use args.use_reader_op - test_acc = test(startup_exe, infer_prog, test_reader, feeder, - batch_acc) - print("Pass: %d, Test Accuracy: %f\n" % (pass_id, test_acc)) + if args.use_reader_op: + train_args[4].reset() # reset reader handle + else: + del reader_generator + + if not args.no_test and test_args[2]: + test_feeder = None + if not args.use_reader_op: + test_feed_var_list = [ + var for var in test_prog.global_block().vars.itervalues() + if var.is_data + ] + test_feeder = fluid.DataFeeder(test_feed_var_list, place) + test_ret = test_parallel(test_exe, test_args, args, test_prog, + test_feeder) + print("Pass: %d, Test Accuracy: %s\n" % + (pass_id, [np.mean(np.array(v)) for v in test_ret])) + + print("total train time: ", time.time() - over_all_start) def print_arguments(args): @@ -328,44 +312,46 @@ def main(): if args.use_cprof: pr = cProfile.Profile() pr.enable() + model_def = __import__("models.%s" % args.model, fromlist=["models"]) - train_args = list(model_def.get_model(args)) - train_args.append(args) - # Run optimizer.minimize(avg_loss) - train_args[2].minimize(train_args[0]) - if args.memory_optimize: - fluid.memory_optimize(fluid.default_main_program()) + + train_prog = fluid.Program() + test_prog = fluid.Program() + startup_prog = fluid.Program() + + train_args = list(model_def.get_model(args, True, train_prog, startup_prog)) + test_args = list(model_def.get_model(args, False, test_prog, startup_prog)) + + all_args = [train_args, test_args, args] if args.update_method == "pserver": - train_prog, startup_prog = dist_transpile(trainer_id, args) + train_prog, startup_prog = dist_transpile(trainer_id, args, train_prog, + startup_prog) if not train_prog: raise Exception( "Must configure correct environments to run dist train.") - train_args.extend([train_prog, startup_prog]) + all_args.extend([train_prog, test_prog, startup_prog]) if args.gpus > 1 and os.getenv("PADDLE_TRAINING_ROLE") == "TRAINER": - train_args.extend([nccl_id_var, num_trainers, trainer_id]) - train_parallel(*train_args) - train(*train_args) + all_args.extend([nccl_id_var, num_trainers, trainer_id]) + train_parallel(*all_args) + elif os.getenv("PADDLE_TRAINING_ROLE") == "PSERVER": + # start pserver with Executor + server_exe = fluid.Executor(fluid.CPUPlace()) + server_exe.run(startup_prog) + server_exe.run(train_prog) exit(0) # for other update methods, use default programs - train_args.append(fluid.default_main_program()) - train_args.append(fluid.default_startup_program()) + all_args.extend([train_prog, test_prog, startup_prog]) if args.update_method == "nccl2": - nccl_id_var, num_trainers, trainer_id = append_nccl2_prepare(trainer_id) - if args.gpus == 1: - # NOTE: parallel executor use profiler interanlly - if args.use_nvprof and args.device == 'GPU': - with profiler.cuda_profiler("cuda_profiler.txt", 'csv') as nvprof: - train(*train_args) - else: - train(*train_args) - else: - if args.device == "CPU": - raise Exception("Only support GPU perf with parallel exe") - train_args.extend([nccl_id_var, num_trainers, trainer_id]) - train_parallel(*train_args) + nccl_id_var, num_trainers, trainer_id = append_nccl2_prepare( + trainer_id, startup_prog) + + if args.device == "CPU": + raise Exception("Only support GPU perf with parallel exe") + all_args.extend([nccl_id_var, num_trainers, trainer_id]) + train_parallel(*all_args) if __name__ == "__main__": diff --git a/benchmark/fluid/imagenet_reader.py b/benchmark/fluid/imagenet_reader.py new file mode 100644 index 000000000..a39485a61 --- /dev/null +++ b/benchmark/fluid/imagenet_reader.py @@ -0,0 +1,344 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import math +import random +import functools +import numpy as np +from threading import Thread +import subprocess +import time + +from Queue import Queue +import paddle +from PIL import Image, ImageEnhance + +random.seed(0) + +DATA_DIM = 224 + +THREAD = int(os.getenv("PREPROCESS_THREADS", "10")) +BUF_SIZE = 5120 + +DATA_DIR = '/mnt/ImageNet' +TRAIN_LIST = '/mnt/ImageNet/train.txt' +TEST_LIST = '/mnt/ImageNet/val.txt' + +img_mean = np.array([0.485, 0.456, 0.406]).reshape((3, 1, 1)) +img_std = np.array([0.229, 0.224, 0.225]).reshape((3, 1, 1)) + + +def resize_short(img, target_size): + percent = float(target_size) / min(img.size[0], img.size[1]) + resized_width = int(round(img.size[0] * percent)) + resized_height = int(round(img.size[1] * percent)) + img = img.resize((resized_width, resized_height), Image.LANCZOS) + return img + + +def crop_image(img, target_size, center): + width, height = img.size + size = target_size + if center == True: + w_start = (width - size) / 2 + h_start = (height - size) / 2 + else: + w_start = random.randint(0, width - size) + h_start = random.randint(0, height - size) + w_end = w_start + size + h_end = h_start + size + img = img.crop((w_start, h_start, w_end, h_end)) + return img + + +def random_crop(img, size, scale=[0.08, 1.0], ratio=[3. / 4., 4. / 3.]): + aspect_ratio = math.sqrt(random.uniform(*ratio)) + w = 1. * aspect_ratio + h = 1. / aspect_ratio + + bound = min((float(img.size[0]) / img.size[1]) / (w**2), + (float(img.size[1]) / img.size[0]) / (h**2)) + scale_max = min(scale[1], bound) + scale_min = min(scale[0], bound) + + target_area = img.size[0] * img.size[1] * random.uniform(scale_min, + scale_max) + target_size = math.sqrt(target_area) + w = int(target_size * w) + h = int(target_size * h) + + i = random.randint(0, img.size[0] - w) + j = random.randint(0, img.size[1] - h) + + img = img.crop((i, j, i + w, j + h)) + img = img.resize((size, size), Image.LANCZOS) + return img + + +def rotate_image(img): + angle = random.randint(-10, 10) + img = img.rotate(angle) + return img + + +def distort_color(img): + def random_brightness(img, lower=0.5, upper=1.5): + e = random.uniform(lower, upper) + return ImageEnhance.Brightness(img).enhance(e) + + def random_contrast(img, lower=0.5, upper=1.5): + e = random.uniform(lower, upper) + return ImageEnhance.Contrast(img).enhance(e) + + def random_color(img, lower=0.5, upper=1.5): + e = random.uniform(lower, upper) + return ImageEnhance.Color(img).enhance(e) + + ops = [random_brightness, random_contrast, random_color] + random.shuffle(ops) + + img = ops[0](img) + img = ops[1](img) + img = ops[2](img) + + return img + + +def process_image(sample, mode, color_jitter, rotate): + img_path = sample[0] + + img = Image.open(img_path) + if mode == 'train': + if rotate: img = rotate_image(img) + img = random_crop(img, DATA_DIM) + else: + img = resize_short(img, target_size=256) + img = crop_image(img, target_size=DATA_DIM, center=True) + if mode == 'train': + if color_jitter: + img = distort_color(img) + if random.randint(0, 1) == 1: + img = img.transpose(Image.FLIP_LEFT_RIGHT) + + if img.mode != 'RGB': + img = img.convert('RGB') + + img = np.array(img).astype('float32').transpose((2, 0, 1)) / 255 + img -= img_mean + img /= img_std + + if mode == 'train' or mode == 'val': + return img, sample[1] + elif mode == 'test': + return [img] + + +class XmapEndSignal(): + pass + + +def xmap_readers(mapper, + reader, + process_num, + buffer_size, + order=False, + print_queue_state=True): + end = XmapEndSignal() + + # define a worker to read samples from reader to in_queue + def read_worker(reader, in_queue): + for i in reader(): + in_queue.put(i) + in_queue.put(end) + + # define a worker to read samples from reader to in_queue with order flag + def order_read_worker(reader, in_queue, file_queue): + in_order = 0 + for i in reader(): + in_queue.put((in_order, i)) + in_order += 1 + in_queue.put(end) + + # define a worker to handle samples from in_queue by mapper + # and put mapped samples into out_queue + def handle_worker(in_queue, out_queue, mapper): + sample = in_queue.get() + while not isinstance(sample, XmapEndSignal): + r = mapper(sample) + out_queue.put(r) + sample = in_queue.get() + in_queue.put(end) + out_queue.put(end) + + # define a worker to handle samples from in_queue by mapper + # and put mapped samples into out_queue by order + def order_handle_worker(in_queue, out_queue, mapper, out_order): + ins = in_queue.get() + while not isinstance(ins, XmapEndSignal): + order, sample = ins + r = mapper(sample) + while order != out_order[0]: + pass + out_queue.put(r) + out_order[0] += 1 + ins = in_queue.get() + in_queue.put(end) + out_queue.put(end) + + def xreader(): + file_queue = Queue() + in_queue = Queue(buffer_size) + out_queue = Queue(buffer_size) + out_order = [0] + # start a read worker in a thread + target = order_read_worker if order else read_worker + t = Thread(target=target, args=(reader, in_queue)) + t.daemon = True + t.start() + # start several handle_workers + target = order_handle_worker if order else handle_worker + args = (in_queue, out_queue, mapper, out_order) if order else ( + in_queue, out_queue, mapper) + workers = [] + for i in xrange(process_num): + worker = Thread(target=target, args=args) + worker.daemon = True + workers.append(worker) + for w in workers: + w.start() + + sample = out_queue.get() + start_t = time.time() + while not isinstance(sample, XmapEndSignal): + yield sample + sample = out_queue.get() + if time.time() - start_t > 3: + if print_queue_state: + print("queue sizes: ", in_queue.qsize(), out_queue.qsize()) + start_t = time.time() + finish = 1 + while finish < process_num: + sample = out_queue.get() + if isinstance(sample, XmapEndSignal): + finish += 1 + else: + yield sample + + return xreader + + +def _reader_creator(file_list, + mode, + shuffle=False, + color_jitter=False, + rotate=False, + xmap=True): + def reader(): + with open(file_list) as flist: + full_lines = [line.strip() for line in flist] + if shuffle: + random.shuffle(full_lines) + if mode == 'train': + trainer_id = int(os.getenv("PADDLE_TRAINER_ID")) + trainer_count = int(os.getenv("PADDLE_TRAINERS")) + per_node_lines = len(full_lines) / trainer_count + lines = full_lines[trainer_id * per_node_lines:(trainer_id + 1) + * per_node_lines] + print( + "read images from %d, length: %d, lines length: %d, total: %d" + % (trainer_id * per_node_lines, per_node_lines, len(lines), + len(full_lines))) + else: + lines = full_lines + + for line in lines: + if mode == 'train': + img_path, label = line.split() + img_path = img_path.replace("JPEG", "jpeg") + img_path = os.path.join(DATA_DIR, "train", img_path) + yield (img_path, int(label)) + elif mode == 'val': + img_path, label = line.split() + img_path = img_path.replace("JPEG", "jpeg") + img_path = os.path.join(DATA_DIR, "val", img_path) + yield (img_path, int(label)) + elif mode == 'test': + img_path = os.path.join(DATA_DIR, line) + yield [img_path] + + mapper = functools.partial( + process_image, mode=mode, color_jitter=color_jitter, rotate=rotate) + + return paddle.reader.xmap_readers(mapper, reader, THREAD, BUF_SIZE) + + +def load_raw_image_uint8(sample): + img_arr = np.array(Image.open(sample[0])).astype('int64') + return img_arr, int(sample[1]) + + +def train_raw(file_list=TRAIN_LIST, shuffle=True): + def reader(): + with open(file_list) as flist: + full_lines = [line.strip() for line in flist] + if shuffle: + random.shuffle(full_lines) + + trainer_id = int(os.getenv("PADDLE_TRAINER_ID")) + trainer_count = int(os.getenv("PADDLE_TRAINERS")) + per_node_lines = len(full_lines) / trainer_count + lines = full_lines[trainer_id * per_node_lines:(trainer_id + 1) * + per_node_lines] + print("read images from %d, length: %d, lines length: %d, total: %d" + % (trainer_id * per_node_lines, per_node_lines, len(lines), + len(full_lines))) + + for line in lines: + img_path, label = line.split() + img_path = img_path.replace("JPEG", "jpeg") + img_path = os.path.join(DATA_DIR, "train", img_path) + yield (img_path, int(label)) + + return paddle.reader.xmap_readers(load_raw_image_uint8, reader, THREAD, + BUF_SIZE) + + +def train(file_list=TRAIN_LIST, xmap=True): + return _reader_creator( + file_list, + 'train', + shuffle=True, + color_jitter=False, + rotate=False, + xmap=xmap) + + +def val(file_list=TEST_LIST, xmap=True): + return _reader_creator(file_list, 'val', shuffle=False, xmap=xmap) + + +def test(file_list=TEST_LIST): + return _reader_creator(file_list, 'test', shuffle=False) + + +if __name__ == "__main__": + c = 0 + start_t = time.time() + for d in train()(): + c += 1 + if c >= 10000: + break + spent = time.time() - start_t + print("read 10000 speed: ", 10000 / spent, spent) diff --git a/benchmark/fluid/kube_gen_job.py b/benchmark/fluid/kube_gen_job.py index dfe8b5cdd..c1f22f1bf 100644 --- a/benchmark/fluid/kube_gen_job.py +++ b/benchmark/fluid/kube_gen_job.py @@ -163,6 +163,19 @@ def gen_job(): volumes.append({"name": "dshm", "emptyDir": {"medium": "Memory"}}) volumeMounts.append({"mountPath": "/dev/shm", "name": "dshm"}) + # add ceph volumes + volumes.append({ + "name": "ceph-data", + "cephfs": { + "monitors": ["192.168.16.23:6789"], + "secretRef": { + "name": "ceph-secret" + }, + "user": "admin", + } + }) + volumeMounts.append({"mountPath": "/mnt/data", "name": "ceph-data"}) + tn["spec"]["template"]["spec"]["volumes"] = volumes tn_container["volumeMounts"] = volumeMounts diff --git a/benchmark/fluid/models/__init__.py b/benchmark/fluid/models/__init__.py index 1c3fcac8d..1b8f63c70 100644 --- a/benchmark/fluid/models/__init__.py +++ b/benchmark/fluid/models/__init__.py @@ -13,5 +13,6 @@ # limitations under the License. __all__ = [ - "machine_translation", "resnet", "vgg", "mnist", "stacked_dynamic_lstm" + "machine_translation", "resnet", "vgg", "mnist", "stacked_dynamic_lstm", + "resnet_with_preprocess" ] diff --git a/benchmark/fluid/models/machine_translation.py b/benchmark/fluid/models/machine_translation.py index 17f6b0382..18163c35d 100644 --- a/benchmark/fluid/models/machine_translation.py +++ b/benchmark/fluid/models/machine_translation.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """seq2seq model for fluid.""" + from __future__ import absolute_import from __future__ import division from __future__ import print_function @@ -181,7 +182,7 @@ def lodtensor_to_ndarray(lod_tensor): return ndarray -def get_model(args): +def get_model(args, is_train, main_prog, startup_prog): if args.use_reader_op: raise Exception("machine_translation do not support reader op for now.") embedding_dim = 512 @@ -190,30 +191,27 @@ def get_model(args): dict_size = 30000 beam_size = 3 max_length = 250 - avg_cost, feeding_list = seq_to_seq_net( - embedding_dim, - encoder_size, - decoder_size, - dict_size, - dict_size, - False, - beam_size=beam_size, - max_length=max_length) - - # clone from default main program - inference_program = fluid.default_main_program().clone() - - optimizer = fluid.optimizer.Adam(learning_rate=args.learning_rate) - - train_batch_generator = paddle.batch( - paddle.reader.shuffle( - paddle.dataset.wmt14.train(dict_size), buf_size=1000), - batch_size=args.batch_size * args.gpus) - test_batch_generator = paddle.batch( + with fluid.program_guard(main_prog, startup_prog): + with fluid.unique_name.guard(): + avg_cost, feeding_list = seq_to_seq_net( + embedding_dim, + encoder_size, + decoder_size, + dict_size, + dict_size, + False, + beam_size=beam_size, + max_length=max_length) + if is_train: + optimizer = fluid.optimizer.Adam(learning_rate=args.learning_rate) + optimizer.minimize(avg_cost) + + batch_generator = paddle.batch( paddle.reader.shuffle( - paddle.dataset.wmt14.test(dict_size), buf_size=1000), - batch_size=args.batch_size) + paddle.dataset.wmt14.train(dict_size) + if is_train else paddle.dataset.wmt14.test(dict_size), + buf_size=1000), + batch_size=args.batch_size * args.gpus) - return avg_cost, inference_program, optimizer, train_batch_generator, \ - test_batch_generator, None + return avg_cost, optimizer, [], batch_generator, None diff --git a/benchmark/fluid/models/mnist.py b/benchmark/fluid/models/mnist.py index 8e740dc68..cef8657ee 100644 --- a/benchmark/fluid/models/mnist.py +++ b/benchmark/fluid/models/mnist.py @@ -65,61 +65,50 @@ def cnn_model(data): return predict -def get_model(args): - if args.use_reader_op: - filelist = [ - os.path.join(args.data_path, f) for f in os.listdir(args.data_path) - ] - data_file = fluid.layers.open_files( - filenames=filelist, - shapes=[[-1, 1, 28, 28], (-1, 1)], - lod_levels=[0, 0], - dtypes=["float32", "int64"], - thread_num=args.gpus, - pass_num=args.pass_num) - data_file = fluid.layers.double_buffer( - fluid.layers.batch( - data_file, batch_size=args.batch_size)) - images, label = fluid.layers.read_file(data_file) - else: - images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE) - label = fluid.layers.data(name='label', shape=[1], dtype='int64') - - if args.device == 'CPU' and args.cpus > 1: - places = fluid.layers.get_places(args.cpus) - pd = fluid.layers.ParallelDo(places) - with pd.do(): - predict = cnn_model(pd.read_input(images)) - label = pd.read_input(label) +def get_model(args, is_train, main_prog, startup_prog): + # NOTE: mnist is small, we don't implement data sharding yet. + filelist = [ + os.path.join(args.data_path, f) for f in os.listdir(args.data_path) + ] + with fluid.program_guard(main_prog, startup_prog): + if args.use_reader_op: + data_file_handle = fluid.layers.open_files( + filenames=filelist, + shapes=[[-1, 1, 28, 28], (-1, 1)], + lod_levels=[0, 0], + dtypes=["float32", "int64"], + thread_num=1, + pass_num=1) + data_file = fluid.layers.double_buffer( + fluid.layers.batch( + data_file_handle, batch_size=args.batch_size)) + with fluid.unique_name.guard(): + if args.use_reader_op: + input, label = fluid.layers.read_file(data_file) + else: + images = fluid.layers.data( + name='pixel', shape=[1, 28, 28], dtype='float32') + label = fluid.layers.data( + name='label', shape=[1], dtype='int64') + + predict = cnn_model(images) cost = fluid.layers.cross_entropy(input=predict, label=label) avg_cost = fluid.layers.mean(x=cost) + # Evaluator batch_acc = fluid.layers.accuracy(input=predict, label=label) - - pd.write_output(avg_cost) - pd.write_output(batch_acc) - - avg_cost, batch_acc = pd() - avg_cost = fluid.layers.mean(avg_cost) - batch_acc = fluid.layers.mean(batch_acc) - else: - # Train program - predict = cnn_model(images) - cost = fluid.layers.cross_entropy(input=predict, label=label) - avg_cost = fluid.layers.mean(x=cost) - - # Evaluator - batch_acc = fluid.layers.accuracy(input=predict, label=label) - - # inference program - inference_program = fluid.default_main_program().clone() - - # Optimization - opt = fluid.optimizer.AdamOptimizer( - learning_rate=0.001, beta1=0.9, beta2=0.999) + # Optimization + if is_train: + opt = fluid.optimizer.AdamOptimizer( + learning_rate=0.001, beta1=0.9, beta2=0.999) + opt.minimize() + if args.memory_optimize: + fluid.memory_optimize(main_prog) # Reader - train_reader = paddle.batch( - paddle.dataset.mnist.train(), batch_size=args.batch_size * args.gpus) - test_reader = paddle.batch( - paddle.dataset.mnist.test(), batch_size=args.batch_size) - return avg_cost, inference_program, opt, train_reader, test_reader, batch_acc + if is_train: + reader = paddle.dataset.mnist.train() + else: + reader = paddle.dataset.mnist.test() + batched_reader = paddle.batch( + reader, batch_size=args.batch_size * args.gpus) + return avg_cost, opt, [batch_acc], batched_reader, data_file_handle diff --git a/benchmark/fluid/models/resnet.py b/benchmark/fluid/models/resnet.py index d44a9c07d..ae1baa48e 100644 --- a/benchmark/fluid/models/resnet.py +++ b/benchmark/fluid/models/resnet.py @@ -27,10 +27,17 @@ import paddle import paddle.fluid as fluid import paddle.fluid.core as core import paddle.fluid.profiler as profiler -from recordio_converter import imagenet_train, imagenet_test +# from recordio_converter import imagenet_train, imagenet_test +from imagenet_reader import train, val -def conv_bn_layer(input, ch_out, filter_size, stride, padding, act='relu'): +def conv_bn_layer(input, + ch_out, + filter_size, + stride, + padding, + act='relu', + is_train=True): conv1 = fluid.layers.conv2d( input=input, filter_size=filter_size, @@ -39,29 +46,31 @@ def conv_bn_layer(input, ch_out, filter_size, stride, padding, act='relu'): padding=padding, act=None, bias_attr=False) - return fluid.layers.batch_norm(input=conv1, act=act) + return fluid.layers.batch_norm(input=conv1, act=act, is_test=not is_train) -def shortcut(input, ch_out, stride): +def shortcut(input, ch_out, stride, is_train=True): ch_in = input.shape[1] # if args.data_format == 'NCHW' else input.shape[-1] if ch_in != ch_out: - return conv_bn_layer(input, ch_out, 1, stride, 0, None) + return conv_bn_layer( + input, ch_out, 1, stride, 0, None, is_train=is_train) else: return input -def basicblock(input, ch_out, stride): - short = shortcut(input, ch_out, stride) - conv1 = conv_bn_layer(input, ch_out, 3, stride, 1) - conv2 = conv_bn_layer(conv1, ch_out, 3, 1, 1, act=None) +def basicblock(input, ch_out, stride, is_train=True): + short = shortcut(input, ch_out, stride, is_train=is_train) + conv1 = conv_bn_layer(input, ch_out, 3, stride, 1, is_train=is_train) + conv2 = conv_bn_layer(conv1, ch_out, 3, 1, 1, act=None, is_train=is_train) return fluid.layers.elementwise_add(x=short, y=conv2, act='relu') -def bottleneck(input, ch_out, stride): - short = shortcut(input, ch_out * 4, stride) - conv1 = conv_bn_layer(input, ch_out, 1, stride, 0) - conv2 = conv_bn_layer(conv1, ch_out, 3, 1, 1) - conv3 = conv_bn_layer(conv2, ch_out * 4, 1, 1, 0, act=None) +def bottleneck(input, ch_out, stride, is_train=True): + short = shortcut(input, ch_out * 4, stride, is_train=is_train) + conv1 = conv_bn_layer(input, ch_out, 1, stride, 0, is_train=is_train) + conv2 = conv_bn_layer(conv1, ch_out, 3, 1, 1, is_train=is_train) + conv3 = conv_bn_layer( + conv2, ch_out * 4, 1, 1, 0, act=None, is_train=is_train) return fluid.layers.elementwise_add(x=short, y=conv3, act='relu') @@ -72,7 +81,11 @@ def layer_warp(block_func, input, ch_out, count, stride): return res_out -def resnet_imagenet(input, class_dim, depth=50, data_format='NCHW'): +def resnet_imagenet(input, + class_dim, + depth=50, + data_format='NCHW', + is_train=True): cfg = { 18: ([2, 2, 2, 1], basicblock), @@ -115,8 +128,9 @@ def resnet_cifar10(input, class_dim, depth=32, data_format='NCHW'): return out -def get_model(args): +def _model_reader_dshape_classdim(args, is_train): model = resnet_cifar10 + reader = None if args.data_set == "cifar10": class_dim = 10 if args.data_format == 'NCHW': @@ -124,8 +138,10 @@ def get_model(args): else: dshape = [32, 32, 3] model = resnet_cifar10 - train_reader = paddle.dataset.cifar.train10() - test_reader = paddle.dataset.cifar.test10() + if is_train: + reader = paddle.dataset.cifar.train10() + else: + reader = paddle.dataset.cifar.test10() elif args.data_set == "flowers": class_dim = 102 if args.data_format == 'NCHW': @@ -133,8 +149,10 @@ def get_model(args): else: dshape = [224, 224, 3] model = resnet_imagenet - train_reader = paddle.dataset.flowers.train() - test_reader = paddle.dataset.flowers.test() + if is_train: + reader = paddle.dataset.flowers.train() + else: + reader = paddle.dataset.flowers.test() elif args.data_set == "imagenet": class_dim = 1000 if args.data_format == 'NCHW': @@ -145,64 +163,89 @@ def get_model(args): if not args.data_path: raise Exception( "Must specify --data_path when training with imagenet") - train_reader = imagenet_train(args.data_path) - test_reader = imagenet_test(args.data_path) - - if args.use_reader_op: - filelist = [ - os.path.join(args.data_path, f) for f in os.listdir(args.data_path) - ] - data_file = fluid.layers.open_files( - filenames=filelist, - shapes=[[-1] + dshape, (-1, 1)], - lod_levels=[0, 0], - dtypes=["float32", "int64"], - thread_num=args.gpus, - pass_num=args.pass_num) - data_file = fluid.layers.double_buffer( - fluid.layers.batch( - data_file, batch_size=args.batch_size)) - input, label = fluid.layers.read_file(data_file) - else: - input = fluid.layers.data(name='data', shape=dshape, dtype='float32') - label = fluid.layers.data(name='label', shape=[1], dtype='int64') - - if args.device == 'CPU' and args.cpus > 1: - places = fluid.layers.get_places(args.cpus) - pd = fluid.layers.ParallelDo(places) - with pd.do(): - predict = model(pd.read_input(input), class_dim) - label = pd.read_input(label) + if not args.use_reader_op: + if is_train: + reader = train() + else: + reader = val() + else: + if is_train: + reader = train(xmap=False) + else: + reader = val(xmap=False) + return model, reader, dshape, class_dim + + +def get_model(args, is_train, main_prog, startup_prog): + model, reader, dshape, class_dim = _model_reader_dshape_classdim(args, + is_train) + + pyreader = None + trainer_count = int(os.getenv("PADDLE_TRAINERS")) + with fluid.program_guard(main_prog, startup_prog): + with fluid.unique_name.guard(): + if args.use_reader_op: + pyreader = fluid.layers.py_reader( + capacity=args.batch_size * args.gpus, + shapes=([-1] + dshape, (-1, 1)), + dtypes=('float32', 'int64'), + name="train_reader" if is_train else "test_reader", + use_double_buffer=True) + input, label = fluid.layers.read_file(pyreader) + else: + input = fluid.layers.data( + name='data', shape=dshape, dtype='float32') + label = fluid.layers.data( + name='label', shape=[1], dtype='int64') + + predict = model(input, class_dim, is_train=is_train) cost = fluid.layers.cross_entropy(input=predict, label=label) avg_cost = fluid.layers.mean(x=cost) - batch_acc = fluid.layers.accuracy(input=predict, label=label) - - pd.write_output(avg_cost) - pd.write_output(batch_acc) - avg_cost, batch_acc = pd() - avg_cost = fluid.layers.mean(avg_cost) - batch_acc = fluid.layers.mean(batch_acc) + batch_acc1 = fluid.layers.accuracy(input=predict, label=label, k=1) + batch_acc5 = fluid.layers.accuracy(input=predict, label=label, k=5) + + # configure optimize + optimizer = None + if is_train: + if args.use_lars: + lars_decay = 1.0 + else: + lars_decay = 0.0 + + total_images = 1281167 / trainer_count + + step = int(total_images / args.batch_size + 1) + epochs = [30, 60, 80, 90] + bd = [step * e for e in epochs] + base_lr = args.learning_rate + lr = [] + lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)] + optimizer = fluid.optimizer.Momentum( + learning_rate=base_lr, + #learning_rate=fluid.layers.piecewise_decay( + # boundaries=bd, values=lr), + momentum=0.9, + regularization=fluid.regularizer.L2Decay(1e-4)) + optimizer.minimize(avg_cost) + + if args.memory_optimize: + fluid.memory_optimize(main_prog) + + # config readers + if not args.use_reader_op: + batched_reader = paddle.batch( + reader if args.no_random else paddle.reader.shuffle( + reader, buf_size=5120), + batch_size=args.batch_size * args.gpus, + drop_last=True) else: - predict = model(input, class_dim) - cost = fluid.layers.cross_entropy(input=predict, label=label) - avg_cost = fluid.layers.mean(x=cost) - batch_acc = fluid.layers.accuracy(input=predict, label=label) - - inference_program = fluid.default_main_program().clone() - with fluid.program_guard(inference_program): - inference_program = fluid.io.get_inference_program( - target_vars=[batch_acc]) - - optimizer = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9) - - batched_train_reader = paddle.batch( - train_reader if args.no_random else paddle.reader.shuffle( - train_reader, buf_size=5120), - batch_size=args.batch_size * args.gpus, - drop_last=True) - batched_test_reader = paddle.batch( - test_reader, batch_size=args.batch_size, drop_last=True) - - return avg_cost, inference_program, optimizer, batched_train_reader,\ - batched_test_reader, batch_acc + batched_reader = None + pyreader.decorate_paddle_reader( + paddle.batch( + reader if args.no_random else paddle.reader.shuffle( + reader, buf_size=5120), + batch_size=args.batch_size)) + + return avg_cost, optimizer, [batch_acc1, + batch_acc5], batched_reader, pyreader diff --git a/benchmark/fluid/models/resnet_with_preprocess.py b/benchmark/fluid/models/resnet_with_preprocess.py new file mode 100644 index 000000000..e8d661d84 --- /dev/null +++ b/benchmark/fluid/models/resnet_with_preprocess.py @@ -0,0 +1,268 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import functools +import numpy as np +import time +import os + +import cProfile, pstats, StringIO + +import paddle +import paddle.fluid as fluid +import paddle.fluid.core as core +import paddle.fluid.profiler as profiler +# from recordio_converter import imagenet_train, imagenet_test +from imagenet_reader import train_raw, val + + +def conv_bn_layer(input, + ch_out, + filter_size, + stride, + padding, + act='relu', + is_train=True): + conv1 = fluid.layers.conv2d( + input=input, + filter_size=filter_size, + num_filters=ch_out, + stride=stride, + padding=padding, + act=None, + bias_attr=False) + return fluid.layers.batch_norm(input=conv1, act=act, is_test=not is_train) + + +def shortcut(input, ch_out, stride, is_train=True): + ch_in = input.shape[1] # if args.data_format == 'NCHW' else input.shape[-1] + if ch_in != ch_out: + return conv_bn_layer( + input, ch_out, 1, stride, 0, None, is_train=is_train) + else: + return input + + +def basicblock(input, ch_out, stride, is_train=True): + short = shortcut(input, ch_out, stride, is_train=is_train) + conv1 = conv_bn_layer(input, ch_out, 3, stride, 1, is_train=is_train) + conv2 = conv_bn_layer(conv1, ch_out, 3, 1, 1, act=None, is_train=is_train) + return fluid.layers.elementwise_add(x=short, y=conv2, act='relu') + + +def bottleneck(input, ch_out, stride, is_train=True): + short = shortcut(input, ch_out * 4, stride, is_train=is_train) + conv1 = conv_bn_layer(input, ch_out, 1, stride, 0, is_train=is_train) + conv2 = conv_bn_layer(conv1, ch_out, 3, 1, 1, is_train=is_train) + conv3 = conv_bn_layer( + conv2, ch_out * 4, 1, 1, 0, act=None, is_train=is_train) + return fluid.layers.elementwise_add(x=short, y=conv3, act='relu') + + +def layer_warp(block_func, input, ch_out, count, stride): + res_out = block_func(input, ch_out, stride) + for i in range(1, count): + res_out = block_func(res_out, ch_out, 1) + return res_out + + +def resnet_imagenet(input, + class_dim, + depth=50, + data_format='NCHW', + is_train=True): + + cfg = { + 18: ([2, 2, 2, 1], basicblock), + 34: ([3, 4, 6, 3], basicblock), + 50: ([3, 4, 6, 3], bottleneck), + 101: ([3, 4, 23, 3], bottleneck), + 152: ([3, 8, 36, 3], bottleneck) + } + stages, block_func = cfg[depth] + conv1 = conv_bn_layer(input, ch_out=64, filter_size=7, stride=2, padding=3) + pool1 = fluid.layers.pool2d( + input=conv1, pool_type='avg', pool_size=3, pool_stride=2) + res1 = layer_warp(block_func, pool1, 64, stages[0], 1) + res2 = layer_warp(block_func, res1, 128, stages[1], 2) + res3 = layer_warp(block_func, res2, 256, stages[2], 2) + res4 = layer_warp(block_func, res3, 512, stages[3], 2) + pool2 = fluid.layers.pool2d( + input=res4, + pool_size=7, + pool_type='avg', + pool_stride=1, + global_pooling=True) + out = fluid.layers.fc(input=pool2, size=class_dim, act='softmax') + return out + + +def resnet_cifar10(input, class_dim, depth=32, data_format='NCHW'): + assert (depth - 2) % 6 == 0 + + n = (depth - 2) // 6 + + conv1 = conv_bn_layer( + input=input, ch_out=16, filter_size=3, stride=1, padding=1) + res1 = layer_warp(basicblock, conv1, 16, n, 1) + res2 = layer_warp(basicblock, res1, 32, n, 2) + res3 = layer_warp(basicblock, res2, 64, n, 2) + pool = fluid.layers.pool2d( + input=res3, pool_size=8, pool_type='avg', pool_stride=1) + out = fluid.layers.fc(input=pool, size=class_dim, act='softmax') + return out + + +def _model_reader_dshape_classdim(args, is_train): + model = resnet_cifar10 + reader = None + if args.data_set == "cifar10": + class_dim = 10 + if args.data_format == 'NCHW': + dshape = [3, 32, 32] + else: + dshape = [32, 32, 3] + model = resnet_cifar10 + if is_train: + reader = paddle.dataset.cifar.train10() + else: + reader = paddle.dataset.cifar.test10() + elif args.data_set == "flowers": + class_dim = 102 + if args.data_format == 'NCHW': + dshape = [3, 224, 224] + else: + dshape = [224, 224, 3] + model = resnet_imagenet + if is_train: + reader = paddle.dataset.flowers.train() + else: + reader = paddle.dataset.flowers.test() + elif args.data_set == "imagenet": + class_dim = 1000 + if args.data_format == 'NCHW': + dshape = [3, 224, 224] + else: + dshape = [224, 224, 3] + model = resnet_imagenet + if not args.data_path: + raise Exception( + "Must specify --data_path when training with imagenet") + if not args.use_reader_op: + if is_train: + reader = train_raw() + else: + reader = val() + else: + if is_train: + reader = train_raw() + else: + reader = val(xmap=False) + return model, reader, dshape, class_dim + + +def get_model(args, is_train, main_prog, startup_prog): + model, reader, dshape, class_dim = _model_reader_dshape_classdim(args, + is_train) + + pyreader = None + trainer_count = int(os.getenv("PADDLE_TRAINERS")) + with fluid.program_guard(main_prog, startup_prog): + with fluid.unique_name.guard(): + if args.use_reader_op: + pyreader = fluid.layers.py_reader( + capacity=args.batch_size * args.gpus, + shapes=([-1] + dshape, (-1, 1)), + dtypes=('uint8', 'int64'), + name="train_reader" if is_train else "test_reader", + use_double_buffer=True) + input, label = fluid.layers.read_file(pyreader) + else: + input = fluid.layers.data( + name='data', shape=dshape, dtype='uint8') + label = fluid.layers.data( + name='label', shape=[1], dtype='int64') + + # add imagenet preprocessors + random_crop = fluid.layers.random_crop(input, dshape) + casted = fluid.layers.cast(random_crop, 'float32') + # input is HWC + trans = fluid.layers.transpose(casted, [0, 3, 1, 2]) / 255.0 + img_mean = fluid.layers.tensor.assign( + np.array([0.485, 0.456, 0.406]).astype('float32').reshape((3, 1, + 1))) + img_std = fluid.layers.tensor.assign( + np.array([0.229, 0.224, 0.225]).astype('float32').reshape((3, 1, + 1))) + h1 = fluid.layers.elementwise_sub(trans, img_mean, axis=1) + h2 = fluid.layers.elementwise_div(h1, img_std, axis=1) + + # pre_out = (trans - img_mean) / img_std + + predict = model(h2, class_dim, is_train=is_train) + cost = fluid.layers.cross_entropy(input=predict, label=label) + avg_cost = fluid.layers.mean(x=cost) + + batch_acc1 = fluid.layers.accuracy(input=predict, label=label, k=1) + batch_acc5 = fluid.layers.accuracy(input=predict, label=label, k=5) + + # configure optimize + optimizer = None + if is_train: + if args.use_lars: + lars_decay = 1.0 + else: + lars_decay = 0.0 + + total_images = 1281167 / trainer_count + + step = int(total_images / args.batch_size + 1) + epochs = [30, 60, 80, 90] + bd = [step * e for e in epochs] + base_lr = args.learning_rate + lr = [] + lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)] + optimizer = fluid.optimizer.Momentum( + learning_rate=base_lr, + #learning_rate=fluid.layers.piecewise_decay( + # boundaries=bd, values=lr), + momentum=0.9, + regularization=fluid.regularizer.L2Decay(1e-4)) + optimizer.minimize(avg_cost) + + if args.memory_optimize: + fluid.memory_optimize(main_prog) + + # config readers + if not args.use_reader_op: + batched_reader = paddle.batch( + reader if args.no_random else paddle.reader.shuffle( + reader, buf_size=5120), + batch_size=args.batch_size * args.gpus, + drop_last=True) + else: + batched_reader = None + pyreader.decorate_paddle_reader( + paddle.batch( + # reader if args.no_random else paddle.reader.shuffle( + # reader, buf_size=5120), + reader, + batch_size=args.batch_size)) + + return avg_cost, optimizer, [batch_acc1, + batch_acc5], batched_reader, pyreader diff --git a/benchmark/fluid/models/se_resnext.py b/benchmark/fluid/models/se_resnext.py new file mode 100644 index 000000000..9f887fb32 --- /dev/null +++ b/benchmark/fluid/models/se_resnext.py @@ -0,0 +1,286 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle +import paddle.fluid as fluid +import math +import os +from imagenet_reader import train, val + +__all__ = [ + "SE_ResNeXt", "SE_ResNeXt50_32x4d", "SE_ResNeXt101_32x4d", + "SE_ResNeXt152_32x4d", "get_model" +] + +train_parameters = { + "input_size": [3, 224, 224], + "input_mean": [0.485, 0.456, 0.406], + "input_std": [0.229, 0.224, 0.225], + "learning_strategy": { + "name": "piecewise_decay", + "batch_size": 256, + "epochs": [30, 60, 90], + "steps": [0.1, 0.01, 0.001, 0.0001] + } +} + + +class SE_ResNeXt(): + def __init__(self, layers=50, is_train=True): + self.params = train_parameters + self.layers = layers + self.is_train = is_train + + def net(self, input, class_dim=1000): + layers = self.layers + supported_layers = [50, 101, 152] + assert layers in supported_layers, \ + "supported layers are {} but input layer is {}".format(supported_layers, layers) + if layers == 50: + cardinality = 32 + reduction_ratio = 16 + depth = [3, 4, 6, 3] + num_filters = [128, 256, 512, 1024] + + conv = self.conv_bn_layer( + input=input, + num_filters=64, + filter_size=7, + stride=2, + act='relu') + conv = fluid.layers.pool2d( + input=conv, + pool_size=3, + pool_stride=2, + pool_padding=1, + pool_type='max') + elif layers == 101: + cardinality = 32 + reduction_ratio = 16 + depth = [3, 4, 23, 3] + num_filters = [128, 256, 512, 1024] + + conv = self.conv_bn_layer( + input=input, + num_filters=64, + filter_size=7, + stride=2, + act='relu') + conv = fluid.layers.pool2d( + input=conv, + pool_size=3, + pool_stride=2, + pool_padding=1, + pool_type='max') + elif layers == 152: + cardinality = 64 + reduction_ratio = 16 + depth = [3, 8, 36, 3] + num_filters = [128, 256, 512, 1024] + + conv = self.conv_bn_layer( + input=input, + num_filters=64, + filter_size=3, + stride=2, + act='relu') + conv = self.conv_bn_layer( + input=conv, num_filters=64, filter_size=3, stride=1, act='relu') + conv = self.conv_bn_layer( + input=conv, + num_filters=128, + filter_size=3, + stride=1, + act='relu') + conv = fluid.layers.pool2d( + input=conv, pool_size=3, pool_stride=2, pool_padding=1, \ + pool_type='max') + + for block in range(len(depth)): + for i in range(depth[block]): + conv = self.bottleneck_block( + input=conv, + num_filters=num_filters[block], + stride=2 if i == 0 and block != 0 else 1, + cardinality=cardinality, + reduction_ratio=reduction_ratio) + + pool = fluid.layers.pool2d( + input=conv, pool_size=7, pool_type='avg', global_pooling=True) + drop = fluid.layers.dropout(x=pool, dropout_prob=0.5) + stdv = 1.0 / math.sqrt(drop.shape[1] * 1.0) + out = fluid.layers.fc(input=drop, + size=class_dim, + act='softmax', + param_attr=fluid.param_attr.ParamAttr( + initializer=fluid.initializer.Uniform(-stdv, + stdv))) + return out + + def shortcut(self, input, ch_out, stride): + ch_in = input.shape[1] + if ch_in != ch_out or stride != 1: + filter_size = 1 + return self.conv_bn_layer(input, ch_out, filter_size, stride) + else: + return input + + def bottleneck_block(self, input, num_filters, stride, cardinality, + reduction_ratio): + conv0 = self.conv_bn_layer( + input=input, num_filters=num_filters, filter_size=1, act='relu') + conv1 = self.conv_bn_layer( + input=conv0, + num_filters=num_filters, + filter_size=3, + stride=stride, + groups=cardinality, + act='relu') + conv2 = self.conv_bn_layer( + input=conv1, num_filters=num_filters * 2, filter_size=1, act=None) + scale = self.squeeze_excitation( + input=conv2, + num_channels=num_filters * 2, + reduction_ratio=reduction_ratio) + + short = self.shortcut(input, num_filters * 2, stride) + + return fluid.layers.elementwise_add(x=short, y=scale, act='relu') + + def conv_bn_layer(self, + input, + num_filters, + filter_size, + stride=1, + groups=1, + act=None): + conv = fluid.layers.conv2d( + input=input, + num_filters=num_filters, + filter_size=filter_size, + stride=stride, + padding=(filter_size - 1) / 2, + groups=groups, + act=None, + bias_attr=False) + return fluid.layers.batch_norm( + input=conv, act=act, is_test=not self.is_train) + + def squeeze_excitation(self, input, num_channels, reduction_ratio): + pool = fluid.layers.pool2d( + input=input, pool_size=0, pool_type='avg', global_pooling=True) + stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0) + squeeze = fluid.layers.fc(input=pool, + size=num_channels / reduction_ratio, + act='relu', + param_attr=fluid.param_attr.ParamAttr( + initializer=fluid.initializer.Uniform( + -stdv, stdv))) + stdv = 1.0 / math.sqrt(squeeze.shape[1] * 1.0) + excitation = fluid.layers.fc(input=squeeze, + size=num_channels, + act='sigmoid', + param_attr=fluid.param_attr.ParamAttr( + initializer=fluid.initializer.Uniform( + -stdv, stdv))) + scale = fluid.layers.elementwise_mul(x=input, y=excitation, axis=0) + return scale + + +def SE_ResNeXt50_32x4d(): + model = SE_ResNeXt(layers=50) + return model + + +def SE_ResNeXt101_32x4d(): + model = SE_ResNeXt(layers=101) + return model + + +def SE_ResNeXt152_32x4d(): + model = SE_ResNeXt(layers=152) + return model + + +def get_model(args, is_train, main_prog, startup_prog): + model = SE_ResNeXt(layers=50) + batched_reader = None + pyreader = None + trainer_count = int(os.getenv("PADDLE_TRAINERS")) + dshape = train_parameters["input_size"] + + with fluid.program_guard(main_prog, startup_prog): + with fluid.unique_name.guard(): + if args.use_reader_op: + pyreader = fluid.layers.py_reader( + capacity=10, + shapes=([-1] + dshape, (-1, 1)), + dtypes=('float32', 'int64'), + name="train_reader" if is_train else "test_reader", + use_double_buffer=True) + input, label = fluid.layers.read_file(pyreader) + else: + input = fluid.layers.data( + name='data', shape=dshape, dtype='float32') + label = fluid.layers.data( + name='label', shape=[1], dtype='int64') + + out = model.net(input=input) + cost = fluid.layers.cross_entropy(input=out, label=label) + avg_cost = fluid.layers.mean(x=cost) + acc_top1 = fluid.layers.accuracy(input=out, label=label, k=1) + acc_top5 = fluid.layers.accuracy(input=out, label=label, k=5) + + optimizer = None + if is_train: + if args.use_lars: + lars_decay = 1.0 + else: + lars_decay = 0.0 + + total_images = 1281167 / trainer_count + + step = int(total_images / args.batch_size + 1) + epochs = [40, 80, 100] + bd = [step * e for e in epochs] + base_lr = args.learning_rate + lr = [] + lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)] + optimizer = fluid.optimizer.Momentum( + # learning_rate=base_lr, + learning_rate=fluid.layers.piecewise_decay( + boundaries=bd, values=lr), + momentum=0.9, + regularization=fluid.regularizer.L2Decay(1e-4), + LARS_weight_decay=lars_decay) + optimizer.minimize(avg_cost) + + if args.memory_optimize: + fluid.memory_optimize(main_prog) + + # config readers + if is_train: + reader = train() + else: + reader = val() + + if not args.use_reader_op: + batched_reader = paddle.batch( + reader, batch_size=args.batch_size * args.gpus, drop_last=True) + else: + pyreader.decorate_paddle_reader( + paddle.batch( + reader, batch_size=args.batch_size)) + + return avg_cost, optimizer, [acc_top1, acc_top5], batched_reader, pyreader diff --git a/benchmark/fluid/models/stacked_dynamic_lstm.py b/benchmark/fluid/models/stacked_dynamic_lstm.py index 3231542a1..f23bb59de 100644 --- a/benchmark/fluid/models/stacked_dynamic_lstm.py +++ b/benchmark/fluid/models/stacked_dynamic_lstm.py @@ -26,7 +26,6 @@ import numpy import paddle import paddle.dataset.imdb as imdb import paddle.fluid as fluid -import paddle.batch as batch import paddle.fluid.profiler as profiler word_dict = imdb.word_dict() @@ -43,19 +42,7 @@ def crop_sentence(reader, crop_size): return __impl__ -def get_model(args): - if args.use_reader_op: - raise Exception( - "stacked_dynamic_lstm do not support reader op for now.") - lstm_size = 512 - emb_dim = 512 - crop_size = 1500 - - data = fluid.layers.data( - name="words", shape=[1], lod_level=1, dtype='int64') - sentence = fluid.layers.embedding( - input=data, size=[len(word_dict), emb_dim]) - +def lstm_net(sentence, lstm_size): sentence = fluid.layers.fc(input=sentence, size=lstm_size, act='tanh') rnn = fluid.layers.DynamicRNN() @@ -97,31 +84,47 @@ def get_model(args): last = fluid.layers.sequence_pool(rnn(), 'last') logit = fluid.layers.fc(input=last, size=2, act='softmax') - loss = fluid.layers.cross_entropy( - input=logit, - label=fluid.layers.data( - name='label', shape=[1], dtype='int64')) - loss = fluid.layers.mean(x=loss) + return logit - # add acc - batch_size_tensor = fluid.layers.create_tensor(dtype='int64') - batch_acc = fluid.layers.accuracy(input=logit, label=fluid.layers.data(name='label', \ - shape=[1], dtype='int64'), total=batch_size_tensor) - inference_program = fluid.default_main_program().clone() - with fluid.program_guard(inference_program): - inference_program = fluid.io.get_inference_program( - target_vars=[batch_acc, batch_size_tensor]) - - adam = fluid.optimizer.Adam() +def get_model(args, is_train, main_prog, startup_prog): + if args.use_reader_op: + raise Exception( + "stacked_dynamic_lstm do not support reader op for now.") + lstm_size = 512 + emb_dim = 512 + crop_size = 1500 - train_reader = batch( + with fluid.program_guard(main_prog, startup_prog): + with fluid.unique_name.guard(): + data = fluid.layers.data( + name="words", shape=[1], lod_level=1, dtype='int64') + sentence = fluid.layers.embedding( + input=data, size=[len(word_dict), emb_dim]) + logit = lstm_net(sentence, lstm_size) + loss = fluid.layers.cross_entropy( + input=logit, + label=fluid.layers.data( + name='label', shape=[1], dtype='int64')) + loss = fluid.layers.mean(x=loss) + + # add acc + batch_size_tensor = fluid.layers.create_tensor(dtype='int64') + batch_acc = fluid.layers.accuracy(input=logit, label=fluid.layers.data(name='label', \ + shape=[1], dtype='int64'), total=batch_size_tensor) + + if is_train: + adam = fluid.optimizer.Adam() + adam.minimize(loss) + + if is_train: + reader = crop_sentence(imdb.train(word_dict), crop_size) + else: + reader = crop_sentence(imdb.test(word_dict), crop_size) + + batched_reader = paddle.batch( paddle.reader.shuffle( - crop_sentence(imdb.train(word_dict), crop_size), buf_size=25000), + reader, buf_size=25000), batch_size=args.batch_size * args.gpus) - test_reader = batch( - paddle.reader.shuffle( - crop_sentence(imdb.test(word_dict), crop_size), buf_size=25000), - batch_size=args.batch_size) - return loss, inference_program, adam, train_reader, test_reader, batch_acc + return loss, adam, [batch_acc], batched_reader, None diff --git a/benchmark/fluid/models/vgg.py b/benchmark/fluid/models/vgg.py index 932601302..cf9708d50 100644 --- a/benchmark/fluid/models/vgg.py +++ b/benchmark/fluid/models/vgg.py @@ -25,7 +25,7 @@ import functools import os -def vgg16_bn_drop(input): +def vgg16_bn_drop(input, is_train=True): def conv_block(input, num_filter, groups, dropouts): return fluid.nets.img_conv_group( input=input, @@ -46,13 +46,13 @@ def vgg16_bn_drop(input): drop = fluid.layers.dropout(x=conv5, dropout_prob=0.5) fc1 = fluid.layers.fc(input=drop, size=512, act=None) - bn = fluid.layers.batch_norm(input=fc1, act='relu') + bn = fluid.layers.batch_norm(input=fc1, act='relu', is_test=not is_train) drop2 = fluid.layers.dropout(x=bn, dropout_prob=0.5) fc2 = fluid.layers.fc(input=drop2, size=512, act=None) return fc2 -def get_model(args): +def get_model(args, is_train, main_prog, startup_prog): if args.data_set == "cifar10": classdim = 10 if args.data_format == 'NCHW': @@ -65,57 +65,56 @@ def get_model(args): data_shape = [3, 224, 224] else: data_shape = [224, 224, 3] + filelist = [ + os.path.join(args.data_path, f) for f in os.listdir(args.data_path) + ] + with fluid.program_guard(main_prog, startup_prog): + if args.use_reader_op: + data_file_handle = fluid.layers.open_files( + filenames=filelist, + shapes=[[-1] + data_shape, (-1, 1)], + lod_levels=[0, 0], + dtypes=["float32", "int64"], + thread_num=1, + pass_num=1) + data_file = fluid.layers.double_buffer( + fluid.layers.batch( + data_file_handle, batch_size=args.batch_size)) + with fluid.unique_name.guard(): + if args.use_reader_op: + images, label = fluid.layers.read_file(data_file) + else: + images = fluid.layers.data( + name='data', shape=data_shape, dtype='float32') + label = fluid.layers.data( + name='label', shape=[1], dtype='int64') + # Train program + net = vgg16_bn_drop(images, is_train=is_train) + predict = fluid.layers.fc(input=net, size=classdim, act='softmax') + cost = fluid.layers.cross_entropy(input=predict, label=label) + avg_cost = fluid.layers.mean(x=cost) - if args.use_reader_op: - filelist = [ - os.path.join(args.data_path, f) for f in os.listdir(args.data_path) - ] - data_file = fluid.layers.open_files( - filenames=filelist, - shapes=[[-1] + data_shape, (-1, 1)], - lod_levels=[0, 0], - dtypes=["float32", "int64"], - thread_num=args.gpus, - pass_num=args.pass_num) - data_file = fluid.layers.double_buffer( - fluid.layers.batch( - data_file, batch_size=args.batch_size)) - images, label = fluid.layers.read_file(data_file) - else: - images = fluid.layers.data( - name='data', shape=data_shape, dtype='float32') - label = fluid.layers.data(name='label', shape=[1], dtype='int64') - - # Train program - net = vgg16_bn_drop(images) - predict = fluid.layers.fc(input=net, size=classdim, act='softmax') - cost = fluid.layers.cross_entropy(input=predict, label=label) - avg_cost = fluid.layers.mean(x=cost) - - # Evaluator - batch_size_tensor = fluid.layers.create_tensor(dtype='int64') - batch_acc = fluid.layers.accuracy( - input=predict, label=label, total=batch_size_tensor) - - # inference program - inference_program = fluid.default_main_program().clone() - with fluid.program_guard(inference_program): - inference_program = fluid.io.get_inference_program( - target_vars=[batch_acc, batch_size_tensor]) - - # Optimization - optimizer = fluid.optimizer.Adam(learning_rate=args.learning_rate) + # Evaluator + batch_size_tensor = fluid.layers.create_tensor(dtype='int64') + batch_acc = fluid.layers.accuracy( + input=predict, label=label, total=batch_size_tensor) + # Optimization + if is_train: + optimizer = fluid.optimizer.Adam( + learning_rate=args.learning_rate) + optimizer.minimize(avg_cost) # data reader - train_reader = paddle.batch( + if is_train: + reader = paddle.dataset.cifar.train10() \ + if args.data_set == 'cifar10' else paddle.dataset.flowers.train() + else: + reader = paddle.dataset.cifar.test10() \ + if args.data_set == 'cifar10' else paddle.dataset.flowers.test() + + batched_reader = paddle.batch( paddle.reader.shuffle( - paddle.dataset.cifar.train10() - if args.data_set == 'cifar10' else paddle.dataset.flowers.train(), - buf_size=5120), + reader, buf_size=5120), batch_size=args.batch_size * args.gpus) - test_reader = paddle.batch( - paddle.dataset.cifar.test10() - if args.data_set == 'cifar10' else paddle.dataset.flowers.test(), - batch_size=args.batch_size) - return avg_cost, inference_program, optimizer, train_reader, test_reader, batch_acc + return avg_cost, optimizer, [batch_acc], batched_reader, data_file_handle diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index c2694144d..ae5f30e43 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -66,7 +66,7 @@ paddle.fluid.InferenceTranspiler.transpile ArgSpec(args=['self', 'program', 'pla paddle.fluid.memory_optimize ArgSpec(args=['input_program', 'skip_opt_set', 'print_log', 'level'], varargs=None, keywords=None, defaults=(None, False, 0)) paddle.fluid.release_memory ArgSpec(args=['input_program', 'skip_opt_set'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.DistributeTranspilerConfig.__init__ -paddle.fluid.ParallelExecutor.__init__ ArgSpec(args=['self', 'use_cuda', 'loss_name', 'main_program', 'share_vars_from', 'exec_strategy', 'build_strategy', 'num_trainers', 'trainer_id'], varargs=None, keywords='kwargs', defaults=(None, None, None, None, None, 1, 0)) +paddle.fluid.ParallelExecutor.__init__ ArgSpec(args=['self', 'use_cuda', 'loss_name', 'main_program', 'share_vars_from', 'exec_strategy', 'build_strategy', 'num_trainers', 'trainer_id', 'scope'], varargs=None, keywords='kwargs', defaults=(None, None, None, None, None, 1, 0, None)) paddle.fluid.ParallelExecutor.run ArgSpec(args=['self', 'fetch_list', 'feed', 'feed_dict', 'return_numpy'], varargs=None, keywords=None, defaults=(None, None, True)) paddle.fluid.ExecutionStrategy.__init__ __init__(self: paddle.fluid.core.ExecutionStrategy) -> None paddle.fluid.BuildStrategy.GradientScaleStrategy.__init__ __init__(self: paddle.fluid.core.GradientScaleStrategy, arg0: int) -> None diff --git a/paddle/fluid/platform/nccl_helper.h b/paddle/fluid/platform/nccl_helper.h index cc46c88fd..115abb98d 100644 --- a/paddle/fluid/platform/nccl_helper.h +++ b/paddle/fluid/platform/nccl_helper.h @@ -100,14 +100,13 @@ struct NCCLContextMap { return; } std::unique_ptr comms(new ncclComm_t[order_.size()]); - // if pass nccl_id here, can assume we are doing multi node training - if (nccl_id == nullptr) { + // if num_trainers == 1, should create a new nccl id for local comms. + if (num_trainers == 1) { std::lock_guard guard(NCCLGroupGuard::NCCLMutex()); PADDLE_ENFORCE(platform::dynload::ncclCommInitAll( comms.get(), static_cast(order_.size()), order_.data())); } else { - PADDLE_ENFORCE_GT(num_trainers, 1); - // TODO(wuyi): need to ensure each node have same number of GPUs + PADDLE_ENFORCE_NOT_NULL(nccl_id); { int nranks = num_trainers * order_.size(); NCCLGroupGuard gurad; diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index a7765c959..4790e0f61 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -43,8 +43,9 @@ class ParallelExecutor(object): num_trainers(int): If greater than 1, NCCL will be initialized with multiple rank of nodes, each node should have same number of GPUs. Distributed training will be enabled then. Default 1. - trainer_id(int: Must use together with num_trainers. trainer_id is the + trainer_id(int): Must use together with num_trainers. trainer_id is the "rank" of current node starts from 0. Default 0. + scope(Scope): scope to run with, default use fluid.global_scope(). Returns: ParallelExecutor: The initialized ParallelExecutor object. @@ -73,6 +74,7 @@ class ParallelExecutor(object): build_strategy=None, num_trainers=1, trainer_id=0, + scope=None, **kwargs): if len(kwargs) != 0: err_msg = "" @@ -131,7 +133,8 @@ class ParallelExecutor(object): main = main_program main = main if main else framework.default_main_program() - scope = executor.global_scope() + if scope == None: + scope = executor.global_scope() # FIXME(Yancey1989): it's a temporary approach to determinate the distribute # train program, call self.bcast_param() at the end of each mini-batch. self.is_dist = True if "recv" in [ -- GitLab