From 57d434df5d0306452f2d27a66d810993c4e0eea7 Mon Sep 17 00:00:00 2001 From: 123malin Date: Thu, 20 Aug 2020 14:40:52 +0800 Subject: [PATCH] add save/load for parameter server (#26235) * add save/load for parameter server --- python/paddle/distributed/fleet/__init__.py | 2 + .../distributed/fleet/base/fleet_base.py | 38 ++- .../fleet/runtime/parameter_server_runtime.py | 314 +++++++++++++++++- .../distributed/fleet/runtime/runtime_base.py | 6 + .../fluid/tests/unittests/CMakeLists.txt | 4 + .../fluid/tests/unittests/dist_fleet_ctr.py | 15 +- .../dist_fleet_sparse_embedding_ctr.py | 10 +- .../tests/unittests/test_dist_fleet_base.py | 25 +- .../tests/unittests/test_dist_fleet_ctr.py | 4 +- .../fluid/tests/unittests/test_fleet_base.py | 22 +- .../tests/unittests/test_fleet_base_2.py | 102 ++++++ .../tests/unittests/test_fleet_base_3.py | 52 +++ .../tests/unittests/test_fleet_runtime.py | 23 ++ 13 files changed, 562 insertions(+), 55 deletions(-) create mode 100644 python/paddle/fluid/tests/unittests/test_fleet_base_2.py create mode 100644 python/paddle/fluid/tests/unittests/test_fleet_base_3.py diff --git a/python/paddle/distributed/fleet/__init__.py b/python/paddle/distributed/fleet/__init__.py index e291fdf4ed9..b080fb17553 100644 --- a/python/paddle/distributed/fleet/__init__.py +++ b/python/paddle/distributed/fleet/__init__.py @@ -48,4 +48,6 @@ init_server = fleet.init_server run_server = fleet.run_server stop_worker = fleet.stop_worker distributed_optimizer = fleet.distributed_optimizer +save_inference_model = fleet.save_inference_model +save_persistables = fleet.save_persistables minimize = fleet.minimize diff --git a/python/paddle/distributed/fleet/base/fleet_base.py b/python/paddle/distributed/fleet/base/fleet_base.py index 695fd01909c..8093ad504c1 100644 --- a/python/paddle/distributed/fleet/base/fleet_base.py +++ b/python/paddle/distributed/fleet/base/fleet_base.py @@ -19,10 +19,26 @@ from .distributed_strategy import DistributedStrategy from .meta_optimizer_factory import MetaOptimizerFactory from .runtime_factory import RuntimeFactory from .util_factory import UtilFactory +from paddle.fluid.wrapped_decorator import wrap_decorator __all__ = ['Fleet'] +def _inited_runtime_handler_(func): + def __impl__(*args, **kwargs): + cls = args[0] + + if cls._runtime_handle is None: + raise ValueError("Fleet can not find suitable runtime handler") + + return func(*args, **kwargs) + + return __impl__ + + +inited_runtime_handler = wrap_decorator(_inited_runtime_handler_) + + class Fleet(object): """ Unified API for distributed training of PaddlePaddle @@ -182,34 +198,48 @@ class Fleet(object): """ self._role_maker.barrier_worker() + @inited_runtime_handler def init_worker(self): """ init worker """ - assert self._runtime_handle is not None self._runtime_handle._init_worker() + @inited_runtime_handler def init_server(self, *args, **kwargs): """ init server """ - assert self._runtime_handle is not None self._runtime_handle._init_server(*args, **kwargs) + @inited_runtime_handler def run_server(self): """ run server """ - assert self._runtime_handle is not None self._runtime_handle._run_server() + @inited_runtime_handler def stop_worker(self): """ stop worker """ - assert self._runtime_handle is not None self._runtime_handle._stop_worker() + def save_inference_model(self, + executor, + dirname, + feeded_var_names, + target_vars, + main_program=None, + export_for_deployment=True): + self._runtime_handle._save_inference_model( + executor, dirname, feeded_var_names, target_vars, main_program, + export_for_deployment) + + def save_persistables(self, executor, dirname, main_program=None): + self._runtime_handle._save_persistables(executor, dirname, main_program) + def distributed_optimizer(self, optimizer, strategy=None): """ distirbuted_optimizer diff --git a/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py b/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py index 813649edbcb..c731ed08893 100644 --- a/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py +++ b/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py @@ -13,11 +13,14 @@ # limitations under the License. import os -import logging import warnings import paddle.fluid as fluid from paddle.fluid import core +from paddle.fluid.framework import Program +from paddle.fluid.compiler import CompiledProgram +from paddle.fluid.executor import Executor +from paddle.fluid.parallel_executor import ParallelExecutor from .runtime_base import RuntimeBase @@ -241,3 +244,312 @@ class ParameterServerRuntime(RuntimeBase): self._communicator.stop() executor = fluid.Executor(fluid.CPUPlace()) executor.close() + + def _get_optimizer_status(self, op, param_name): + supported_opts = [ + "sgd", "adam", "adagrad", "adamax", "momentum", "lars_momentum", + "rmsprop", "decayed_adagrad", "ftrl" + ] + + reshaped_val_map = {} + reshaped_val_map["sgd"] = [] + reshaped_val_map["adam"] = ["moment1_0", "moment2_0"] + reshaped_val_map["adagrad"] = ["moment_0"] + reshaped_val_map["adamax"] = ["moment_0", "inf_norm_0"] + reshaped_val_map["momentum"] = ["velocity_0"] + reshaped_val_map["lars_momentum"] = ["velocity_0"] + reshaped_val_map[ + "rmsprop"] = ["momentum_0", "mean_square_0", "mean_grad_0"] + reshaped_val_map["decayed_adagrad"] = ["moment_0"] + reshaped_val_map["ftrl"] = ["squared_0", "linear_0"] + + orishaped_val_map = {} + orishaped_val_map["adam"] = ["beta1_pow_acc_0", "beta2_pow_acc_0"] + orishaped_val_map["adamax"] = ["beta1_pow_acc_0"] + + if op not in supported_opts: + raise ValueError( + "fleet can not support optimizer: {}, only this can be supported: {}". + format(op, supported_opts)) + + reshaped_names = [ + param_name + "_" + val for val in reshaped_val_map[op] + ] + + if op not in orishaped_val_map: + origin_names = [] + else: + origin_names = [ + param_name + "_" + val for val in orishaped_val_map[op] + ] + return reshaped_names, origin_names + + def _get_optimizer_op(self, param_name): + from paddle.fluid.incubate.fleet.parameter_server.ir.public import _get_optimize_ops + + opts = _get_optimize_ops(self.origin_main_program) + for op in opts: + if "Param" in op.input_names and \ + "LearningRate" in op.input_names and op.input("Param")[0] == param_name: + return op + + def _save_dense_params(self, executor, dirname, context, main_program): + self._communicator.recv() + + prog = Program() + block = prog.global_block() + local_vars = [] + + for name, var_ctx in context.items(): + if len(var_ctx.origin_varnames()) != 1: + raise ValueError("Dense can not support split now.") + + varname = var_ctx.origin_varnames()[0] + local_vars.append(varname) + + optimizer = self._get_optimizer_op(varname) + reshaped_varnames, origin_varnames = self._get_optimizer_status( + optimizer.type, varname) + + for var_name in [varname] + reshaped_varnames + origin_varnames: + var = self.origin_main_program.global_block().vars[var_name] + block.append_op( + type='recv_save', + attrs={ + "trainer_id": self.role_maker.worker_index(), + "shape": var.shape, + "slice_shapes": + [",".join([str(i) for i in var.shape])], + "slice_varnames": [var.name], + "remote_varnames": [var.name], + "is_sparse": False, + "endpoints": var_ctx.split_endpoints(), + "file_path": os.path.join(dirname, var.name) + }) + + executor.run(prog) + return local_vars + + def _save_sparse_params(self, executor, dirname, context, main_program): + prog = Program() + block = prog.global_block() + local_vars = [] + + for name, var_ctx in context.items(): + if len(var_ctx.origin_varnames()) != 1: + raise ValueError("Dense can not support split now.") + + varname = var_ctx.origin_varnames()[0] + local_vars.append(varname) + + optimizer = self._get_optimizer_op(varname) + reshaped_varnames, origin_varnames = self._get_optimizer_status( + optimizer.type, varname) + + var = self.origin_main_program.global_block().vars[varname] + slice_shapes = [] + dims1 = ",".join([str(i) for i in var.shape[1:]]) + + for section in var_ctx.sections(): + slice_shapes.append(str(section) + dims1) + + block.append_op( + type='recv_save', + attrs={ + "trainer_id": self.role_maker.worker_index(), + "shape": var.shape, + "slice_shapes": slice_shapes, + "slice_varnames": var_ctx.split_varnames(), + "remote_varnames": var_ctx.split_varnames(), + "is_sparse": True, + "endpoints": var_ctx.split_endpoints(), + "pserver_num": len(self.role_maker.get_pserver_endpoints()), + "file_path": os.path.join(dirname, var.name) + }) + + for reshaped_varname in reshaped_varnames: + var = self.origin_main_program.global_block().vars[ + reshaped_varname] + + slice_varnames = [] + remote_varnames = [] + for i in range(len(var_ctx.split_varnames())): + slice_varnames.append("{}.block{}".format(reshaped_varname, + i)) + remote_varnames.append(reshaped_varname) + + block.append_op( + type='recv_save', + attrs={ + "trainer_id": self.role_maker.worker_index(), + "shape": var.shape, + "slice_shapes": slice_shapes, + "slice_varnames": slice_varnames, + "remote_varnames": remote_varnames, + "is_sparse": True, + "endpoints": var_ctx.split_endpoints(), + "pserver_num": + len(self.role_maker.get_pserver_endpoints()), + "file_path": os.path.join(dirname, var.name) + }) + + for origin_varname in origin_varnames: + var = self.origin_main_program.global_block().vars[ + origin_varname] + + block.append_op( + type='recv_save', + attrs={ + "trainer_id": self.role_maker.worker_index(), + "shape": var.shape, + "slice_shapes": + [",".join([str(i) for i in var.shape])], + "slice_varnames": [origin_varname], + "remote_varnames": [origin_varname], + "is_sparse": False, + "endpoints": var_ctx.split_endpoints()[:1], + "file_path": os.path.join(dirname, var.name) + }) + executor.run(prog) + return context.keys() + + def _save_distributed_params(self, executor, dirname, context, + main_program): + prog = Program() + block = prog.global_block() + + for name, var_ctx in context.items(): + block.append_op( + type='checkpoint_notify', + attrs={ + "varname": name, + "is_slice": True, + "slice_varnames": var_ctx.split_varnames(), + "remote_varnames": var_ctx.split_varnames(), + "endpoints": var_ctx.split_endpoints(), + "dirname": dirname + }) + + executor.run(prog) + return context.keys() + + def _save_distributed_persistables(self, executor, dirname, main_program): + dense_ctx = self.compiled_strategy.get_communicator_recv_context( + recv_type=1) + + sparse_ctx = self.compiled_strategy.get_communicator_recv_context( + recv_type=2) + + distributed_ctx = self.compiled_strategy.get_communicator_recv_context( + recv_type=3) + + recv_dense_varnames = self._save_dense_params(executor, dirname, + dense_ctx, main_program) + + recv_sparse_varnames = self._save_sparse_params( + executor, dirname, sparse_ctx, main_program) + + recv_distributed_varnames = self._save_distributed_params( + executor, dirname, distributed_ctx, main_program) + + saved_varnames = recv_dense_varnames + list( + recv_sparse_varnames) + list(recv_distributed_varnames) + + remaining_vars = list( + filter( + ParameterServerRuntime.__exclude_vars(saved_varnames), + main_program.list_vars())) + + fluid.io.save_vars( + executor, + main_program=main_program, + dirname=dirname, + vars=remaining_vars) + + def _ps_inference_save_persistables(self, + executor, + dirname, + main_program=None, + **kwargs): + """ + This function filters out all variables with `persistable==True` from the + give `main_program` and then saves these variables to the folder `dirname` + or file `filename`. + + The `dirname` is used to specify the folder where persistable variables + are going to be saved. If you would like to save variables in separate + files, set `filename` None; if you would like to save all variables in a + single file, use `filename` to specify the file name. + """ + + if isinstance(executor, ParallelExecutor): + raise TypeError( + "in fleet.save_persistables() function, executor must be as Executor type, ParallelExecutor is not allowed" + ) + + if not isinstance(executor, Executor): + raise TypeError( + "in fleet.save_persistables() function, executor must be as Executor type" + ) + + if main_program is None: + main_program = fluid.default_main_program() + + if isinstance(main_program, CompiledProgram): + raise TypeError( + "in fleet.save_persistables() function, main_program must be as Program type, CompiledProgram is not allowed" + ) + + self._save_distributed_persistables(executor, dirname, main_program) + + def _ps_inference_save_inference_model(self, + executor, + dirname, + feeded_var_names, + target_vars, + main_program=None, + export_for_deployment=True): + """ + Prune the given `main_program` to build a new program especially for inference, + and then save it and all related parameters to given `dirname` by the `executor`. + """ + + if isinstance(executor, ParallelExecutor): + raise TypeError( + "in fleet.save_inference_model() function, executor must be as Executor type, ParallelExecutor is not allowed" + ) + + if not isinstance(executor, Executor): + raise TypeError( + "in fleet.save_inference_model() function, executor must be as Executor type" + ) + + if main_program is not None: + if isinstance(main_program, CompiledProgram): + raise TypeError( + "in fleet.save_inference_model() function, main_program must be as Program type, CompiledProgram is not allowed" + ) + fluid.io.save_inference_model(dirname, feeded_var_names, + target_vars, executor, main_program, + None, None, export_for_deployment) + else: + fluid.io.save_inference_model(dirname, feeded_var_names, + target_vars, executor, + self.origin_main_program, None, None, + export_for_deployment, True) + + model_basename = "__model__" + model_filename = os.path.join(dirname, model_basename) + + with open(model_filename, "rb") as f: + program_desc_str = f.read() + + program = Program.parse_from_string(program_desc_str) + program._copy_dist_param_info_from(fluid.default_main_program()) + self._ps_inference_save_persistables(executor, dirname, program) + + def _save_inference_model(self, *args, **kwargs): + self._ps_inference_save_inference_model(*args, **kwargs) + + def _save_persistables(self, *args, **kwargs): + self._ps_inference_save_persistables(*args, **kwargs) diff --git a/python/paddle/distributed/fleet/runtime/runtime_base.py b/python/paddle/distributed/fleet/runtime/runtime_base.py index 38f9f882cb4..2e8bacfbc3b 100644 --- a/python/paddle/distributed/fleet/runtime/runtime_base.py +++ b/python/paddle/distributed/fleet/runtime/runtime_base.py @@ -33,3 +33,9 @@ class RuntimeBase(object): def _stop_worker(self): pass + + def _save_inference_model(self, *args, **kwargs): + pass + + def _save_persistables(self, *args, **kwargs): + pass diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 126b4465eae..20b49c5fa37 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -33,6 +33,8 @@ list(APPEND MIXED_DIST_TEST_OPS test_fleet_api_input) list(APPEND MIXED_DIST_TEST_OPS test_fleet_checkpoint) list(APPEND MIXED_DIST_TEST_OPS test_collective_optimizer) list(APPEND MIXED_DIST_TEST_OPS test_fleet_base) +list(APPEND MIXED_DIST_TEST_OPS test_fleet_base_2) +list(APPEND MIXED_DIST_TEST_OPS test_fleet_base_3) list(APPEND MIXED_DIST_TEST_OPS test_fleet_recompute_meta_optimizer) list(APPEND MIXED_DIST_TEST_OPS test_fleet_graph_execution_meta_optimizer) list(APPEND MIXED_DIST_TEST_OPS test_fleet_pipeline_meta_optimizer) @@ -382,6 +384,8 @@ if(WITH_DISTRIBUTE) py_test_modules(test_collective_optimizer MODULES test_collective_optimizer) if(NOT APPLE) py_test_modules(test_fleet_base MODULES test_fleet_base ENVS ${dist_ENVS}) + py_test_modules(test_fleet_base_2 MODULES test_fleet_base_2 ENVS ${dist_ENVS}) + py_test_modules(test_fleet_base_3 MODULES test_fleet_base_3 ENVS ${dist_ENVS}) py_test_modules(test_fleet_recompute_meta_optimizer MODULES test_fleet_recompute_meta_optimizer ENVS ${dist_ENVS}) py_test_modules(test_fleet_graph_execution_meta_optimizer MODULES test_fleet_graph_execution_meta_optimizer ENVS ${dist_ENVS}) py_test_modules(test_fleet_graph_executor MODULES test_fleet_graph_executor ENVS ${dist_ENVS}) diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py index 2f3d3ced6f8..73b546b95cf 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py @@ -162,24 +162,17 @@ class TestDistCTR2x2(FleetDistRunnerBase): exe = fluid.Executor(fluid.CPUPlace()) fleet.init_worker() - exe.run(fleet.startup_program) - + exe.run(fluid.default_startup_program()) batch_size = 4 train_reader = paddle.batch(fake_ctr_reader(), batch_size=batch_size) self.reader.decorate_sample_list_generator(train_reader) - compiled_prog = fluid.compiler.CompiledProgram( - fleet.main_program).with_data_parallel( - loss_name=self.avg_cost.name, - build_strategy=self.strategy.get_build_strategy(), - exec_strategy=self.strategy.get_execute_strategy()) - for epoch_id in range(1): self.reader.start() try: pass_start = time.time() while True: - loss_val = exe.run(program=compiled_prog, + loss_val = exe.run(program=fluid.default_main_program(), fetch_list=[self.avg_cost.name]) loss_val = np.mean(loss_val) # TODO(randomly fail) @@ -209,7 +202,7 @@ class TestDistCTR2x2(FleetDistRunnerBase): exe = fluid.Executor(fluid.CPUPlace()) fleet.init_worker() - exe.run(fleet.startup_program) + exe.run(fluid.default_startup_program()) thread_num = 2 batch_size = 128 @@ -231,7 +224,7 @@ class TestDistCTR2x2(FleetDistRunnerBase): pass_start = time.time() dataset.set_filelist(filelist) exe.train_from_dataset( - program=fleet.main_program, + program=fluid.default_main_program(), dataset=dataset, fetch_list=[self.avg_cost], fetch_info=["cost"], diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_sparse_embedding_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_sparse_embedding_ctr.py index c69e1247a9b..77697896b4d 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_sparse_embedding_ctr.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_sparse_embedding_ctr.py @@ -152,24 +152,18 @@ class TestDistCTR2x2(FleetDistRunnerBase): exe = fluid.Executor(fluid.CPUPlace()) fleet.init_worker() - exe.run(fleet.startup_program) + exe.run(fluid.default_startup_program()) batch_size = 4 train_reader = paddle.batch(fake_ctr_reader(), batch_size=batch_size) self.reader.decorate_sample_list_generator(train_reader) - compiled_prog = fluid.compiler.CompiledProgram( - fleet.main_program).with_data_parallel( - loss_name=self.avg_cost.name, - build_strategy=self.strategy.get_build_strategy(), - exec_strategy=self.strategy.get_execute_strategy()) - for epoch_id in range(1): self.reader.start() try: while True: - loss_val = exe.run(program=compiled_prog, + loss_val = exe.run(program=fluid.default_main_program(), fetch_list=[self.avg_cost.name]) loss_val = np.mean(loss_val) print("TRAIN ---> pass: {} loss: {}\n".format(epoch_id, diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py index 8c700ff3217..620b5e16a1d 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py @@ -31,10 +31,11 @@ import time import tempfile import unittest +import paddle import paddle.fluid as fluid import paddle.distributed.fleet.base.role_maker as role_maker from paddle.distributed.fleet.base.util_factory import fleet_util -from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet +from paddle.distributed.fleet import fleet from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory __all__ = ['FleetDistRunnerBase', 'TestFleetBase', 'runtime_main'] @@ -75,21 +76,23 @@ class FleetDistRunnerBase(object): return role def build_strategy(self, args): - self.strategy = None + self.strategy = paddle.distributed.fleet.DistributedStrategy() + self.strategy.a_sync = False if args.mode == "async": - self.strategy = StrategyFactory.create_async_strategy() - elif args.mode == "sync": - self.strategy = StrategyFactory.create_sync_strategy() - elif args.mode == "half_async": - self.strategy = StrategyFactory.create_half_async_strategy() + self.strategy = paddle.distributed.fleet.DistributedStrategy() + self.strategy.a_sync = True elif args.mode == "geo": - self.strategy = StrategyFactory.create_geo_strategy( - args.geo_sgd_need_push_nums) + self.strategy = paddle.distributed.fleet.DistributedStrategy() + self.strategy.a_sync = True + self.strategy.a_sync_configs = { + "k_steps": args.geo_sgd_need_push_nums + } self.dump_param = os.getenv("dump_param", "").split(",") self.dump_fields = os.getenv("dump_fields", "").split(",") self.dump_fields_path = os.getenv("dump_fields_path", "") debug = int(os.getenv("Debug", "0")) - if debug: + # TODO(update strategy to support dump params) + if False: #debug: self.strategy.set_debug_opt({ "dump_param": self.dump_param, "dump_fields": self.dump_fields, @@ -122,7 +125,7 @@ class FleetDistRunnerBase(object): staircase=True)) else: optimizer = fluid.optimizer.SGD(LEARNING_RATE) - optimizer = fleet.distributed_optimizer(optimizer, strategy) + optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) optimizer.minimize(avg_cost) def run_pserver(self, args): diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py index e16e91192e1..b506f179143 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py @@ -22,7 +22,7 @@ from test_dist_fleet_base import TestFleetBase class TestDistMnistSync2x2(TestFleetBase): def _setup_config(self): - self._mode = "async" + self._mode = "sync" self._reader = "pyreader" def check_with_place(self, @@ -123,7 +123,7 @@ class TestDistMnistAsyncDataset2x2(TestFleetBase): class TestDistCtrHalfAsync2x2(TestFleetBase): def _setup_config(self): - self._mode = "half_async" + self._mode = "async" self._reader = "pyreader" def check_with_place(self, diff --git a/python/paddle/fluid/tests/unittests/test_fleet_base.py b/python/paddle/fluid/tests/unittests/test_fleet_base.py index ca657a5a619..9e651dea24b 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_base.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_base.py @@ -17,6 +17,7 @@ import paddle import paddle.distributed.fleet as fleet import paddle.distributed.fleet.base.role_maker as role_maker import os +import paddle.fluid as fluid class TestFleetBase(unittest.TestCase): @@ -119,24 +120,9 @@ class TestFleetBase(unittest.TestCase): optimizer = paddle.optimizer.SGD(learning_rate=0.001) optimizer = fleet.distributed_optimizer(optimizer) - def test_minimize(self): - input_x = paddle.fluid.layers.data( - name="x", shape=[32], dtype='float32') - input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64') - - fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh') - fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh') - prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax') - cost = paddle.fluid.layers.cross_entropy( - input=prediction, label=input_y) - avg_cost = paddle.fluid.layers.mean(x=cost) - - role = role_maker.PaddleCloudRoleMaker(is_collective=True) - fleet.init(role) - strategy = fleet.DistributedStrategy() - optimizer = paddle.optimizer.SGD(learning_rate=0.001) - optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) - optimizer.minimize(avg_cost) + def test_exception(self): + import paddle.distributed.fleet as fleet + self.assertRaises(Exception, fleet.init_worker) if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/test_fleet_base_2.py b/python/paddle/fluid/tests/unittests/test_fleet_base_2.py new file mode 100644 index 00000000000..fba7cdbc8ca --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_base_2.py @@ -0,0 +1,102 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import paddle +import os +import paddle.fluid as fluid + + +class TestFleetBase(unittest.TestCase): + def setUp(self): + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \ + "127.0.0.1:36001,127.0.0.2:36001" + + def test_ps_minimize(self): + import paddle + import paddle.distributed.fleet as fleet + import paddle.fluid.incubate.fleet.base.role_maker as role_maker + + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + + input_x = paddle.fluid.layers.data( + name="x", shape=[32], dtype='float32') + input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64') + + fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh') + fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh') + prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax') + cost = paddle.fluid.layers.cross_entropy( + input=prediction, label=input_y) + avg_cost = paddle.fluid.layers.mean(x=cost) + + role = role_maker.PaddleCloudRoleMaker(is_collective=False) + fleet.init(role) + strategy = paddle.distributed.fleet.DistributedStrategy() + strategy.a_sync = False + optimizer = paddle.optimizer.SGD(learning_rate=0.001) + optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) + optimizer.minimize(avg_cost) + + place = fluid.CPUPlace() + exe = fluid.Executor(place) + pe = fluid.ParallelExecutor(use_cuda=False, loss_name=avg_cost.name) + compiled_prog = fluid.compiler.CompiledProgram( + fluid.default_main_program()) + self.assertRaises( + Exception, + fleet.save_inference_model, + dirname='/tmp/', + feeded_var_names=['x', 'y'], + target_vars=[avg_cost], + executor=pe) + + self.assertRaises( + Exception, + fleet.save_inference_model, + dirname='/tmp/', + feeded_var_names=['x', 'y'], + target_vars=[avg_cost], + executor="exe") + + self.assertRaises( + Exception, + fleet.save_inference_model, + dirname='/tmp/', + feeded_var_names=['x', 'y'], + target_vars=[avg_cost], + executor=exe, + main_program=compiled_prog) + + self.assertRaises( + Exception, fleet.save_persistables, executor=pe, dirname='/tmp/') + + self.assertRaises( + Exception, fleet.save_persistables, executor="exe", dirname='/tmp/') + + self.assertRaises( + Exception, + fleet.save_persistables, + executor=exe, + dirname='/tmp/', + main_program=compiled_prog) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_base_3.py b/python/paddle/fluid/tests/unittests/test_fleet_base_3.py new file mode 100644 index 00000000000..f5e888ab0eb --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_base_3.py @@ -0,0 +1,52 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import os +import paddle +import paddle.distributed.fleet as fleet +import paddle.distributed.fleet.base.role_maker as role_maker +import paddle.fluid as fluid + + +class TestFleetBase(unittest.TestCase): + def setUp(self): + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \ + "127.0.0.1:36001,127.0.0.2:36001" + + def test_collective_minimize(self): + input_x = paddle.fluid.layers.data( + name="x", shape=[32], dtype='float32') + input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64') + + fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh') + fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh') + prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax') + cost = paddle.fluid.layers.cross_entropy( + input=prediction, label=input_y) + avg_cost = paddle.fluid.layers.mean(x=cost) + + role = role_maker.PaddleCloudRoleMaker(is_collective=True) + fleet.init(role) + strategy = fleet.DistributedStrategy() + optimizer = paddle.optimizer.SGD(learning_rate=0.001) + optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) + optimizer.minimize(avg_cost) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_runtime.py b/python/paddle/fluid/tests/unittests/test_fleet_runtime.py index 3fd646f4340..80109716a54 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_runtime.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_runtime.py @@ -25,6 +25,8 @@ class TestFleetRuntime(unittest.TestCase): base._init_server() base._run_server() base._stop_worker() + base._save_inference_model() + base._save_persistables() def test_fleet_collective_runtime(self): import paddle.distributed.fleet.runtime @@ -35,6 +37,27 @@ class TestFleetRuntime(unittest.TestCase): collective_runtime._init_worker() collective_runtime._run_server() collective_runtime._stop_worker() + collective_runtime._save_inference_model() + collective_runtime._save_persistables() + + def test_fleet_ps_runtime(self): + ps_runtime = paddle.distributed.fleet.runtime.ParameterServerRuntime() + self.assertRaises(Exception, ps_runtime._get_optimizer_status, + "test_op", None) + reshaped_names, origin_names = ps_runtime._get_optimizer_status("adam", + "param") + self.assertTrue( + len(reshaped_names) == 2 and + reshaped_names[0] == 'param_moment1_0' and + reshaped_names[1] == 'param_moment2_0') + self.assertTrue( + len(origin_names) == 2 and + origin_names[0] == 'param_beta1_pow_acc_0' and + origin_names[1] == 'param_beta2_pow_acc_0') + + reshaped_names, origin_names = ps_runtime._get_optimizer_status("sgd", + "param") + self.assertTrue(len(reshaped_names) == 0 and len(origin_names) == 0) if __name__ == "__main__": -- GitLab