From c14ec8782b8c5aa55d78a4a38086f77b3609e5d3 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Mon, 10 Aug 2020 11:16:28 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90paddle.fleet=E3=80=91Feature/fleet=20p?= =?UTF-8?q?s=20api=202.0=20(#25857)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add paddle.fleet.AsyncOptimizer Co-authored-by: dongdaxiang --- .../framework/distributed_strategy.proto | 2 +- .../paddle/fleet/base/distributed_strategy.py | 5 +- python/paddle/fleet/base/fleet_base.py | 13 +- .../fleet/base/meta_optimizer_factory.py | 23 +- python/paddle/fleet/base/runtime_factory.py | 7 + .../paddle/fleet/meta_optimizers/__init__.py | 5 + .../async_graph_execution_optimizer.py | 64 +++++ .../fleet/meta_optimizers/async_optimizer.py | 142 ++++++++++ python/paddle/fleet/runtime/__init__.py | 3 +- .../fleet/runtime/collective_runtime.py | 2 +- .../fleet/runtime/parameter_server_runtime.py | 243 ++++++++++++++++++ python/paddle/fleet/runtime/runtime_base.py | 2 +- .../tests/unittests/test_communicator_geo.py | 57 ++-- .../tests/unittests/test_communicator_sync.py | 28 +- .../test_dist_fleet_a_sync_optimizer_async.py | 113 ++++++++ .../test_dist_fleet_a_sync_optimizer_geo.py | 110 ++++++++ .../test_dist_fleet_a_sync_optimizer_sync.py | 73 ++++++ .../tests/unittests/test_dist_fleet_ps2.py | 45 ++-- .../test_fleet_distributed_strategy.py | 5 +- 19 files changed, 860 insertions(+), 82 deletions(-) mode change 100755 => 100644 python/paddle/fleet/meta_optimizers/__init__.py create mode 100644 python/paddle/fleet/meta_optimizers/async_graph_execution_optimizer.py create mode 100644 python/paddle/fleet/meta_optimizers/async_optimizer.py create mode 100644 python/paddle/fleet/runtime/parameter_server_runtime.py create mode 100644 python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_async.py create mode 100644 python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_geo.py create mode 100644 python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_sync.py diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto index 05b7a16f15..0c0f9d8228 100755 --- a/paddle/fluid/framework/distributed_strategy.proto +++ b/paddle/fluid/framework/distributed_strategy.proto @@ -80,7 +80,7 @@ message ExecutionStrategy { } message AsyncConfig { - optional int32 k_steps = 1 [ default = 1 ]; + optional int32 k_steps = 1 [ default = -1 ]; optional int32 max_merge_var_num = 2 [ default = 1 ]; optional int32 send_queue_size = 3 [ default = 16 ]; optional bool independent_recv_thread = 4 [ default = false ]; diff --git a/python/paddle/fleet/base/distributed_strategy.py b/python/paddle/fleet/base/distributed_strategy.py index 43e50ca0be..6ada175b89 100644 --- a/python/paddle/fleet/base/distributed_strategy.py +++ b/python/paddle/fleet/base/distributed_strategy.py @@ -227,8 +227,11 @@ class DistributedStrategy(object): def a_sync(self, flag): if isinstance(flag, bool): self.strategy.a_sync = flag + self.a_sync_configs = {"k_steps": 0} else: - print("WARNING: a_sync should have value of bool type") + raise ValueError( + "The type of `flag` is invalid, expected type is bool, but received %s". + format(type(flag))) @property def a_sync_configs(self): diff --git a/python/paddle/fleet/base/fleet_base.py b/python/paddle/fleet/base/fleet_base.py index 979b878a3d..459afedf3a 100644 --- a/python/paddle/fleet/base/fleet_base.py +++ b/python/paddle/fleet/base/fleet_base.py @@ -189,12 +189,12 @@ class Fleet(object): assert self._runtime_handle is not None self._runtime_handle._init_worker() - def init_server(self, model_dir=None): + def init_server(self, *args, **kwargs): """ init server """ assert self._runtime_handle is not None - self._runtime_handle._init_server() + self._runtime_handle._init_server(*args, **kwargs) def run_server(self): """ @@ -291,6 +291,7 @@ class Fleet(object): else: self.origin_startup_program = \ startup_program.clone(for_test=False) + context["origin_startup_program"] = startup_program context["role_maker"] = self._role_maker @@ -329,12 +330,19 @@ class Fleet(object): optimize_ops = [] params_grads = [] + if meta_optimizer: optimize_ops, params_grads = meta_optimizer.minimize( loss, startup_program=startup_program, parameter_list=parameter_list, no_grad_set=no_grad_set) + + default_program = paddle.default_main_program() + + if id(default_program) != id(loss.block.program): + paddle.fluid.framework.switch_main_program(loss.block.program) + else: optimize_ops, params_grads = self.user_defined_optimizer.minimize( loss, @@ -344,6 +352,7 @@ class Fleet(object): context["program_optimize_ops"] = optimize_ops context["program_params_grads"] = params_grads + if graph_optimizer: optimize_ops, params_grads = graph_optimizer.minimize( loss, diff --git a/python/paddle/fleet/base/meta_optimizer_factory.py b/python/paddle/fleet/base/meta_optimizer_factory.py index 802f6c4dab..459070fcc4 100755 --- a/python/paddle/fleet/base/meta_optimizer_factory.py +++ b/python/paddle/fleet/base/meta_optimizer_factory.py @@ -12,27 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ..meta_optimizers import AMPOptimizer -from ..meta_optimizers import RecomputeOptimizer -from ..meta_optimizers import GradientMergeOptimizer -from ..meta_optimizers import GraphExecutionOptimizer -from ..meta_optimizers import PipelineOptimizer -from ..meta_optimizers import LocalSGDOptimizer -from ..meta_optimizers import LarsOptimizer -from ..meta_optimizers import DGCOptimizer - __all__ = ["MetaOptimizerFactory"] -meta_optimizer_names = [ - "AMPOptimizer", - "RecomputeOptimizer", - "GradientMergeOptimizer", - "GraphExecutionOptimizer", - "PipelineOptimizer", - "LocalSGDOptimizer", - "LarsOptimizer", - "DGCOptimizer", -] +from ..meta_optimizers import * + +meta_optimizer_names = list( + filter(lambda name: name.endswith("Optimizer"), dir())) class MetaOptimizerFactory(object): diff --git a/python/paddle/fleet/base/runtime_factory.py b/python/paddle/fleet/base/runtime_factory.py index 45dca6dae4..68d327c228 100644 --- a/python/paddle/fleet/base/runtime_factory.py +++ b/python/paddle/fleet/base/runtime_factory.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. from ..runtime.collective_runtime import CollectiveRuntime +from ..runtime.parameter_server_runtime import ParameterServerRuntime class RuntimeFactory(object): @@ -23,3 +24,9 @@ class RuntimeFactory(object): collective_runtime = CollectiveRuntime() collective_runtime._set_basic_info(context) return collective_runtime + + k_steps = context["valid_strategy"].a_sync_configs["k_steps"] + if not context["role_maker"]._is_collective and k_steps >= 0: + ps_runtime = ParameterServerRuntime() + ps_runtime._set_basic_info(context) + return ps_runtime diff --git a/python/paddle/fleet/meta_optimizers/__init__.py b/python/paddle/fleet/meta_optimizers/__init__.py old mode 100755 new mode 100644 index 718805c5aa..81ea958f32 --- a/python/paddle/fleet/meta_optimizers/__init__.py +++ b/python/paddle/fleet/meta_optimizers/__init__.py @@ -15,17 +15,22 @@ from .amp_optimizer import AMPOptimizer from .recompute_optimizer import RecomputeOptimizer from .gradient_merge_optimizer import GradientMergeOptimizer from .graph_execution_optimizer import GraphExecutionOptimizer +from .async_optimizer import AsyncMetaOptimizer from .pipeline_optimizer import PipelineOptimizer from .localsgd_optimizer import LocalSGDOptimizer from .lars_optimizer import LarsOptimizer +from .async_graph_execution_optimizer import AsyncGraphExecutionOptimizer from .dgc_optimizer import DGCOptimizer __all__ = [ 'AMPOptimizer', 'RecomputeOptimizer', 'GradientMergeOptimizer', + 'AsyncMetaOptimizer', + 'GraphExecutionOptimizer', 'PipelineOptimizer', 'LocalSGDOptimizer', 'LarsOptimizer', + 'AsyncGraphExecutionOptimizer', 'DGCOptimizer', ] diff --git a/python/paddle/fleet/meta_optimizers/async_graph_execution_optimizer.py b/python/paddle/fleet/meta_optimizers/async_graph_execution_optimizer.py new file mode 100644 index 0000000000..890eae2c14 --- /dev/null +++ b/python/paddle/fleet/meta_optimizers/async_graph_execution_optimizer.py @@ -0,0 +1,64 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and + +from paddle import fluid +from paddle.fluid import compiler +from .async_optimizer import AsyncMetaOptimizer + + +class AsyncGraphExecutionOptimizer(AsyncMetaOptimizer): + def __init__(self, optimizer): + super(AsyncGraphExecutionOptimizer, self).__init__(optimizer) + self.inner_opt = optimizer + # we do not allow meta optimizer to be inner optimizer currently + self.meta_optimizers_white_list = [] + + def _can_apply(self): + k_steps = self.user_defined_strategy.a_sync_configs["k_steps"] + if k_steps < 0: + return False + + if self.role_maker.is_server(): + return False + + return True + + def _is_graph_out(self): + return True + + def _try_to_compile(self, main_program, loss): + dist_strategy = self._get_distributed_strategy() + + build_strategy = dist_strategy.get_build_strategy() + exec_strategy = dist_strategy.get_execute_strategy() + + self._compiled_program = compiler.CompiledProgram(main_program) + + self._compiled_program.with_data_parallel( + loss_name=loss.name, + build_strategy=build_strategy, + exec_strategy=exec_strategy, + share_vars_from=None) + + return self._compiled_program + + def minimize(self, + loss, + startup_program=None, + parameter_list=None, + no_grad_set=None): + program = loss.block.program + compiled_program = self._try_to_compile(program, loss) + program._graph = compiled_program + # just return self.optimizer_ops and self.param_grads + return None, None diff --git a/python/paddle/fleet/meta_optimizers/async_optimizer.py b/python/paddle/fleet/meta_optimizers/async_optimizer.py new file mode 100644 index 0000000000..b88e863d7b --- /dev/null +++ b/python/paddle/fleet/meta_optimizers/async_optimizer.py @@ -0,0 +1,142 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and + +from paddle import fluid +from .meta_optimizer_base import MetaOptimizerBase + + +class AsyncMetaOptimizer(MetaOptimizerBase): + def __init__(self, optimizer): + super(AsyncMetaOptimizer, self).__init__(optimizer) + self.inner_opt = optimizer + # we do not allow meta optimizer to be inner optimizer currently + self.meta_optimizers_white_list = [] + + def _is_graph_out(self): + return False + + def _can_apply(self): + if self.role_maker._is_collective: + return False + k_steps = self.user_defined_strategy.a_sync_configs["k_steps"] + return True if k_steps >= 0 else False + + def _get_distributed_strategy(self): + from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory + + k_steps = self.user_defined_strategy.a_sync_configs["k_steps"] + strategy = None + + if not self.user_defined_strategy.a_sync and k_steps == 0: + strategy = StrategyFactory.create_sync_strategy() + + if self.user_defined_strategy.a_sync and k_steps == 0: + strategy = StrategyFactory.create_async_strategy() + + if self.user_defined_strategy.a_sync and k_steps > 0: + strategy = StrategyFactory.create_geo_strategy(k_steps) + + if not strategy: + raise ValueError("k_steps must be invalid value, please check") + + return strategy + + def _build_trainer_programs(self, compiled_config): + from paddle.fluid.incubate.fleet.parameter_server.ir import trainer_pass as worker + + _main = compiled_config.origin_main_program.clone() + _startup = compiled_config.origin_startup_program.clone() + + if not compiled_config.is_geo_mode(): + # for main program + _main = worker.delete_optimizer_pass(_main, compiled_config) + _main = worker.distributed_ops_pass(_main, compiled_config) + _main = worker.append_send_ops_pass(_main, compiled_config) + + # for startup program + _startup = worker.fake_init_ops_pass(_startup, compiled_config) + _startup = worker.init_from_server_pass(_startup, compiled_config) + _startup = worker.delet_extra_optimizes_pass(_startup, + compiled_config) + else: + _main = worker.append_send_ops_pass(_main, compiled_config) + _startup = _startup + + return _main, _startup + + def _build_pserver_programs(self, compiled_config): + from paddle.fluid.incubate.fleet.parameter_server.ir import pserver_pass as server + + _main = fluid.Program() + _startup = fluid.Program() + + if not compiled_config.is_geo_mode(): + _main = server.add_listen_and_serv_pass(_main, compiled_config) + _main = server.add_rpc_global_flags_pass(_main, compiled_config) + _main = server.add_optimizer_pass(_main, compiled_config) + _main = server.large_scale_sparse_pass(_main, _main, + compiled_config, False) + _startup = server.build_pserver_startup_program_pass( + _startup, _main, compiled_config) + _startup = server.large_scale_sparse_pass(_startup, _main, + compiled_config, True) + + if not compiled_config.is_sync_mode(): + _main = server.delete_unused_in_main_pass(_main, + compiled_config) + + _startup = server.delete_unused_in_startup_pass(_startup, _main, + compiled_config) + else: + _main = server.add_listen_and_serv_pass(_main, compiled_config) + _main = server.add_rpc_global_flags_pass(_main, compiled_config) + _main = server.add_geo_optimizer_pass(_main, compiled_config) + _main = server.large_scale_sparse_pass(_main, _main, + compiled_config, False) + _startup = server.build_pserver_startup_program_pass( + _startup, _main, compiled_config) + _startup = server.large_scale_sparse_pass(_startup, _main, + compiled_config, True) + _startup = server.delete_unused_in_startup_pass(_startup, _main, + compiled_config) + + return _main, _startup + + def minimize_impl(self, + loss, + startup_program=None, + parameter_list=None, + no_grad_set=None): + self.inner_opt.minimize(loss, startup_program, parameter_list, + no_grad_set) + strategy = self._get_distributed_strategy() + + _origin_main_program = loss.block.program + _origin_startup_program = startup_program + from paddle.fluid.incubate.fleet.parameter_server.ir import public as public + + compiled_config = public.CompileTimeStrategy(_origin_main_program, + _origin_startup_program, + strategy, self.role_maker) + + main_program, startup_program = \ + self._build_trainer_programs(compiled_config) if self.role_maker.is_worker() \ + else self._build_pserver_programs(compiled_config) + + loss.block.program = main_program + fluid.framework.switch_startup_program(startup_program) + + return None, None + + def _disable_strategy(self, dist_strategy): + self.user_defined_strategy.a_sync_configs["k_steps"] = -1 diff --git a/python/paddle/fleet/runtime/__init__.py b/python/paddle/fleet/runtime/__init__.py index f38287cf51..a796a73fc9 100644 --- a/python/paddle/fleet/runtime/__init__.py +++ b/python/paddle/fleet/runtime/__init__.py @@ -13,5 +13,6 @@ # limitations under the License. from .collective_runtime import CollectiveRuntime +from .parameter_server_runtime import ParameterServerRuntime -__all__ = ["CollectiveRuntime"] +__all__ = ["CollectiveRuntime," "ParameterServerRuntime", ] diff --git a/python/paddle/fleet/runtime/collective_runtime.py b/python/paddle/fleet/runtime/collective_runtime.py index 0881c4b52c..c56cf4c7aa 100644 --- a/python/paddle/fleet/runtime/collective_runtime.py +++ b/python/paddle/fleet/runtime/collective_runtime.py @@ -30,7 +30,7 @@ class CollectiveRuntime(RuntimeBase): "You should not call 'run_worker' method for collective mode.") pass - def _init_server(self): + def _init_server(self, *args, **kwargs): logging.warn( "You should not call 'init_server' method for collective mode.") pass diff --git a/python/paddle/fleet/runtime/parameter_server_runtime.py b/python/paddle/fleet/runtime/parameter_server_runtime.py new file mode 100644 index 0000000000..813649edbc --- /dev/null +++ b/python/paddle/fleet/runtime/parameter_server_runtime.py @@ -0,0 +1,243 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import logging +import warnings + +import paddle.fluid as fluid +from paddle.fluid import core + +from .runtime_base import RuntimeBase + + +class ParameterServerRuntime(RuntimeBase): + def __init__(self): + super(ParameterServerRuntime, self).__init__() + self._communicator = None + + def _set_basic_info(self, context): + self.context = context + self.role_maker = context["role_maker"] + self.origin_main_program = context["origin_main_program"] + self.origin_startup_program = context["origin_startup_program"] + self.async_strategy = self._get_distributed_strategy() + self.compiled_strategy = self.build_compiled_startegy() + + def _get_distributed_strategy(self): + strategy = None + + from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory + + dist_strategy = self.context["valid_strategy"] + k_steps = dist_strategy.a_sync_configs["k_steps"] + + if not dist_strategy.a_sync and k_steps == 0: + strategy = StrategyFactory.create_sync_strategy() + + if dist_strategy.a_sync and k_steps == 0: + strategy = StrategyFactory.create_async_strategy() + + if dist_strategy.a_sync and k_steps > 0: + strategy = StrategyFactory.create_geo_strategy(k_steps) + + if not strategy: + raise ValueError("k_steps must be invalid value, please check") + + return strategy + + def build_compiled_startegy(self): + from paddle.fluid.incubate.fleet.parameter_server.ir.public import CompileTimeStrategy + + compiled_config = CompileTimeStrategy( + self.origin_main_program, self.origin_main_program, + self.async_strategy, self.role_maker) + return compiled_config + + def _load_sparse_params(self, dirname, varnames): + from paddle.fluid.communicator import LargeScaleKV + from paddle.fluid.incubate.fleet.parameter_server.ir.public import _get_varname_parts + + scale_kv = LargeScaleKV() + for varname in varnames: + origin_varname, _, _ = _get_varname_parts(varname) + sparse_dir = os.path.join(dirname, origin_varname, varname) + scale_kv.load(varname, sparse_dir) + + @staticmethod + def __exclude_vars(exclude_var_names=[]): + def is_valid(var): + if var.name in exclude_var_names: + return False + + from paddle.fluid.incubate.fleet.parameter_server.ir.public import _get_varname_parts + + origin_varname, _, _ = _get_varname_parts(var.name) + if origin_varname.endswith("@GRAD"): + return False + + if origin_varname == "learning_rate_0": + return False + + if var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \ + var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \ + var.desc.type() == core.VarDesc.VarType.READER: + return False + return var.persistable + + return is_valid + + def _init_worker(self): + def sync_strategy_envs(): + kwargs = {} + kwargs["pserver_endpoints"] = self.role_maker.get_pserver_endpoints( + ) + kwargs["trainer_id"] = self.role_maker.worker_index() + return kwargs + + def geo_strategy_envs(): + from paddle.fluid.incubate.fleet.parameter_server.ir.public import get_sparse_tablenames + + def get_sparse_attrs(): + opt_init_map = {} + opt_init_map["gaussian_random"] = ["seed", "mean", "std"] + opt_init_map["fill_constant"] = ["value"] + opt_init_map["uniform_random"] = ["seed", "min", "max"] + opt_init_map[ + "truncated_gaussian_random"] = ["seed", "mean", "std"] + + dist_varnames = get_sparse_tablenames(self.origin_main_program, + True) + sparse_varnames = get_sparse_tablenames( + self.origin_main_program, False) + + if len(dist_varnames) != 0: + raise ValueError( + "GeoStrategy can not support large scale embeding now, please use fluid.layers.embedding" + ) + + init_attrs = [] + for value_name in sparse_varnames: + value_var = self.origin_main_program.global_block().vars[ + value_name] + value_attr = [ + value_name, + ",".join([str(dim) for dim in value_var.shape]) + ] + for op in self.origin_startup_program.global_block().ops: + if op.type in opt_init_map.keys( + ) and value_name == op.output("Out")[0]: + init_attr = [op.type] + for attr in opt_init_map[op.type]: + init_attr.append(str(op.attr(attr))) + value_attr.append("&".join(init_attr)) + init_attrs.append(":".join(value_attr)) + break + return "#".join(init_attrs) + + kwargs = {} + kwargs["trainers"] = self.role_maker.worker_num() + kwargs["sparse_attrs"] = get_sparse_attrs() + return kwargs + + from paddle.fluid.incubate.fleet.parameter_server.ir.public import _get_lr_ops + + from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import \ + SyncStrategy, GeoStrategy + + trainer_config = self.async_strategy.get_trainer_runtime_config() + lrs = _get_lr_ops(self.origin_main_program) + + if len(lrs) > 0: + kwargs = {"need_global_step": "1"} + else: + kwargs = {"need_global_step": "0"} + + if isinstance(self.async_strategy, GeoStrategy): + geo_kwargs = geo_strategy_envs() + kwargs.update(geo_kwargs) + if isinstance(self.async_strategy, SyncStrategy): + sync_kwargs = sync_strategy_envs() + kwargs.update(sync_kwargs) + + kwargs = kwargs if kwargs else None + + send_ctx = self.compiled_strategy.get_communicator_send_context() + + if self.compiled_strategy.is_geo_mode(): + recv_ctx = self.compiled_strategy.get_communicator_recv_context( + recv_type=4) + else: + recv_ctx = self.compiled_strategy.get_communicator_recv_context( + recv_type=1) + + from paddle.fluid.communicator import Communicator + self._communicator = Communicator( + trainer_config.mode, kwargs, + trainer_config.get_communicator_flags()) + self._communicator.init_with_ctx(send_ctx, recv_ctx) + + if not self._communicator.is_running(): + self._communicator.start() + else: + warnings.warn("communicator has been initialized, skip") + + def _init_server(self, *args, **kwargs): + if len(args) > 1: + raise ValueError("init server can only accept 1 args: `dirname`") + elif len(args) == 1: + model_dirname = args[0] + else: + model_dirname = None + + executor = fluid.Executor(fluid.CPUPlace()) + executor.run(fluid.default_startup_program()) + + if not model_dirname: + return + + if not os.path.isdir(model_dirname): + raise ValueError("There is no directory named '%s'", model_dirname) + + sparse_varnames = self.compiled_strategy.get_sparse_varname_on_ps(True) + + distribtued_varnames = self.compiled_strategy.get_sparse_varname_on_ps( + False) + + remaining_vars = list( + filter( + ParameterServerRuntime.__exclude_vars(sparse_varnames + + distribtued_varnames), + fluid.default_main_program().list_vars())) + + fluid.io.load_vars( + executor, + main_program=fluid.default_main_program(), + dirname=model_dirname, + vars=remaining_vars) + + self._load_sparse_params( + dirname=model_dirname, varnames=sparse_varnames) + + # todo(tangwei12) load distributed vars + # self._load_sparse_params(dirname=model_dir, varnames=distribtued_varnames) + + def _run_server(self): + executor = fluid.Executor(fluid.CPUPlace()) + executor.run(fluid.default_main_program()) + + def _stop_worker(self): + self._communicator.stop() + executor = fluid.Executor(fluid.CPUPlace()) + executor.close() diff --git a/python/paddle/fleet/runtime/runtime_base.py b/python/paddle/fleet/runtime/runtime_base.py index c7ce8b5a29..38f9f882cb 100644 --- a/python/paddle/fleet/runtime/runtime_base.py +++ b/python/paddle/fleet/runtime/runtime_base.py @@ -25,7 +25,7 @@ class RuntimeBase(object): def _run_worker(self): pass - def _init_server(self): + def _init_server(self, *args, **kwargs): pass def _run_server(self): diff --git a/python/paddle/fluid/tests/unittests/test_communicator_geo.py b/python/paddle/fluid/tests/unittests/test_communicator_geo.py index 46cae41f30..bff8908682 100644 --- a/python/paddle/fluid/tests/unittests/test_communicator_geo.py +++ b/python/paddle/fluid/tests/unittests/test_communicator_geo.py @@ -26,32 +26,44 @@ import paddle import paddle.fluid as fluid import paddle.fluid.incubate.fleet.base.role_maker as role_maker -from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet -from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory +import paddle.fleet as fleet class TestCommunicatorGeoEnd2End(unittest.TestCase): def net(self): x = fluid.layers.data(name='x', shape=[13], dtype='float32') - y_predict = fluid.layers.fc(input=x, size=1, act=None) + x1 = fluid.layers.data(name='x1', shape=[1], dtype='int64', lod_level=1) + + emb = fluid.layers.embedding( + input=x1, + size=[10000, 10], + param_attr=fluid.ParamAttr( + name="embedding", + initializer=fluid.initializer.Constant(value=0.01)), + is_sparse=True) + + pool = fluid.layers.sequence_pool(input=emb, pool_type="sum") + z = fluid.layers.concat(input=[x, pool], axis=1) + y_predict = fluid.layers.fc(input=z, size=1, act=None) y = fluid.layers.data(name='y', shape=[1], dtype='float32') cost = fluid.layers.square_error_cost(input=y_predict, label=y) avg_cost = fluid.layers.mean(cost) - return avg_cost, x, y + return avg_cost, x, x1, y def fake_reader(self): def reader(): for i in range(10000): x = numpy.random.random((1, 13)).astype('float32') + z = numpy.random.randint(0, 9999, (1, 1)).astype('int64') y = numpy.random.randint(0, 2, (1, 1)).astype('int64') - yield x, y + yield x, z, y return reader def run_pserver(self, role, strategy): fleet.init(role) - avg_cost, x, y = self.net() + avg_cost, x, z, y = self.net() optimizer = fluid.optimizer.SGD(0.01) optimizer = fleet.distributed_optimizer(optimizer, strategy) optimizer.minimize(avg_cost) @@ -64,33 +76,41 @@ class TestCommunicatorGeoEnd2End(unittest.TestCase): exe = fluid.Executor(place) fleet.init(role) - avg_cost, x, y = self.net() + avg_cost, x, z, y = self.net() optimizer = fluid.optimizer.SGD(0.01) optimizer = fleet.distributed_optimizer(optimizer, strategy) optimizer.minimize(avg_cost) fleet.init_worker() - exe.run(fleet.startup_program) + exe.run(fluid.default_startup_program()) train_reader = paddle.batch(self.fake_reader(), batch_size=24) - feeder = fluid.DataFeeder(place=place, feed_list=[x, y]) + feeder = fluid.DataFeeder(place=place, feed_list=[x, z, y]) for batch_id, data in enumerate(train_reader()): - exe.run(fleet.main_program, feed=feeder.feed(data), fetch_list=[]) + exe.run(fluid.default_main_program(), + feed=feeder.feed(data), + fetch_list=[]) fleet.stop_worker() def run_ut(self): training_role = os.getenv("TRAINING_ROLE", "TRAINER") - role = role_maker.UserDefinedRoleMaker( - current_id=0, - role=role_maker.Role.WORKER - if training_role == "TRAINER" else role_maker.Role.SERVER, - worker_num=1, - server_endpoints=["127.0.0.1:18099"]) + os.environ["PADDLE_PSERVER_NUMS"] = "1" + os.environ["PADDLE_TRAINERS_NUM"] = "1" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["PADDLE_TRAINER_ID"] = "0" + os.environ["PADDLE_TRAINERS_NUM"] = "1" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \ + "127.0.0.1:36001" - strategy = StrategyFactory.create_geo_strategy(10) + role = role_maker.PaddleCloudRoleMaker() + + strategy = paddle.fleet.DistributedStrategy() + strategy.a_sync = True + strategy.a_sync_configs = {"k_steps": 100} if training_role == "TRAINER": self.run_trainer(role, strategy) @@ -116,8 +136,7 @@ import paddle.fluid as fluid from paddle.fluid.communicator import Communicator import paddle.fluid.incubate.fleet.base.role_maker as role_maker from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode -from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet -from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory +import paddle.fleet as fleet from test_communicator_geo import TestCommunicatorGeoEnd2End diff --git a/python/paddle/fluid/tests/unittests/test_communicator_sync.py b/python/paddle/fluid/tests/unittests/test_communicator_sync.py index be1f32fb0a..7ac5c14177 100644 --- a/python/paddle/fluid/tests/unittests/test_communicator_sync.py +++ b/python/paddle/fluid/tests/unittests/test_communicator_sync.py @@ -16,16 +16,13 @@ from __future__ import print_function import unittest import time -import threading -import numpy +import os import paddle import paddle.fluid as fluid -from paddle.fluid.communicator import Communicator import paddle.fluid.incubate.fleet.base.role_maker as role_maker -from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet -from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory +import paddle.fleet as fleet class TestCommunicator(unittest.TestCase): @@ -39,19 +36,24 @@ class TestCommunicator(unittest.TestCase): return avg_cost def test_communicator_sync(self): - role = role_maker.UserDefinedRoleMaker( - current_id=0, - role=role_maker.Role.WORKER, - worker_num=2, - server_endpoints=["127.0.0.1:6001", "127.0.0.1:6002"]) + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["PADDLE_PSERVER_NUMS"] = "2" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["PADDLE_TRAINER_ID"] = "0" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \ + "127.0.0.1:36001,127.0.0.2:36001" - fleet.init(role) + fleet.init(role_maker.PaddleCloudRoleMaker()) avg_cost = self.net() optimizer = fluid.optimizer.SGD(0.01) - strategy = StrategyFactory.create_sync_strategy() - strategy._program_config.wait_port = False + strategy = paddle.fleet.DistributedStrategy() + strategy.a_sync = False + optimizer = fleet.distributed_optimizer(optimizer, strategy) optimizer.minimize(avg_cost) diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_async.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_async.py new file mode 100644 index 0000000000..a7ef230f79 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_async.py @@ -0,0 +1,113 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import time +import unittest + +import paddle +import paddle.fluid.incubate.fleet.base.role_maker as role_maker + + +class TestFleetGradientMergeMetaOptimizer(unittest.TestCase): + def setUp(self): + os.environ["PADDLE_PSERVER_NUMS"] = "2" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["PADDLE_TRAINER_ID"] = "0" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \ + "127.0.0.1:36001,127.0.0.2:36001" + + def test_a_sync_optimizer_trainer(self): + os.environ["TRAINING_ROLE"] = "TRAINER" + import paddle.fleet as fleet + + main_program = paddle.fluid.Program() + startup_program = paddle.fluid.Program() + + paddle.fluid.framework.switch_main_program(main_program) + paddle.fluid.framework.switch_startup_program(startup_program) + + fleet.init(role_maker.PaddleCloudRoleMaker()) + input_x = paddle.fluid.layers.data( + name="x", shape=[32], dtype='float32') + input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64') + + fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh') + fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh') + prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax') + cost = paddle.fluid.layers.cross_entropy( + input=prediction, label=input_y) + avg_cost = paddle.fluid.layers.mean(x=cost) + + strategy = paddle.fleet.DistributedStrategy() + strategy.a_sync = True + optimizer = paddle.optimizer.SGD(learning_rate=0.01) + optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) + optimizer.minimize(avg_cost) + + prog = paddle.fluid.default_main_program() + self.assertNotEqual(prog.global_block().ops[-1].type, "send_barrier") + + sends = 0 + sgds = 0 + for op in prog.global_block().ops: + if op.type == "send": + sends += 1 + if op.type == "sgd": + sgds += 1 + self.assertEqual(sends, 7) + self.assertEqual(sgds, 0) + + fleet.init_worker() + time.sleep(8) + fleet.stop_worker() + + def test_a_sync_optimizer_pserver(self): + os.environ["TRAINING_ROLE"] = "PSERVER" + import paddle.fleet as fleet + + main_program = paddle.fluid.Program() + startup_program = paddle.fluid.Program() + + paddle.fluid.framework.switch_main_program(main_program) + paddle.fluid.framework.switch_startup_program(startup_program) + + fleet.init(role_maker.PaddleCloudRoleMaker()) + input_x = paddle.fluid.layers.data( + name="x", shape=[32], dtype='float32') + input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64') + + fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh') + fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh') + prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax') + cost = paddle.fluid.layers.cross_entropy( + input=prediction, label=input_y) + avg_cost = paddle.fluid.layers.mean(x=cost) + + strategy = paddle.fleet.DistributedStrategy() + strategy.a_sync = True + optimizer = paddle.optimizer.SGD(learning_rate=0.01) + optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) + optimizer.minimize(avg_cost) + + prog = paddle.fluid.default_main_program() + self.assertEqual(prog.global_block().ops[0].type, "listen_and_serv") + fleet.init_server() + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_geo.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_geo.py new file mode 100644 index 0000000000..87ffeb31fd --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_geo.py @@ -0,0 +1,110 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import paddle +import os +import paddle.fluid.incubate.fleet.base.role_maker as role_maker +import time + + +class TestFleetGradientMergeMetaOptimizer(unittest.TestCase): + def setUp(self): + os.environ["PADDLE_PSERVER_NUMS"] = "2" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["PADDLE_TRAINER_ID"] = "0" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \ + "127.0.0.1:36001,127.0.0.2:36001" + + def test_a_sync_optimizer_trainer(self): + os.environ["TRAINING_ROLE"] = "TRAINER" + import paddle.fleet as fleet + + main_program = paddle.fluid.Program() + startup_program = paddle.fluid.Program() + + paddle.fluid.framework.switch_main_program(main_program) + paddle.fluid.framework.switch_startup_program(startup_program) + + fleet.init(role_maker.PaddleCloudRoleMaker()) + input_x = paddle.fluid.layers.data( + name="x", shape=[32], dtype='float32') + input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64') + + fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh') + fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh') + prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax') + cost = paddle.fluid.layers.cross_entropy( + input=prediction, label=input_y) + avg_cost = paddle.fluid.layers.mean(x=cost) + + strategy = paddle.fleet.DistributedStrategy() + strategy.a_sync = True + strategy.a_sync_configs = {"k_steps": 100} + optimizer = paddle.optimizer.SGD(learning_rate=0.01) + optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) + optimizer.minimize(avg_cost) + + prog = paddle.fluid.default_main_program() + self.assertEqual(prog.global_block().ops[-1].type, "send") + + sends = 0 + sgds = 0 + + for op in prog.global_block().ops: + if op.type == "send": + sends += 1 + if op.type == "sgd": + sgds += 1 + self.assertEqual(sends, 1) + self.assertEqual(sgds, 6) + + def test_a_sync_optimizer_pserver(self): + os.environ["TRAINING_ROLE"] = "PSERVER" + import paddle.fleet as fleet + + main_program = paddle.fluid.Program() + startup_program = paddle.fluid.Program() + + paddle.fluid.framework.switch_main_program(main_program) + paddle.fluid.framework.switch_startup_program(startup_program) + + fleet.init(role_maker.PaddleCloudRoleMaker()) + input_x = paddle.fluid.layers.data( + name="x", shape=[32], dtype='float32') + input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64') + + fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh') + fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh') + prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax') + cost = paddle.fluid.layers.cross_entropy( + input=prediction, label=input_y) + avg_cost = paddle.fluid.layers.mean(x=cost) + + strategy = paddle.fleet.DistributedStrategy() + strategy.a_sync = True + strategy.a_sync_configs = {"k_steps": 100} + optimizer = paddle.optimizer.SGD(learning_rate=0.01) + optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) + optimizer.minimize(avg_cost) + + prog = paddle.fluid.default_main_program() + self.assertEqual(prog.global_block().ops[0].type, "listen_and_serv") + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_sync.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_sync.py new file mode 100644 index 0000000000..6214588535 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_a_sync_optimizer_sync.py @@ -0,0 +1,73 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import paddle +import os +import paddle.fleet as fleet +import paddle.fluid.incubate.fleet.base.role_maker as role_maker +import time + + +class TestFleetGradientMergeMetaOptimizer(unittest.TestCase): + def setUp(self): + os.environ["PADDLE_PSERVER_NUMS"] = "2" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "6007" + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["PADDLE_TRAINER_ID"] = "0" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \ + "127.0.0.1:36001,127.0.0.2:36001" + + def test_gradient_merge_optimizer(self): + fleet.init(role_maker.PaddleCloudRoleMaker()) + input_x = paddle.fluid.layers.data( + name="x", shape=[32], dtype='float32') + input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64') + + fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh') + fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh') + prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax') + cost = paddle.fluid.layers.cross_entropy( + input=prediction, label=input_y) + avg_cost = paddle.fluid.layers.mean(x=cost) + + strategy = paddle.fleet.DistributedStrategy() + strategy.a_sync = False + optimizer = paddle.optimizer.SGD(learning_rate=0.01) + optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) + optimizer.minimize(avg_cost) + + prog = paddle.fluid.default_main_program() + self.assertEqual(prog.global_block().ops[-1].type, "send_barrier") + + sends = 0 + sgds = 0 + for op in prog.global_block().ops: + if op.type == "send": + sends += 1 + if op.type == "sgd": + sgds += 1 + self.assertEqual(sends, 6) + self.assertEqual(sgds, 0) + + fleet.init_worker() + time.sleep(8) + fleet.stop_worker() + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_ps2.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps2.py index 833b7307fa..f616800826 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_ps2.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_ps2.py @@ -19,10 +19,10 @@ import unittest import tempfile import shutil +import paddle import paddle.fluid as fluid import paddle.fluid.incubate.fleet.base.role_maker as role_maker -from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet -from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory +import paddle.fleet as fleet # For Net base_lr = 0.2 @@ -149,40 +149,41 @@ class TestPSPassWithBow(unittest.TestCase): return [avg_cost, acc, cos_q_pt] def test(self): - endpoints = ["127.0.0.1:36004"] - - role = role_maker.UserDefinedRoleMaker( - current_id=0, - role=role_maker.Role.SERVER, - worker_num=2, - server_endpoints=endpoints) - + os.environ["PADDLE_PSERVER_NUMS"] = "2" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["PADDLE_TRAINER_ID"] = "0" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \ + "127.0.0.1:36001,127.0.0.2:36001" + os.environ["TRAINING_ROLE"] = "PSERVER" + + role = role_maker.PaddleCloudRoleMaker() fleet.init(role) loss, acc, _ = self.net() - optimizer = fluid.optimizer.SGD(base_lr) - strategy = StrategyFactory.create_async_strategy() - optimizer = fleet.distributed_optimizer(optimizer, strategy) + + strategy = paddle.fleet.DistributedStrategy() + strategy.a_sync = True + optimizer = paddle.optimizer.SGD(learning_rate=0.01) + optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) optimizer.minimize(loss) - fleet.startup_program_bak = fleet.startup_program - fleet.startup_program = None + model_dir = tempfile.mkdtemp() with self.assertRaises(ValueError): - fleet.init_server() - - model_dir = tempfile.mkdtemp() + fleet.init_server(os.path.join(model_dir, "temp"), "xxxx") with self.assertRaises(ValueError): fleet.init_server(os.path.join(model_dir, "temp")) - fleet.startup_program = fleet.startup_program_bak fleet.init_server() from paddle.fluid.communicator import LargeScaleKV kv = LargeScaleKV() - kv.save("__emb__", os.path.join(model_dir, "__emb__", "__emb__")) - - fleet.main_program = fluid.Program() + kv.save("__emb__.block0", + os.path.join(model_dir, "__emb__", "__emb__.block0")) + fluid.framework.switch_main_program(fluid.Program()) fleet.init_server(model_dir) shutil.rmtree(model_dir) diff --git a/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py b/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py index 4994e4514d..aeac7f51a2 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py @@ -197,8 +197,9 @@ class TestStrategyConfig(unittest.TestCase): self.assertEqual(strategy.a_sync, True) strategy.a_sync = False self.assertEqual(strategy.a_sync, False) - strategy.a_sync = "True" - self.assertEqual(strategy.a_sync, False) + + with self.assertRaises(ValueError): + strategy.a_sync = "True" def test_a_sync_configs(self): strategy = paddle.fleet.DistributedStrategy() -- GitLab