diff --git a/python/paddle/fluid/incubate/fleet/collective/__init__.py b/python/paddle/fluid/incubate/fleet/collective/__init__.py index 26b8e2c3b12a0a2e888377ddad24bbeae7c6f3e1..afb0e72eb523d8b05aba2fd951b511e202926338 100644 --- a/python/paddle/fluid/incubate/fleet/collective/__init__.py +++ b/python/paddle/fluid/incubate/fleet/collective/__init__.py @@ -16,6 +16,10 @@ import logging import paddle.fluid as fluid import paddle.fluid.io as io import paddle.fluid.transpiler.distribute_transpiler as dist_transpiler +from paddle.fluid.executor import Executor +from paddle.fluid.parallel_executor import ParallelExecutor +from paddle.fluid.compiler import CompiledProgram +from paddle.fluid.framework import Program from paddle.fluid.incubate.fleet.base.fleet_base import Fleet from paddle.fluid.incubate.fleet.base.fleet_base import Mode @@ -80,11 +84,47 @@ class Collective(Fleet): target_vars=None, main_program=None, export_for_deployment=True): + """ + Prune the given `main_program` to build a new program especially for + inference, and then save it and all related parameters to given + `dirname` by the `executor`. + """ + assert isinstance(executor, Executor), \ + "In fleet.save_inference_model() function, executor must be as" \ + " Executor type." + + if main_program is None: + main_program = self._origin_program + assert isinstance(main_program, Program), \ + "In fleet.save_inference_model() function, main_program " \ + "must be as Program type." + io.save_inference_model(dirname, feeded_var_names, target_vars, executor, main_program, None, None, export_for_deployment) def save_persistables(self, executor, dirname, main_program=None): + """ + This function filters out all variables with `persistable==True` from + the give `main_program` and then saves these variables to the folder + `dirname` or file `filename`. + + The `dirname` is used to specify the folder where persistable variables + are going to be saved. If you would like to save variables in separate + files, set `filename` None; if you would like to save all variables in a + single file, use `filename` to specify the file name. + """ + assert isinstance(executor, Executor), \ + "In fleet.save_inference_model() function, executor must be as" \ + " Executor type." + + if main_program is None: + main_program = self._origin_program + + assert isinstance(main_program, Program), \ + "In fleet.save_inference_model() function, main_program " \ + "must be as Program type." + io.save_persistables(executor, dirname, main_program, None) diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py index 338b6e8b03ea3458e9edf827895d06ae09eee155..9a77edd0dc517d6219e7f398ab927dd3e70f31df 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_base.py @@ -191,6 +191,36 @@ class TestDistRunnerBase(object): else: sys.stdout.buffer.write(pickle.dumps(out_losses)) + if args.save_model: + model_save_dir = "/tmp" + if fleet.worker_index() == 0: + model_save_dir_fluid = os.path.join(model_save_dir, + "fluid_persistables") + model_save_dir_fleet = os.path.join(model_save_dir, + "fleet_persistables") + infer_save_dir_fluid = os.path.join(model_save_dir, + "fluid_infer") + infer_save_dir_fleet = os.path.join(model_save_dir, + "fleet_infer") + else: + model_save_dir_fluid = os.path.join(model_save_dir, + "fluid_persistables_2") + model_save_dir_fleet = os.path.join(model_save_dir, + "fleet_persistables_2") + infer_save_dir_fluid = os.path.join(model_save_dir, + "fluid_infer_2") + infer_save_dir_fleet = os.path.join(model_save_dir, + "fleet_infer_2") + fluid.io.save_persistables(exe, model_save_dir_fluid, + fleet._origin_program) + fleet.save_persistables(executor=exe, dirname=model_save_dir_fleet) + feeded_var_names = [var.name for var in feed_var_list] + fluid.io.save_inference_model(infer_save_dir_fluid, + feeded_var_names, [avg_cost], exe, + fleet._origin_program) + fleet.save_inference_model(exe, infer_save_dir_fleet, + feeded_var_names, [avg_cost]) + def run_trainer(self, args): self.lr = args.lr if args.nccl2_reduce_layer_local_run: @@ -438,6 +468,7 @@ def runtime_main(test_class): parser.add_argument('--use_reduce', action='store_true') parser.add_argument('--dc_asgd', action='store_true') parser.add_argument('--hogwild', action='store_true') + parser.add_argument('--save_model', action='store_true') parser.add_argument( '--use_reader_alloc', action='store_true', required=False) parser.add_argument('--batch_size', required=False, type=int, default=2) @@ -513,6 +544,7 @@ class TestDistBase(unittest.TestCase): self._use_local_sgd = False self._ut4grad_allreduce = False self._use_hallreduce = False + self._save_model = False self._setup_config() global DIST_UT_PORT @@ -763,6 +795,8 @@ class TestDistBase(unittest.TestCase): tr_cmd += " --use_reduce" if self._use_reader_alloc: tr_cmd += " --use_reader_alloc" + if self._save_model: + tr_cmd += " --save_model" if self.__use_cuda: tr_cmd += " --use_cuda" env.update({ diff --git a/python/paddle/fluid/tests/unittests/test_dist_mnist_fleet_save.py b/python/paddle/fluid/tests/unittests/test_dist_mnist_fleet_save.py new file mode 100644 index 0000000000000000000000000000000000000000..7dac11535629379639e86f2a4d2583fb703d5bfb --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_dist_mnist_fleet_save.py @@ -0,0 +1,103 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +import shutil +import os +import unittest +from test_dist_base import TestDistBase + + +class TestDistMnistFleetSave(TestDistBase): + def _setup_config(self): + self._sync_mode = True + self._use_reduce = False + self._use_reader_alloc = False + self._nccl2_mode = True + self._gpu_fleet_api = True + self._save_model = True + + def _rm_temp_files(self, dirname): + fluid_model_path = os.path.join(dirname, 'fluid_persistables') + fleet_model_path = os.path.join(dirname, 'fleet_persistables') + fluid_infer_path = os.path.join(dirname, 'fluid_infer') + fleet_infer_path = os.path.join(dirname, 'fleet_infer') + fluid_model_path_2 = os.path.join(dirname, 'fluid_persistables_2') + fleet_model_path_2 = os.path.join(dirname, 'fleet_persistables_2') + fluid_infer_path_2 = os.path.join(dirname, 'fluid_infer_2') + fleet_infer_path_2 = os.path.join(dirname, 'fleet_infer_2') + + shutil.rmtree(fluid_model_path) + shutil.rmtree(fleet_model_path) + shutil.rmtree(fluid_infer_path) + shutil.rmtree(fleet_infer_path) + shutil.rmtree(fluid_model_path_2) + shutil.rmtree(fleet_model_path_2) + shutil.rmtree(fluid_infer_path_2) + shutil.rmtree(fleet_infer_path_2) + + def _test_saved_files(self, dirname): + fluid_model_path = os.path.join(dirname, 'fluid_persistables') + fluid_persistables = sorted(os.listdir(fluid_model_path)) + fleet_model_path = os.path.join(dirname, 'fleet_persistables') + fleet_persistables = sorted(os.listdir(fleet_model_path)) + fluid_infer_path = os.path.join(dirname, 'fluid_infer') + fluid_infer_files = sorted(os.listdir(fluid_infer_path)) + fleet_infer_path = os.path.join(dirname, 'fleet_infer') + fleet_infer_files = sorted(os.listdir(fleet_infer_path)) + + if len(fluid_persistables) != len(fleet_persistables): + self._rm_temp_files(dirname) + raise ValueError("Test Failed.") + for i in range(len(fluid_persistables)): + if fluid_persistables[i] != fleet_persistables[i]: + self._rm_temp_files(dirname) + raise ValueError("Test Failed.") + + if len(fluid_infer_files) != len(fleet_infer_files): + self._rm_temp_files(dirname) + raise ValueError("Test Failed.") + for i in range(len(fluid_infer_files)): + if fluid_infer_files[i] != fleet_infer_files[i]: + self._rm_temp_files(dirname) + raise ValueError("Test Failed.") + self._rm_temp_files(dirname) + return True + + def check_with_place(self, + model_file, + delta=1e-3, + check_error_log=False, + need_envs={}, + log_name=""): + required_envs = self._get_required_envs(check_error_log, need_envs) + + tr0_losses, tr1_losses = self._run_cluster_nccl2( + model_file, + required_envs, + False, + check_error_log, + log_name=log_name) + + dirname = '/tmp' + self._test_saved_files(dirname) + + def test_dist_train(self): + import paddle.fluid as fluid + if fluid.core.is_compiled_with_cuda(): + self.check_with_place("dist_mnist.py", delta=1e-5) + + +if __name__ == "__main__": + unittest.main()