Unverified commit 53148e06 authored by lilong12, committed by GitHub

modify the implementation of save_persistables and save_inference_model for fleet collective mode (#20802)

* modify the implementation of save_persistables and save_inference_model functions for fleet collective, test=develop

* add ut, test=develop
Parent bd8b0eba
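For context, a minimal sketch of how the two reworked entry points are typically driven in collective mode; model-side names such as avg_cost and feed_var_list are placeholders, not part of this change:

import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy
from paddle.fluid.incubate.fleet.base import role_maker

# Initialize fleet for collective (NCCL2) training and wrap the optimizer.
fleet.init(role_maker.PaddleCloudRoleMaker(is_collective=True))
optimizer = fleet.distributed_optimizer(
    fluid.optimizer.SGD(learning_rate=0.01), strategy=DistributedStrategy())
optimizer.minimize(avg_cost)  # avg_cost: the loss variable built by the model

exe = fluid.Executor(fluid.CUDAPlace(0))
exe.run(fluid.default_startup_program())
# ... training loop ...

# Checkpoint and export through the fleet wrappers touched by this commit.
fleet.save_persistables(executor=exe, dirname="/tmp/fleet_persistables")
fleet.save_inference_model(exe, "/tmp/fleet_infer",
                           [v.name for v in feed_var_list], [avg_cost])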
...@@ -16,6 +16,10 @@ import logging
import paddle.fluid as fluid
import paddle.fluid.io as io
import paddle.fluid.transpiler.distribute_transpiler as dist_transpiler
from paddle.fluid.executor import Executor
from paddle.fluid.parallel_executor import ParallelExecutor
from paddle.fluid.compiler import CompiledProgram
from paddle.fluid.framework import Program
from paddle.fluid.incubate.fleet.base.fleet_base import Fleet
from paddle.fluid.incubate.fleet.base.fleet_base import Mode
...@@ -80,11 +84,47 @@ class Collective(Fleet):
target_vars=None,
main_program=None,
export_for_deployment=True):
"""
Prune the given `main_program` to build a new program especially for
inference, and then save it and all related parameters to given
`dirname` by the `executor`.
"""
assert isinstance(executor, Executor), \
"In fleet.save_inference_model(), executor must be an instance" \
" of Executor."
if main_program is None:
main_program = self._origin_program
assert isinstance(main_program, Program), \
"In fleet.save_inference_model(), main_program must be an" \
" instance of Program."
io.save_inference_model(dirname, feeded_var_names, target_vars,
executor, main_program, None, None,
export_for_deployment)
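Since the wrapper forwards to fluid.io.save_inference_model with model_filename and params_filename fixed to None, the exported directory can be read back with the stock fluid loader; a minimal sketch (the path mirrors the one used in the test below):

import paddle.fluid as fluid

exe = fluid.Executor(fluid.CPUPlace())
# Returns the pruned inference program, its feed names and fetch targets.
infer_prog, feed_names, fetch_targets = fluid.io.load_inference_model(
    "/tmp/fleet_infer", exe)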
def save_persistables(self, executor, dirname, main_program=None):
"""
This function filters out all variables with `persistable==True` from
the give `main_program` and then saves these variables to the folder
`dirname` or file `filename`.
The `dirname` is used to specify the folder where persistable variables
are going to be saved. If you would like to save variables in separate
files, set `filename` None; if you would like to save all variables in a
single file, use `filename` to specify the file name.
"""
assert isinstance(executor, Executor), \
"In fleet.save_persistables(), executor must be an instance" \
" of Executor."
if main_program is None:
main_program = self._origin_program
assert isinstance(main_program, Program), \
"In fleet.save_persistables(), main_program must be an" \
" instance of Program."
io.save_persistables(executor, dirname, main_program, None)
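Likewise, a checkpoint written here is a plain persistables directory (one file per variable, since filename is always None) and can be restored with fluid.io.load_persistables; a sketch assuming main_prog rebuilds the same network:

import paddle.fluid as fluid

exe = fluid.Executor(fluid.CPUPlace())
# main_prog must declare the same persistable variables that were saved.
fluid.io.load_persistables(exe, "/tmp/fleet_persistables", main_program=main_prog)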
...
...@@ -191,6 +191,36 @@ class TestDistRunnerBase(object):
else:
sys.stdout.buffer.write(pickle.dumps(out_losses))
if args.save_model:
model_save_dir = "/tmp"
if fleet.worker_index() == 0:
model_save_dir_fluid = os.path.join(model_save_dir,
"fluid_persistables")
model_save_dir_fleet = os.path.join(model_save_dir,
"fleet_persistables")
infer_save_dir_fluid = os.path.join(model_save_dir,
"fluid_infer")
infer_save_dir_fleet = os.path.join(model_save_dir,
"fleet_infer")
else:
model_save_dir_fluid = os.path.join(model_save_dir,
"fluid_persistables_2")
model_save_dir_fleet = os.path.join(model_save_dir,
"fleet_persistables_2")
infer_save_dir_fluid = os.path.join(model_save_dir,
"fluid_infer_2")
infer_save_dir_fleet = os.path.join(model_save_dir,
"fleet_infer_2")
fluid.io.save_persistables(exe, model_save_dir_fluid,
fleet._origin_program)
fleet.save_persistables(executor=exe, dirname=model_save_dir_fleet)
feeded_var_names = [var.name for var in feed_var_list]
fluid.io.save_inference_model(infer_save_dir_fluid,
feeded_var_names, [avg_cost], exe,
fleet._origin_program)
fleet.save_inference_model(exe, infer_save_dir_fleet,
feeded_var_names, [avg_cost])
def run_trainer(self, args):
self.lr = args.lr
if args.nccl2_reduce_layer_local_run:
...@@ -438,6 +468,7 @@ def runtime_main(test_class):
parser.add_argument('--use_reduce', action='store_true')
parser.add_argument('--dc_asgd', action='store_true')
parser.add_argument('--hogwild', action='store_true')
parser.add_argument('--save_model', action='store_true')
parser.add_argument(
'--use_reader_alloc', action='store_true', required=False)
parser.add_argument('--batch_size', required=False, type=int, default=2)
...@@ -513,6 +544,7 @@ class TestDistBase(unittest.TestCase):
self._use_local_sgd = False
self._ut4grad_allreduce = False
self._use_hallreduce = False
self._save_model = False
self._setup_config()
global DIST_UT_PORT
...@@ -763,6 +795,8 @@ class TestDistBase(unittest.TestCase):
tr_cmd += " --use_reduce"
if self._use_reader_alloc:
tr_cmd += " --use_reader_alloc"
if self._save_model:
tr_cmd += " --save_model"
if self.__use_cuda:
tr_cmd += " --use_cuda"
env.update({
...
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import shutil
import os
import unittest
from test_dist_base import TestDistBase
class TestDistMnistFleetSave(TestDistBase):
def _setup_config(self):
self._sync_mode = True
self._use_reduce = False
self._use_reader_alloc = False
self._nccl2_mode = True
self._gpu_fleet_api = True
self._save_model = True
def _rm_temp_files(self, dirname):
fluid_model_path = os.path.join(dirname, 'fluid_persistables')
fleet_model_path = os.path.join(dirname, 'fleet_persistables')
fluid_infer_path = os.path.join(dirname, 'fluid_infer')
fleet_infer_path = os.path.join(dirname, 'fleet_infer')
fluid_model_path_2 = os.path.join(dirname, 'fluid_persistables_2')
fleet_model_path_2 = os.path.join(dirname, 'fleet_persistables_2')
fluid_infer_path_2 = os.path.join(dirname, 'fluid_infer_2')
fleet_infer_path_2 = os.path.join(dirname, 'fleet_infer_2')
shutil.rmtree(fluid_model_path)
shutil.rmtree(fleet_model_path)
shutil.rmtree(fluid_infer_path)
shutil.rmtree(fleet_infer_path)
shutil.rmtree(fluid_model_path_2)
shutil.rmtree(fleet_model_path_2)
shutil.rmtree(fluid_infer_path_2)
shutil.rmtree(fleet_infer_path_2)
def _test_saved_files(self, dirname):
fluid_model_path = os.path.join(dirname, 'fluid_persistables')
fluid_persistables = sorted(os.listdir(fluid_model_path))
fleet_model_path = os.path.join(dirname, 'fleet_persistables')
fleet_persistables = sorted(os.listdir(fleet_model_path))
fluid_infer_path = os.path.join(dirname, 'fluid_infer')
fluid_infer_files = sorted(os.listdir(fluid_infer_path))
fleet_infer_path = os.path.join(dirname, 'fleet_infer')
fleet_infer_files = sorted(os.listdir(fleet_infer_path))
if len(fluid_persistables) != len(fleet_persistables):
self._rm_temp_files(dirname)
raise ValueError("Test Failed: number of persistable files differs.")
for i in range(len(fluid_persistables)):
if fluid_persistables[i] != fleet_persistables[i]:
self._rm_temp_files(dirname)
raise ValueError("Test Failed: persistable file names differ.")
if len(fluid_infer_files) != len(fleet_infer_files):
self._rm_temp_files(dirname)
raise ValueError("Test Failed: number of inference files differs.")
for i in range(len(fluid_infer_files)):
if fluid_infer_files[i] != fleet_infer_files[i]:
self._rm_temp_files(dirname)
raise ValueError("Test Failed: inference file names differ.")
self._rm_temp_files(dirname)
return True
def check_with_place(self,
model_file,
delta=1e-3,
check_error_log=False,
need_envs={},
log_name=""):
required_envs = self._get_required_envs(check_error_log, need_envs)
tr0_losses, tr1_losses = self._run_cluster_nccl2(
model_file,
required_envs,
False,
check_error_log,
log_name=log_name)
dirname = '/tmp'
self._test_saved_files(dirname)
def test_dist_train(self):
import paddle.fluid as fluid
if fluid.core.is_compiled_with_cuda():
self.check_with_place("dist_mnist.py", delta=1e-5)
if __name__ == "__main__":
unittest.main()