diff --git a/python/paddle/fluid/incubate/__init__.py b/python/paddle/fluid/incubate/__init__.py index 76c5c6391fde3cafbd9a94e1d11e0ef4401420ed..4f5a31b5fbab564973549eba9e1a27986be55aec 100644 --- a/python/paddle/fluid/incubate/__init__.py +++ b/python/paddle/fluid/incubate/__init__.py @@ -14,4 +14,5 @@ # incubate directory is mainly for internal use # after we have tested incubate APIs in industrial application for a period # we will move stable functions into fluid -__version__ = '0.1.0' + +from __future__ import print_function diff --git a/python/paddle/fluid/incubate/fleet/__init__.py b/python/paddle/fluid/incubate/fleet/__init__.py index a05baabca392b14a4cb09a3f395ae7687d8a5e62..2b4bd2dbd9d6a3da9b92cd45a90d7c69f12af312 100644 --- a/python/paddle/fluid/incubate/fleet/__init__.py +++ b/python/paddle/fluid/incubate/fleet/__init__.py @@ -11,4 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -__version__ = '0.1.0' +from __future__ import print_function + +from . import base +from .base import * diff --git a/python/paddle/fluid/incubate/fleet/base/__init__.py b/python/paddle/fluid/incubate/fleet/base/__init__.py index 8647330f3290f3142cabca9a7e3fe162a9838dda..13981773185fb0e55e010b9ea6996b2388e462d3 100644 --- a/python/paddle/fluid/incubate/fleet/base/__init__.py +++ b/python/paddle/fluid/incubate/fleet/base/__init__.py @@ -10,3 +10,9 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and + +from . import fleet_base +from .fleet_base import * + +from . import role_maker +from .role_maker import * diff --git a/python/paddle/fluid/incubate/fleet/base/fleet_base.py b/python/paddle/fluid/incubate/fleet/base/fleet_base.py index 09a1bac85f04ae61ea12d0beac52619f60ff9d9f..044ebcf3ac3effbd532206b1f0c65b7bfc1a6b16 100644 --- a/python/paddle/fluid/incubate/fleet/base/fleet_base.py +++ b/python/paddle/fluid/incubate/fleet/base/fleet_base.py @@ -16,7 +16,7 @@ from __future__ import print_function import abc -import paddle.fluid as fluid +from paddle.fluid.core import CPUPlace from paddle.fluid.executor import Executor from paddle.fluid.optimizer import SGD @@ -193,7 +193,7 @@ class Fleet(object): Returns: None """ - self._executor = Executor(fluid.CPUPlace()) + self._executor = Executor(CPUPlace()) if role_maker and not isinstance(role_maker, RoleMakerBase): raise TypeError("role_maker must be an instance of RoleMakerBase") diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py index bada19abcc32d2bc91d99203a7b66389aca912d3..4df18f537e100e566bf1020b1d735c411141c4d3 100644 --- a/python/paddle/fluid/incubate/fleet/base/role_maker.py +++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py @@ -14,9 +14,9 @@ """Defination of Role Makers.""" from __future__ import print_function -import paddle.fluid as fluid import os -import time + +from paddle.fluid.core import Gloo __all__ = [ 'Role', 'RoleMakerBase', 'MPISymetricRoleMaker', 'UserDefinedRoleMaker', @@ -577,7 +577,7 @@ class GeneralRoleMaker(RoleMakerBase): current_id = int(os.environ["PADDLE_TRAINER_ID"]) self._node_type = 1 self._cur_endpoint = worker_endpoints[current_id] - gloo = fluid.core.Gloo() + gloo = Gloo() gloo.init(current_id, len(worker_endpoints), self._hdfs_path.rstrip("/") + "/trainer", @@ -597,7 +597,7 @@ class GeneralRoleMaker(RoleMakerBase): current_id = eplist.index(cur_endpoint) self._node_type = 0 self._cur_endpoint = cur_endpoint - gloo = fluid.core.Gloo() + gloo = Gloo() gloo.init(current_id, len(eplist), self._hdfs_path.rstrip("/") + "/pserver", @@ -605,7 +605,7 @@ class GeneralRoleMaker(RoleMakerBase): self._prefix) self._node_type_comm = gloo - gloo = fluid.core.Gloo() + gloo = Gloo() all_list = worker_endpoints + eplist gloo.init( all_list.index(self._cur_endpoint), diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py index 33ed0ecf10ec4cad807ebb6df1590de65eeeab1e..207b64ded919dfe64295c300e37ed52ff8620535 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/__init__.py @@ -11,3 +11,20 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from __future__ import print_function + +from paddle.fluid.incubate.fleet.parameter_server import version +from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import TrainerRuntimeConfig +from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import StrategyFactory + +__all__ = ['TrainerRuntimeConfig', 'StrategyFactory', 'fleet'] + +fleet = None + +if version.is_transpiler(): + from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet as fleet_transpiler + fleet = fleet_transpiler +else: + from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet as fleet_pslib + fleet = fleet_pslib diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py index 667ad0a2ed014c87042dd3dfe7885b2670e1c764..69cbd86e6f33733bc6c9a012b4f69ca71c9c8922 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py @@ -27,7 +27,7 @@ from paddle.fluid.executor import Executor from paddle.fluid.parallel_executor import ParallelExecutor from paddle.fluid.optimizer import Optimizer -from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import TrainerRuntimeConfig, DistributedStrategy, SyncStrategy, AsyncStrategy, HalfAsyncStrategy, GeoStrategy, StrategyFactory +from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import TrainerRuntimeConfig, DistributedStrategy, SyncStrategy, AsyncStrategy, HalfAsyncStrategy, GeoStrategy, StrategyFactory from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspiler as OriginTranspiler from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig, ServerRuntimeConfig, DistributedMode diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/distributed_strategy.py b/python/paddle/fluid/incubate/fleet/parameter_server/distributed_strategy.py similarity index 97% rename from python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/distributed_strategy.py rename to python/paddle/fluid/incubate/fleet/parameter_server/distributed_strategy.py index 92d07c97da46568f31d86a99f20f0b8fe071b031..cf6f2a60cbfb14b36e0ecd07680cf68294080e28 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/distributed_strategy.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/distributed_strategy.py @@ -124,10 +124,19 @@ class TrainerRuntimeConfig(object): return self.display(self.get_communicator_flags()) +class PSLibRuntimeConfig(object): + def __init__(self): + self.runtime_configs = {} + + def get_runtime_configs(self): + return self.runtime_configs + + class DistributedStrategy(object): def __init__(self): self._program_config = DistributeTranspilerConfig() self._trainer_runtime_config = TrainerRuntimeConfig() + self._pslib_runtime_config = PSLibRuntimeConfig() self._server_runtime_config = ServerRuntimeConfig() num_threads = int(os.getenv("CPU_NUM", "1")) @@ -204,6 +213,12 @@ class DistributedStrategy(object): "check_trainer_runtime_config must be implemented by derived class. You should use StrategyFactory to create DistributedStrategy." ) + def get_pslib_runtime_config(self): + return self._pslib_runtime_config + + def set_pslib_runtime_config(self, config): + self._pslib_runtime_config.runtime_configs = config + def get_server_runtime_config(self): return self._server_runtime_config diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py index 40337110cfe966511050b78e3e463e7653c3caba..221ac782944f8c162e1cfe73e12b3437db3b0a50 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py @@ -18,11 +18,13 @@ from .optimizer_factory import * from google.protobuf import text_format import paddle.fluid as fluid from paddle.fluid.framework import Program +from paddle.fluid.optimizer import Optimizer from paddle.fluid.incubate.fleet.base.fleet_base import Fleet from paddle.fluid.incubate.fleet.base.fleet_base import Mode from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker +from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import TrainerRuntimeConfig, DistributedStrategy, SyncStrategy, AsyncStrategy, HalfAsyncStrategy, GeoStrategy, StrategyFactory class PSLib(Fleet): @@ -222,7 +224,32 @@ class PSLib(Fleet): optimizer(DownpourOptimizer): downpour optimizer """ - self._optimizer = DownpourOptimizer(optimizer, strategy) + + if not isinstance(optimizer, Optimizer): + raise ValueError("optimizer must be an instance of Optimizer") + if not fleet._is_initialized: + raise ValueError( + "use fleet.init(role) to initialize the role of current node before optimizer.minimize(loss)" + ) + if strategy: + if isinstance(strategy, dict): + self._strategy = strategy + elif isinstance(strategy, DistributedStrategy): + if isinstance(strategy, AsyncStrategy): + self._strategy = strategy.get_pslib_runtime_config( + ).get_runtime_configs() + else: + raise TypeError( + "In {} mode, strategy must be an instance of AsyncStrategy, or Dict". + format(fleet._mode)) + else: + raise TypeError( + "In {} mode, strategy must be an instance of AsyncStrategy, or Dict". + format(fleet._mode)) + else: + self._strategy = {} + + self._optimizer = DownpourOptimizer(optimizer, self._strategy) return self._optimizer def save_inference_model(self, @@ -417,21 +444,6 @@ class PSLib(Fleet): self._fleet_ptr.clear_model() self._role_maker._barrier_worker() - def clear_model(self): - """ - clear_model() will be called by user. It will clear sparse model. - - Examples: - .. code-block:: python - - fleet.clear_model() - - """ - self._role_maker._barrier_worker() - if self._role_maker.is_first_worker(): - self._fleet_ptr.clear_model() - self._role_maker._barrier_worker() - def load_one_table(self, table_id, model_path, **kwargs): """ load pslib model for one table or load params from paddle model diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py index 5afcf0cf2ee032c3d510ecf893256c9f8754156b..511b8f4ad6c99a1c27c1f7eef60b313d39f1a150 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py @@ -13,6 +13,7 @@ """Defination of Server and Worker.""" from . import ps_pb2 as pslib +from six.moves import reduce class Server(object): diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index f3ed3b499595d7e857d2073c14cf17190f59f068..edab751554a0c137ed57cb24a2a60ea8aa93e3b5 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -28,6 +28,7 @@ list(APPEND MIXED_DIST_TEST_OPS test_communicator_geo) list(APPEND MIXED_DIST_TEST_OPS test_communicator_half_async) list(APPEND MIXED_DIST_TEST_OPS test_communicator_sync) list(APPEND MIXED_DIST_TEST_OPS test_fleet_api_input) +list(APPEND MIXED_DIST_TEST_OPS test_fleet_input_with_psilib) foreach(TEST_OP ${MIXED_DIST_TEST_OPS}) list(REMOVE_ITEM TEST_OPS ${TEST_OP}) endforeach() @@ -282,6 +283,7 @@ if(WITH_DISTRIBUTE) py_test_modules(test_nce_remote_table_op MODULES test_nce_remote_table_op ENVS ${dist_ENVS}) py_test_modules(test_recv_save_op MODULES test_recv_save_op ENVS ${dist_ENVS}) py_test_modules(test_transpiler_ops MODULES test_transpiler_ops ENVS ${dist_ENVS}) + py_test_modules(test_fleet_input_with_psilib MODULES test_fleet_input_with_psilib ENVS ${dist_ENVS}) py_test_modules(test_communicator_async MODULES test_communicator_async ENVS ${dist_ENVS}) py_test_modules(test_communicator_geo MODULES test_communicator_geo ENVS ${dist_ENVS}) py_test_modules(test_communicator_half_async MODULES test_communicator_half_async ENVS ${dist_ENVS} FLAGS_communicator_send_queue_size=1 FLAGS_communicator_max_merge_var_num=1) diff --git a/python/paddle/fluid/tests/unittests/test_communicator_sync.py b/python/paddle/fluid/tests/unittests/test_communicator_sync.py index be1f32fb0aee10fa08f6609eeda06579145aeb26..d50dc6dae0b839c0958cd22a07097696e1f2fb68 100644 --- a/python/paddle/fluid/tests/unittests/test_communicator_sync.py +++ b/python/paddle/fluid/tests/unittests/test_communicator_sync.py @@ -25,7 +25,7 @@ from paddle.fluid.communicator import Communicator import paddle.fluid.incubate.fleet.base.role_maker as role_maker from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet -from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory +from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import StrategyFactory class TestCommunicator(unittest.TestCase): diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py index 9f6673124c338e962a4103314452c842a3e91bf7..7b360b20070f06cd60480498626786d1de10d681 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset.py +++ b/python/paddle/fluid/tests/unittests/test_dataset.py @@ -720,33 +720,6 @@ class TestDatasetWithFetchHandler(unittest.TestCase): except Exception as e: self.assertTrue(False) - def test_fetch_handler(self): - """ - Test Dataset With Fetch Handler. TestCases. - """ - slots_vars, out = self.net() - files = ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"] - dataset = self.get_dataset(slots_vars, files) - - exe = fluid.Executor(fluid.CPUPlace()) - exe.run(fluid.default_startup_program()) - - fh = fluid.executor.FetchHandler(out.name) - fh.help() - - try: - exe.train_from_dataset( - program=fluid.default_main_program(), - dataset=dataset, - fetch_handler=fh) - except ImportError as e: - print("warning: we skip trainer_desc_pb2 import problem in windows") - except RuntimeError as e: - error_msg = "dataset is need and should be initialized" - self.assertEqual(error_msg, cpt.get_exception_message(e)) - except Exception as e: - self.assertTrue(False) - class TestDataset2(unittest.TestCase): """ TestCases for Dataset. """ @@ -796,6 +769,7 @@ class TestDataset2(unittest.TestCase): print("warning: no mpi4py") adam = fluid.optimizer.Adam(learning_rate=0.000005) try: + fleet.init() adam = fleet.distributed_optimizer(adam) adam.minimize([fake_cost], [scope]) except AttributeError as e: @@ -858,6 +832,7 @@ class TestDataset2(unittest.TestCase): print("warning: no mpi4py") adam = fluid.optimizer.Adam(learning_rate=0.000005) try: + fleet.init() adam = fleet.distributed_optimizer( adam, strategy={ diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py index 32a06188c5f56306b4aa2ad4c80fb0fac2cad350..7a84a368424abe1a35e3ff192c020b3155523773 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py @@ -37,7 +37,7 @@ import paddle.fluid as fluid import paddle.fluid.incubate.fleet.base.role_maker as role_maker from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig -from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory +from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import StrategyFactory __all__ = ['FleetDistRunnerBase', 'TestFleetBase', 'runtime_main'] diff --git a/python/paddle/fluid/tests/unittests/test_distributed_strategy.py b/python/paddle/fluid/tests/unittests/test_distributed_strategy.py index 8dbe2f398f210b43454ae6a984650bd9f7c5dc43..01e0c79dfaf0f7f38f82951c78e3e66d8515abd4 100644 --- a/python/paddle/fluid/tests/unittests/test_distributed_strategy.py +++ b/python/paddle/fluid/tests/unittests/test_distributed_strategy.py @@ -15,7 +15,7 @@ import unittest import paddle.fluid as fluid from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig, ServerRuntimeConfig -from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import TrainerRuntimeConfig, StrategyFactory +from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import TrainerRuntimeConfig, StrategyFactory from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet import paddle.fluid.incubate.fleet.base.role_maker as role_maker import os diff --git a/python/paddle/fluid/tests/unittests/test_fleet_input_with_psilib.py b/python/paddle/fluid/tests/unittests/test_fleet_input_with_psilib.py new file mode 100644 index 0000000000000000000000000000000000000000..b555f83b7f61b9cb736a9a06e0183341460a2439 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_input_with_psilib.py @@ -0,0 +1,65 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import unittest +from six.moves import reduce + +import paddle.fluid as fluid +from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig +from paddle.fluid.incubate.fleet.base.role_maker import UserDefinedRoleMaker +from paddle.fluid.incubate.fleet.base.role_maker import UserDefinedCollectiveRoleMaker +from paddle.fluid.incubate.fleet.base.role_maker import Role +import paddle.fluid.incubate.fleet.base.role_maker as role_maker +from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet +from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import StrategyFactory +from dist_simnet_bow import train_network + + +class FleetPSLibTest(unittest.TestCase): + def test_transpile(self): + role = role_maker.UserDefinedRoleMaker( + current_id=0, + role=role_maker.Role.SERVER, + worker_num=2, + server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"]) + + optimizer = fluid.optimizer.SGD(0.1) + # case5 + self.assertRaises(Exception, fleet.distributed_optimizer, optimizer, + "Adam") + fleet.init(role) + + avg_cost, _, _ = train_network(128, False, True) + + # case1 + strategy = StrategyFactory.create_async_strategy() + fleet.distributed_optimizer(optimizer, strategy) + + # case2 + strategy = {} + fleet.distributed_optimizer(optimizer, strategy) + + # case3 + self.assertRaises(Exception, fleet.distributed_optimizer, optimizer, + "Adam") + + # case4 + self.assertRaises(Exception, fleet.distributed_optimizer, "Adam", + "Adam") + + +if __name__ == '__main__': + unittest.main() diff --git a/python/setup.py.in b/python/setup.py.in index d2f4571b115df254f14ef503e0482cce19f48dcc..a7eb2b5f615ad8925100e643879b4e20f95058d3 100644 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -1,6 +1,7 @@ import subprocess import os import os.path +import errno import re import shutil import sys @@ -104,6 +105,35 @@ def mkl(): write_version_py(filename='@PADDLE_BINARY_DIR@/python/paddle/version.py') +def write_distributed_training_mode_py(filename='paddle/fluid/incubate/fleet/parameter_server/version.py'): + cnt = '''from __future__ import print_function + +# THIS FILE IS GENERATED FROM PADDLEPADDLE SETUP.PY + +from paddle.fluid.incubate.fleet.base.fleet_base import Mode + +BUILD_MODE=Mode.%(mode)s + +def is_transpiler(): + return Mode.TRANSPILER == BUILD_MODE + +''' + + dirname = os.path.dirname(filename) + + try: + os.makedirs(dirname) + except OSError as e: + if e.errno != errno.EEXIST: + raise + + with open(filename, 'w') as f: + f.write(cnt % { + 'mode': 'PSLIB' if '${WITH_PSLIB}' == 'ON' else 'TRANSPILER' + }) + +write_distributed_training_mode_py(filename='@PADDLE_BINARY_DIR@/python/paddle/fluid/incubate/fleet/parameter_server/version.py') + packages=['paddle', 'paddle.libs', 'paddle.utils',