From c4a6a0e2e4ff44041424aaa46d0e0bd66721bb78 Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Wed, 18 Mar 2020 11:08:02 +0800 Subject: [PATCH] Revert "Integrated API of Parameter Server (#22710)" test=develop (#23071) This reverts commit 66fce9e82476c3cc608a8520a602a2223a0cef8e. --- python/paddle/fluid/incubate/__init__.py | 3 +- .../paddle/fluid/incubate/fleet/__init__.py | 5 +- .../fluid/incubate/fleet/base/__init__.py | 6 -- .../fluid/incubate/fleet/base/fleet_base.py | 4 +- .../fluid/incubate/fleet/base/role_maker.py | 10 +-- .../fleet/parameter_server/__init__.py | 17 ----- .../distribute_transpiler/__init__.py | 2 +- .../distributed_strategy.py | 15 ----- .../fleet/parameter_server/pslib/__init__.py | 44 +++++-------- .../fleet/parameter_server/pslib/node.py | 1 - .../fluid/tests/unittests/CMakeLists.txt | 2 - .../tests/unittests/test_communicator_sync.py | 2 +- .../fluid/tests/unittests/test_dataset.py | 29 ++++++++- .../tests/unittests/test_dist_fleet_base.py | 2 +- .../unittests/test_distributed_strategy.py | 2 +- .../unittests/test_fleet_input_with_psilib.py | 65 ------------------- python/setup.py.in | 30 --------- 17 files changed, 56 insertions(+), 183 deletions(-) rename python/paddle/fluid/incubate/fleet/parameter_server/{ => distribute_transpiler}/distributed_strategy.py (97%) delete mode 100644 python/paddle/fluid/tests/unittests/test_fleet_input_with_psilib.py diff --git a/python/paddle/fluid/incubate/__init__.py b/python/paddle/fluid/incubate/__init__.py index 4f5a31b5fba..76c5c6391fd 100644 --- a/python/paddle/fluid/incubate/__init__.py +++ b/python/paddle/fluid/incubate/__init__.py @@ -14,5 +14,4 @@ # incubate directory is mainly for internal use # after we have tested incubate APIs in industrial application for a period # we will move stable functions into fluid - -from __future__ import print_function +__version__ = '0.1.0' diff --git a/python/paddle/fluid/incubate/fleet/__init__.py b/python/paddle/fluid/incubate/fleet/__init__.py index 2b4bd2dbd9d..a05baabca39 100644 --- a/python/paddle/fluid/incubate/fleet/__init__.py +++ b/python/paddle/fluid/incubate/fleet/__init__.py @@ -11,7 +11,4 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -from __future__ import print_function - -from . import base -from .base import * +__version__ = '0.1.0' diff --git a/python/paddle/fluid/incubate/fleet/base/__init__.py b/python/paddle/fluid/incubate/fleet/base/__init__.py index 13981773185..8647330f329 100644 --- a/python/paddle/fluid/incubate/fleet/base/__init__.py +++ b/python/paddle/fluid/incubate/fleet/base/__init__.py @@ -10,9 +10,3 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and - -from . import fleet_base -from .fleet_base import * - -from . import role_maker -from .role_maker import * diff --git a/python/paddle/fluid/incubate/fleet/base/fleet_base.py b/python/paddle/fluid/incubate/fleet/base/fleet_base.py index 044ebcf3ac3..09a1bac85f0 100644 --- a/python/paddle/fluid/incubate/fleet/base/fleet_base.py +++ b/python/paddle/fluid/incubate/fleet/base/fleet_base.py @@ -16,7 +16,7 @@ from __future__ import print_function import abc -from paddle.fluid.core import CPUPlace +import paddle.fluid as fluid from paddle.fluid.executor import Executor from paddle.fluid.optimizer import SGD @@ -193,7 +193,7 @@ class Fleet(object): Returns: None """ - self._executor = Executor(CPUPlace()) + self._executor = Executor(fluid.CPUPlace()) if role_maker and not isinstance(role_maker, RoleMakerBase): raise TypeError("role_maker must be an instance of RoleMakerBase") diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py index 4df18f537e1..bada19abcc3 100644 --- a/python/paddle/fluid/incubate/fleet/base/role_maker.py +++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py @@ -14,9 +14,9 @@ """Defination of Role Makers.""" from __future__ import print_function +import paddle.fluid as fluid import os - -from paddle.fluid.core import Gloo +import time __all__ = [ 'Role', 'RoleMakerBase', 'MPISymetricRoleMaker', 'UserDefinedRoleMaker', @@ -577,7 +577,7 @@ class GeneralRoleMaker(RoleMakerBase): current_id = int(os.environ["PADDLE_TRAINER_ID"]) self._node_type = 1 self._cur_endpoint = worker_endpoints[current_id] - gloo = Gloo() + gloo = fluid.core.Gloo() gloo.init(current_id, len(worker_endpoints), self._hdfs_path.rstrip("/") + "/trainer", @@ -597,7 +597,7 @@ class GeneralRoleMaker(RoleMakerBase): current_id = eplist.index(cur_endpoint) self._node_type = 0 self._cur_endpoint = cur_endpoint - gloo = Gloo() + gloo = fluid.core.Gloo() gloo.init(current_id, len(eplist), self._hdfs_path.rstrip("/") + "/pserver", @@ -605,7 +605,7 @@ class GeneralRoleMaker(RoleMakerBase): self._prefix) self._node_type_comm = gloo - gloo = Gloo() + gloo = fluid.core.Gloo() all_list = worker_endpoints + eplist gloo.init( all_list.index(self._cur_endpoint), diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py index 207b64ded91..33ed0ecf10e 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py @@ -11,20 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from __future__ import print_function - -from paddle.fluid.incubate.fleet.parameter_server import version -from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import TrainerRuntimeConfig -from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import StrategyFactory - -__all__ = ['TrainerRuntimeConfig', 'StrategyFactory', 'fleet'] - -fleet = None - -if version.is_transpiler(): - from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet as fleet_transpiler - fleet = fleet_transpiler -else: - from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet as fleet_pslib - fleet = fleet_pslib diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py index 69cbd86e6f3..667ad0a2ed0 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py @@ -27,7 +27,7 @@ from paddle.fluid.executor import Executor from paddle.fluid.parallel_executor import ParallelExecutor from paddle.fluid.optimizer import Optimizer -from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import TrainerRuntimeConfig, DistributedStrategy, SyncStrategy, AsyncStrategy, HalfAsyncStrategy, GeoStrategy, StrategyFactory +from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import TrainerRuntimeConfig, DistributedStrategy, SyncStrategy, AsyncStrategy, HalfAsyncStrategy, GeoStrategy, StrategyFactory from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspiler as OriginTranspiler from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig, ServerRuntimeConfig, DistributedMode diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/distributed_strategy.py b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/distributed_strategy.py similarity index 97% rename from python/paddle/fluid/incubate/fleet/parameter_server/distributed_strategy.py rename to python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/distributed_strategy.py index cf6f2a60cbf..92d07c97da4 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/distributed_strategy.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/distributed_strategy.py @@ -124,19 +124,10 @@ class TrainerRuntimeConfig(object): return self.display(self.get_communicator_flags()) -class PSLibRuntimeConfig(object): - def __init__(self): - self.runtime_configs = {} - - def get_runtime_configs(self): - return self.runtime_configs - - class DistributedStrategy(object): def __init__(self): self._program_config = DistributeTranspilerConfig() self._trainer_runtime_config = TrainerRuntimeConfig() - self._pslib_runtime_config = PSLibRuntimeConfig() self._server_runtime_config = ServerRuntimeConfig() num_threads = int(os.getenv("CPU_NUM", "1")) @@ -213,12 +204,6 @@ class DistributedStrategy(object): "check_trainer_runtime_config must be implemented by derived class. You should use StrategyFactory to create DistributedStrategy." ) - def get_pslib_runtime_config(self): - return self._pslib_runtime_config - - def set_pslib_runtime_config(self, config): - self._pslib_runtime_config.runtime_configs = config - def get_server_runtime_config(self): return self._server_runtime_config diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py index 221ac782944..40337110cfe 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py @@ -18,13 +18,11 @@ from .optimizer_factory import * from google.protobuf import text_format import paddle.fluid as fluid from paddle.fluid.framework import Program -from paddle.fluid.optimizer import Optimizer from paddle.fluid.incubate.fleet.base.fleet_base import Fleet from paddle.fluid.incubate.fleet.base.fleet_base import Mode from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker -from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import TrainerRuntimeConfig, DistributedStrategy, SyncStrategy, AsyncStrategy, HalfAsyncStrategy, GeoStrategy, StrategyFactory class PSLib(Fleet): @@ -224,32 +222,7 @@ class PSLib(Fleet): optimizer(DownpourOptimizer): downpour optimizer """ - - if not isinstance(optimizer, Optimizer): - raise ValueError("optimizer must be an instance of Optimizer") - if not fleet._is_initialized: - raise ValueError( - "use fleet.init(role) to initialize the role of current node before optimizer.minimize(loss)" - ) - if strategy: - if isinstance(strategy, dict): - self._strategy = strategy - elif isinstance(strategy, DistributedStrategy): - if isinstance(strategy, AsyncStrategy): - self._strategy = strategy.get_pslib_runtime_config( - ).get_runtime_configs() - else: - raise TypeError( - "In {} mode, strategy must be an instance of AsyncStrategy, or Dict". - format(fleet._mode)) - else: - raise TypeError( - "In {} mode, strategy must be an instance of AsyncStrategy, or Dict". - format(fleet._mode)) - else: - self._strategy = {} - - self._optimizer = DownpourOptimizer(optimizer, self._strategy) + self._optimizer = DownpourOptimizer(optimizer, strategy) return self._optimizer def save_inference_model(self, @@ -444,6 +417,21 @@ class PSLib(Fleet): self._fleet_ptr.clear_model() self._role_maker._barrier_worker() + def clear_model(self): + """ + clear_model() will be called by user. It will clear sparse model. + + Examples: + .. code-block:: python + + fleet.clear_model() + + """ + self._role_maker._barrier_worker() + if self._role_maker.is_first_worker(): + self._fleet_ptr.clear_model() + self._role_maker._barrier_worker() + def load_one_table(self, table_id, model_path, **kwargs): """ load pslib model for one table or load params from paddle model diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py index 511b8f4ad6c..5afcf0cf2ee 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py @@ -13,7 +13,6 @@ """Defination of Server and Worker.""" from . import ps_pb2 as pslib -from six.moves import reduce class Server(object): diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index edab751554a..f3ed3b49959 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -28,7 +28,6 @@ list(APPEND MIXED_DIST_TEST_OPS test_communicator_geo) list(APPEND MIXED_DIST_TEST_OPS test_communicator_half_async) list(APPEND MIXED_DIST_TEST_OPS test_communicator_sync) list(APPEND MIXED_DIST_TEST_OPS test_fleet_api_input) -list(APPEND MIXED_DIST_TEST_OPS test_fleet_input_with_psilib) foreach(TEST_OP ${MIXED_DIST_TEST_OPS}) list(REMOVE_ITEM TEST_OPS ${TEST_OP}) endforeach() @@ -283,7 +282,6 @@ if(WITH_DISTRIBUTE) py_test_modules(test_nce_remote_table_op MODULES test_nce_remote_table_op ENVS ${dist_ENVS}) py_test_modules(test_recv_save_op MODULES test_recv_save_op ENVS ${dist_ENVS}) py_test_modules(test_transpiler_ops MODULES test_transpiler_ops ENVS ${dist_ENVS}) - py_test_modules(test_fleet_input_with_psilib MODULES test_fleet_input_with_psilib ENVS ${dist_ENVS}) py_test_modules(test_communicator_async MODULES test_communicator_async ENVS ${dist_ENVS}) py_test_modules(test_communicator_geo MODULES test_communicator_geo ENVS ${dist_ENVS}) py_test_modules(test_communicator_half_async MODULES test_communicator_half_async ENVS ${dist_ENVS} FLAGS_communicator_send_queue_size=1 FLAGS_communicator_max_merge_var_num=1) diff --git a/python/paddle/fluid/tests/unittests/test_communicator_sync.py b/python/paddle/fluid/tests/unittests/test_communicator_sync.py index d50dc6dae0b..be1f32fb0ae 100644 --- a/python/paddle/fluid/tests/unittests/test_communicator_sync.py +++ b/python/paddle/fluid/tests/unittests/test_communicator_sync.py @@ -25,7 +25,7 @@ from paddle.fluid.communicator import Communicator import paddle.fluid.incubate.fleet.base.role_maker as role_maker from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet -from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import StrategyFactory +from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory class TestCommunicator(unittest.TestCase): diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py index 7b360b20070..9f6673124c3 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset.py +++ b/python/paddle/fluid/tests/unittests/test_dataset.py @@ -720,6 +720,33 @@ class TestDatasetWithFetchHandler(unittest.TestCase): except Exception as e: self.assertTrue(False) + def test_fetch_handler(self): + """ + Test Dataset With Fetch Handler. TestCases. + """ + slots_vars, out = self.net() + files = ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"] + dataset = self.get_dataset(slots_vars, files) + + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(fluid.default_startup_program()) + + fh = fluid.executor.FetchHandler(out.name) + fh.help() + + try: + exe.train_from_dataset( + program=fluid.default_main_program(), + dataset=dataset, + fetch_handler=fh) + except ImportError as e: + print("warning: we skip trainer_desc_pb2 import problem in windows") + except RuntimeError as e: + error_msg = "dataset is need and should be initialized" + self.assertEqual(error_msg, cpt.get_exception_message(e)) + except Exception as e: + self.assertTrue(False) + class TestDataset2(unittest.TestCase): """ TestCases for Dataset. """ @@ -769,7 +796,6 @@ class TestDataset2(unittest.TestCase): print("warning: no mpi4py") adam = fluid.optimizer.Adam(learning_rate=0.000005) try: - fleet.init() adam = fleet.distributed_optimizer(adam) adam.minimize([fake_cost], [scope]) except AttributeError as e: @@ -832,7 +858,6 @@ class TestDataset2(unittest.TestCase): print("warning: no mpi4py") adam = fluid.optimizer.Adam(learning_rate=0.000005) try: - fleet.init() adam = fleet.distributed_optimizer( adam, strategy={ diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py index 7a84a368424..32a06188c5f 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py @@ -37,7 +37,7 @@ import paddle.fluid as fluid import paddle.fluid.incubate.fleet.base.role_maker as role_maker from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig -from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import StrategyFactory +from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory __all__ = ['FleetDistRunnerBase', 'TestFleetBase', 'runtime_main'] diff --git a/python/paddle/fluid/tests/unittests/test_distributed_strategy.py b/python/paddle/fluid/tests/unittests/test_distributed_strategy.py index 01e0c79dfaf..8dbe2f398f2 100644 --- a/python/paddle/fluid/tests/unittests/test_distributed_strategy.py +++ b/python/paddle/fluid/tests/unittests/test_distributed_strategy.py @@ -15,7 +15,7 @@ import unittest import paddle.fluid as fluid from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig, ServerRuntimeConfig -from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import TrainerRuntimeConfig, StrategyFactory +from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import TrainerRuntimeConfig, StrategyFactory from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet import paddle.fluid.incubate.fleet.base.role_maker as role_maker import os diff --git a/python/paddle/fluid/tests/unittests/test_fleet_input_with_psilib.py b/python/paddle/fluid/tests/unittests/test_fleet_input_with_psilib.py deleted file mode 100644 index b555f83b7f6..00000000000 --- a/python/paddle/fluid/tests/unittests/test_fleet_input_with_psilib.py +++ /dev/null @@ -1,65 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import unittest -from six.moves import reduce - -import paddle.fluid as fluid -from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig -from paddle.fluid.incubate.fleet.base.role_maker import UserDefinedRoleMaker -from paddle.fluid.incubate.fleet.base.role_maker import UserDefinedCollectiveRoleMaker -from paddle.fluid.incubate.fleet.base.role_maker import Role -import paddle.fluid.incubate.fleet.base.role_maker as role_maker -from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet -from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import StrategyFactory -from dist_simnet_bow import train_network - - -class FleetPSLibTest(unittest.TestCase): - def test_transpile(self): - role = role_maker.UserDefinedRoleMaker( - current_id=0, - role=role_maker.Role.SERVER, - worker_num=2, - server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"]) - - optimizer = fluid.optimizer.SGD(0.1) - # case5 - self.assertRaises(Exception, fleet.distributed_optimizer, optimizer, - "Adam") - fleet.init(role) - - avg_cost, _, _ = train_network(128, False, True) - - # case1 - strategy = StrategyFactory.create_async_strategy() - fleet.distributed_optimizer(optimizer, strategy) - - # case2 - strategy = {} - fleet.distributed_optimizer(optimizer, strategy) - - # case3 - self.assertRaises(Exception, fleet.distributed_optimizer, optimizer, - "Adam") - - # case4 - self.assertRaises(Exception, fleet.distributed_optimizer, "Adam", - "Adam") - - -if __name__ == '__main__': - unittest.main() diff --git a/python/setup.py.in b/python/setup.py.in index a7eb2b5f615..d2f4571b115 100644 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -1,7 +1,6 @@ import subprocess import os import os.path -import errno import re import shutil import sys @@ -105,35 +104,6 @@ def mkl(): write_version_py(filename='@PADDLE_BINARY_DIR@/python/paddle/version.py') -def write_distributed_training_mode_py(filename='paddle/fluid/incubate/fleet/parameter_server/version.py'): - cnt = '''from __future__ import print_function - -# THIS FILE IS GENERATED FROM PADDLEPADDLE SETUP.PY - -from paddle.fluid.incubate.fleet.base.fleet_base import Mode - -BUILD_MODE=Mode.%(mode)s - -def is_transpiler(): - return Mode.TRANSPILER == BUILD_MODE - -''' - - dirname = os.path.dirname(filename) - - try: - os.makedirs(dirname) - except OSError as e: - if e.errno != errno.EEXIST: - raise - - with open(filename, 'w') as f: - f.write(cnt % { - 'mode': 'PSLIB' if '${WITH_PSLIB}' == 'ON' else 'TRANSPILER' - }) - -write_distributed_training_mode_py(filename='@PADDLE_BINARY_DIR@/python/paddle/fluid/incubate/fleet/parameter_server/version.py') - packages=['paddle', 'paddle.libs', 'paddle.utils', -- GitLab