Unverified commit c4a6a0e2, authored by tangwei12, committed by GitHub

Revert "Integrated API of Parameter Server (#22710)" test=develop (#23071)

This reverts commit 66fce9e8.
Parent 880eb04d
@@ -14,5 +14,4 @@
# incubate directory is mainly for internal use
# after we have tested incubate APIs in industrial application for a period
# we will move stable functions into fluid
from __future__ import print_function
__version__ = '0.1.0'
@@ -11,7 +11,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
from __future__ import print_function
from . import base
from .base import *
__version__ = '0.1.0'
@@ -10,9 +10,3 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
from . import fleet_base
from .fleet_base import *
from . import role_maker
from .role_maker import *
@@ -16,7 +16,7 @@ from __future__ import print_function
import abc
from paddle.fluid.core import CPUPlace
import paddle.fluid as fluid
from paddle.fluid.executor import Executor
from paddle.fluid.optimizer import SGD
@@ -193,7 +193,7 @@ class Fleet(object):
Returns:
None
"""
self._executor = Executor(CPUPlace())
self._executor = Executor(fluid.CPUPlace())
if role_maker and not isinstance(role_maker, RoleMakerBase):
raise TypeError("role_maker must be an instance of RoleMakerBase")
......
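For orientation, a minimal sketch of how Fleet.init is typically called, based on the role-maker usage in the tests further down this diff (the ids and endpoints are illustrative values taken from those tests):

    # Illustrative values only; mirrors the UserDefinedRoleMaker test below.
    import paddle.fluid.incubate.fleet.base.role_maker as role_maker
    from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet

    role = role_maker.UserDefinedRoleMaker(
        current_id=0,
        role=role_maker.Role.SERVER,
        worker_num=2,
        server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"])

    # init() rejects anything that is not a RoleMakerBase and creates an
    # Executor on fluid.CPUPlace(), as the hunk above shows.
    fleet.init(role)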
@@ -14,9 +14,9 @@
"""Defination of Role Makers."""
from __future__ import print_function
import paddle.fluid as fluid
import os
from paddle.fluid.core import Gloo
import time
__all__ = [
'Role', 'RoleMakerBase', 'MPISymetricRoleMaker', 'UserDefinedRoleMaker',
@@ -577,7 +577,7 @@ class GeneralRoleMaker(RoleMakerBase):
current_id = int(os.environ["PADDLE_TRAINER_ID"])
self._node_type = 1
self._cur_endpoint = worker_endpoints[current_id]
gloo = Gloo()
gloo = fluid.core.Gloo()
gloo.init(current_id,
len(worker_endpoints),
self._hdfs_path.rstrip("/") + "/trainer",
@@ -597,7 +597,7 @@ class GeneralRoleMaker(RoleMakerBase):
current_id = eplist.index(cur_endpoint)
self._node_type = 0
self._cur_endpoint = cur_endpoint
gloo = Gloo()
gloo = fluid.core.Gloo()
gloo.init(current_id,
len(eplist),
self._hdfs_path.rstrip("/") + "/pserver",
@@ -605,7 +605,7 @@ class GeneralRoleMaker(RoleMakerBase):
self._prefix)
self._node_type_comm = gloo
gloo = Gloo()
gloo = fluid.core.Gloo()
all_list = worker_endpoints + eplist
gloo.init(
all_list.index(self._cur_endpoint),
......
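For context, a hedged sketch of the environment a GeneralRoleMaker-driven process is launched with before the Gloo rendezvous above runs. Only PADDLE_TRAINER_ID appears in these hunks; the other variable names are assumptions about where the endpoint lists come from:

    import os

    os.environ["PADDLE_TRAINER_ID"] = "0"  # read directly in the trainer branch above
    # Assumed sources of the worker/pserver endpoint lists the branches index into:
    os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:6170,127.0.0.1:6171"
    os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:6270,127.0.0.1:6271"

    # Each branch then builds a fluid.core.Gloo() and rendezvouses under
    # self._hdfs_path + "/trainer" or "/pserver", keeping the resulting
    # communicator in self._node_type_comm.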
@@ -11,20 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from paddle.fluid.incubate.fleet.parameter_server import version
from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import TrainerRuntimeConfig
from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import StrategyFactory
__all__ = ['TrainerRuntimeConfig', 'StrategyFactory', 'fleet']
fleet = None
if version.is_transpiler():
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet as fleet_transpiler
fleet = fleet_transpiler
else:
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet as fleet_pslib
fleet = fleet_pslib
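The block above is deleted by this revert; it selected the fleet singleton at import time from the generated version module. Post-revert, callers import the implementation explicitly; a minimal sketch using the two paths that appear elsewhere in this diff:

    # Transpiler-based parameter server:
    from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
    # or, on a WITH_PSLIB build:
    # from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet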
@@ -27,7 +27,7 @@ from paddle.fluid.executor import Executor
from paddle.fluid.parallel_executor import ParallelExecutor
from paddle.fluid.optimizer import Optimizer
from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import TrainerRuntimeConfig, DistributedStrategy, SyncStrategy, AsyncStrategy, HalfAsyncStrategy, GeoStrategy, StrategyFactory
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import TrainerRuntimeConfig, DistributedStrategy, SyncStrategy, AsyncStrategy, HalfAsyncStrategy, GeoStrategy, StrategyFactory
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspiler as OriginTranspiler
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig, ServerRuntimeConfig, DistributedMode
......
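The import change above moves the strategy classes back under distribute_transpiler, so post-revert user code imports them the way the tests below do; for example:

    from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
        StrategyFactory, TrainerRuntimeConfig, DistributedStrategy, SyncStrategy,
        AsyncStrategy, HalfAsyncStrategy, GeoStrategy)

    strategy = StrategyFactory.create_async_strategy()  # as in the pslib test below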
@@ -124,19 +124,10 @@ class TrainerRuntimeConfig(object):
return self.display(self.get_communicator_flags())
class PSLibRuntimeConfig(object):
def __init__(self):
self.runtime_configs = {}
def get_runtime_configs(self):
return self.runtime_configs
class DistributedStrategy(object):
def __init__(self):
self._program_config = DistributeTranspilerConfig()
self._trainer_runtime_config = TrainerRuntimeConfig()
self._pslib_runtime_config = PSLibRuntimeConfig()
self._server_runtime_config = ServerRuntimeConfig()
num_threads = int(os.getenv("CPU_NUM", "1"))
@@ -213,12 +204,6 @@ class DistributedStrategy(object):
"check_trainer_runtime_config must be implemented by derived class. You should use StrategyFactory to create DistributedStrategy."
)
def get_pslib_runtime_config(self):
return self._pslib_runtime_config
def set_pslib_runtime_config(self, config):
self._pslib_runtime_config.runtime_configs = config
def get_server_runtime_config(self):
return self._server_runtime_config
......
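The deleted PSLibRuntimeConfig plumbing let an AsyncStrategy carry a dict of PSLib runtime settings; a sketch of the pre-revert usage, built only from the accessors shown above (the dict key is illustrative, not an API guarantee):

    # Pre-revert only:
    strategy = StrategyFactory.create_async_strategy()
    strategy.set_pslib_runtime_config({"fleet_desc_file": "fleet_desc.prototxt"})  # illustrative key
    configs = strategy.get_pslib_runtime_config().get_runtime_configs()

    # Post-revert, PSLib callers pass a plain dict straight to
    # fleet.distributed_optimizer(optimizer, strategy) instead.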
@@ -18,13 +18,11 @@ from .optimizer_factory import *
from google.protobuf import text_format
import paddle.fluid as fluid
from paddle.fluid.framework import Program
from paddle.fluid.optimizer import Optimizer
from paddle.fluid.incubate.fleet.base.fleet_base import Fleet
from paddle.fluid.incubate.fleet.base.fleet_base import Mode
from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer
from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker
from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import TrainerRuntimeConfig, DistributedStrategy, SyncStrategy, AsyncStrategy, HalfAsyncStrategy, GeoStrategy, StrategyFactory
class PSLib(Fleet):
@@ -224,32 +222,7 @@ class PSLib(Fleet):
optimizer(DownpourOptimizer): downpour optimizer
"""
if not isinstance(optimizer, Optimizer):
raise ValueError("optimizer must be an instance of Optimizer")
if not fleet._is_initialized:
raise ValueError(
"use fleet.init(role) to initialize the role of current node before optimizer.minimize(loss)"
)
if strategy:
if isinstance(strategy, dict):
self._strategy = strategy
elif isinstance(strategy, DistributedStrategy):
if isinstance(strategy, AsyncStrategy):
self._strategy = strategy.get_pslib_runtime_config(
).get_runtime_configs()
else:
raise TypeError(
"In {} mode, strategy must be an instance of AsyncStrategy, or Dict".
format(fleet._mode))
else:
raise TypeError(
"In {} mode, strategy must be an instance of AsyncStrategy, or Dict".
format(fleet._mode))
else:
self._strategy = {}
self._optimizer = DownpourOptimizer(optimizer, self._strategy)
self._optimizer = DownpourOptimizer(optimizer, strategy)
return self._optimizer
def save_inference_model(self,
@@ -444,6 +417,21 @@ class PSLib(Fleet):
self._fleet_ptr.clear_model()
self._role_maker._barrier_worker()
def clear_model(self):
"""
clear_model() will be called by user. It will clear sparse model.
Examples:
.. code-block:: python
fleet.clear_model()
"""
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
self._fleet_ptr.clear_model()
self._role_maker._barrier_worker()
def load_one_table(self, table_id, model_path, **kwargs):
"""
load pslib model for one table or load params from paddle model
......
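With the strategy type-checking above removed, PSLib.distributed_optimizer goes back to forwarding whatever strategy it receives to DownpourOptimizer; a minimal post-revert sketch mirroring TestDataset2 later in this diff:

    import paddle.fluid as fluid
    from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

    adam = fluid.optimizer.Adam(learning_rate=0.000005)
    # An empty dict keeps the defaults; the object is handed to
    # DownpourOptimizer without validation.
    adam = fleet.distributed_optimizer(adam, strategy={})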
@@ -13,7 +13,6 @@
"""Defination of Server and Worker."""
from . import ps_pb2 as pslib
from six.moves import reduce
class Server(object):
......
@@ -28,7 +28,6 @@ list(APPEND MIXED_DIST_TEST_OPS test_communicator_geo)
list(APPEND MIXED_DIST_TEST_OPS test_communicator_half_async)
list(APPEND MIXED_DIST_TEST_OPS test_communicator_sync)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_api_input)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_input_with_psilib)
foreach(TEST_OP ${MIXED_DIST_TEST_OPS})
list(REMOVE_ITEM TEST_OPS ${TEST_OP})
endforeach()
@@ -283,7 +282,6 @@ if(WITH_DISTRIBUTE)
py_test_modules(test_nce_remote_table_op MODULES test_nce_remote_table_op ENVS ${dist_ENVS})
py_test_modules(test_recv_save_op MODULES test_recv_save_op ENVS ${dist_ENVS})
py_test_modules(test_transpiler_ops MODULES test_transpiler_ops ENVS ${dist_ENVS})
py_test_modules(test_fleet_input_with_psilib MODULES test_fleet_input_with_psilib ENVS ${dist_ENVS})
py_test_modules(test_communicator_async MODULES test_communicator_async ENVS ${dist_ENVS})
py_test_modules(test_communicator_geo MODULES test_communicator_geo ENVS ${dist_ENVS})
py_test_modules(test_communicator_half_async MODULES test_communicator_half_async ENVS ${dist_ENVS} FLAGS_communicator_send_queue_size=1 FLAGS_communicator_max_merge_var_num=1)
......
@@ -25,7 +25,7 @@ from paddle.fluid.communicator import Communicator
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import StrategyFactory
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
class TestCommunicator(unittest.TestCase):
......
@@ -720,6 +720,33 @@ class TestDatasetWithFetchHandler(unittest.TestCase):
except Exception as e:
self.assertTrue(False)
def test_fetch_handler(self):
"""
Test Dataset With Fetch Handler. TestCases.
"""
slots_vars, out = self.net()
files = ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]
dataset = self.get_dataset(slots_vars, files)
exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
fh = fluid.executor.FetchHandler(out.name)
fh.help()
try:
exe.train_from_dataset(
program=fluid.default_main_program(),
dataset=dataset,
fetch_handler=fh)
except ImportError as e:
print("warning: we skip trainer_desc_pb2 import problem in windows")
except RuntimeError as e:
error_msg = "dataset is need and should be initialized"
self.assertEqual(error_msg, cpt.get_exception_message(e))
except Exception as e:
self.assertTrue(False)
class TestDataset2(unittest.TestCase):
""" TestCases for Dataset. """
@@ -769,7 +796,6 @@ class TestDataset2(unittest.TestCase):
print("warning: no mpi4py")
adam = fluid.optimizer.Adam(learning_rate=0.000005)
try:
fleet.init()
adam = fleet.distributed_optimizer(adam)
adam.minimize([fake_cost], [scope])
except AttributeError as e:
@@ -832,7 +858,6 @@ class TestDataset2(unittest.TestCase):
print("warning: no mpi4py")
adam = fluid.optimizer.Adam(learning_rate=0.000005)
try:
fleet.init()
adam = fleet.distributed_optimizer(
adam,
strategy={
......
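The new test above reduces to the following FetchHandler pattern; only calls the test itself makes are used, and out and dataset come from the test's helpers:

    exe = fluid.Executor(fluid.CPUPlace())
    exe.run(fluid.default_startup_program())

    fh = fluid.executor.FetchHandler(out.name)  # watch one fetch target by name
    fh.help()  # exercised by the test; presumably prints handler usage info

    exe.train_from_dataset(
        program=fluid.default_main_program(),
        dataset=dataset,
        fetch_handler=fh)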
@@ -37,7 +37,7 @@ import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import StrategyFactory
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
__all__ = ['FleetDistRunnerBase', 'TestFleetBase', 'runtime_main']
......
@@ -15,7 +15,7 @@
import unittest
import paddle.fluid as fluid
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig, ServerRuntimeConfig
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import TrainerRuntimeConfig, StrategyFactory
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import TrainerRuntimeConfig, StrategyFactory
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import os
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
from six.moves import reduce
import paddle.fluid as fluid
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
from paddle.fluid.incubate.fleet.base.role_maker import UserDefinedRoleMaker
from paddle.fluid.incubate.fleet.base.role_maker import UserDefinedCollectiveRoleMaker
from paddle.fluid.incubate.fleet.base.role_maker import Role
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import StrategyFactory
from dist_simnet_bow import train_network
class FleetPSLibTest(unittest.TestCase):
def test_transpile(self):
role = role_maker.UserDefinedRoleMaker(
current_id=0,
role=role_maker.Role.SERVER,
worker_num=2,
server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"])
optimizer = fluid.optimizer.SGD(0.1)
# case5
self.assertRaises(Exception, fleet.distributed_optimizer, optimizer,
"Adam")
fleet.init(role)
avg_cost, _, _ = train_network(128, False, True)
# case1
strategy = StrategyFactory.create_async_strategy()
fleet.distributed_optimizer(optimizer, strategy)
# case2
strategy = {}
fleet.distributed_optimizer(optimizer, strategy)
# case3
self.assertRaises(Exception, fleet.distributed_optimizer, optimizer,
"Adam")
# case4
self.assertRaises(Exception, fleet.distributed_optimizer, "Adam",
"Adam")
if __name__ == '__main__':
unittest.main()
import subprocess
import os
import os.path
import errno
import re
import shutil
import sys
@@ -105,35 +104,6 @@ def mkl():
write_version_py(filename='@PADDLE_BINARY_DIR@/python/paddle/version.py')
def write_distributed_training_mode_py(filename='paddle/fluid/incubate/fleet/parameter_server/version.py'):
cnt = '''from __future__ import print_function
# THIS FILE IS GENERATED FROM PADDLEPADDLE SETUP.PY
from paddle.fluid.incubate.fleet.base.fleet_base import Mode
BUILD_MODE=Mode.%(mode)s
def is_transpiler():
return Mode.TRANSPILER == BUILD_MODE
'''
dirname = os.path.dirname(filename)
try:
os.makedirs(dirname)
except OSError as e:
if e.errno != errno.EEXIST:
raise
with open(filename, 'w') as f:
f.write(cnt % {
'mode': 'PSLIB' if '${WITH_PSLIB}' == 'ON' else 'TRANSPILER'
})
write_distributed_training_mode_py(filename='@PADDLE_BINARY_DIR@/python/paddle/fluid/incubate/fleet/parameter_server/version.py')
packages=['paddle',
'paddle.libs',
'paddle.utils',
......
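For reference, the version module that the removed helper generated looks like this once %(mode)s is substituted (TRANSPILER shown; a WITH_PSLIB=ON build writes PSLIB instead):

    # paddle/fluid/incubate/fleet/parameter_server/version.py, as generated.
    from __future__ import print_function
    # THIS FILE IS GENERATED FROM PADDLEPADDLE SETUP.PY
    from paddle.fluid.incubate.fleet.base.fleet_base import Mode
    BUILD_MODE = Mode.TRANSPILER

    def is_transpiler():
        return Mode.TRANSPILER == BUILD_MODE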