Unverified commit 66fce9e8, authored by tangwei12, committed by GitHub

Integrated API of Parameter Server (#22710)

Fleet Parameter Server API Integrated
Parent: d9d314d5
...@@ -14,4 +14,5 @@
# incubate directory is mainly for internal use
# after we have tested incubate APIs in industrial application for a period
# we will move stable functions into fluid
__version__ = '0.1.0'
from __future__ import print_function
...@@ -11,4 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
- __version__ = '0.1.0'
+ from __future__ import print_function
from . import base
from .base import *
...@@ -10,3 +10,9 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
from . import fleet_base
from .fleet_base import *
from . import role_maker
from .role_maker import *
...@@ -16,7 +16,7 @@ from __future__ import print_function
import abc
- import paddle.fluid as fluid
+ from paddle.fluid.core import CPUPlace
from paddle.fluid.executor import Executor
from paddle.fluid.optimizer import SGD
...@@ -193,7 +193,7 @@ class Fleet(object):
Returns:
None
"""
- self._executor = Executor(fluid.CPUPlace())
+ self._executor = Executor(CPUPlace())
if role_maker and not isinstance(role_maker, RoleMakerBase):
raise TypeError("role_maker must be an instance of RoleMakerBase")
......
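A quick, hedged sketch (not part of the diff) of the validation this hunk preserves: Fleet.init() still rejects anything that is not a RoleMakerBase; only the CPUPlace import path changed.

```python
# Minimal sketch, assuming the transpiler-based fleet entry point from this PR.
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet

try:
    fleet.init("not_a_role_maker")  # any non-RoleMakerBase object
except TypeError as e:
    print(e)  # "role_maker must be an instance of RoleMakerBase"
```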
...@@ -14,9 +14,9 @@
"""Definition of Role Makers."""
from __future__ import print_function
import paddle.fluid as fluid
import os
import time
from paddle.fluid.core import Gloo
__all__ = [
'Role', 'RoleMakerBase', 'MPISymetricRoleMaker', 'UserDefinedRoleMaker',
...@@ -577,7 +577,7 @@ class GeneralRoleMaker(RoleMakerBase):
current_id = int(os.environ["PADDLE_TRAINER_ID"])
self._node_type = 1
self._cur_endpoint = worker_endpoints[current_id]
- gloo = fluid.core.Gloo()
+ gloo = Gloo()
gloo.init(current_id,
len(worker_endpoints),
self._hdfs_path.rstrip("/") + "/trainer",
...@@ -597,7 +597,7 @@ class GeneralRoleMaker(RoleMakerBase):
current_id = eplist.index(cur_endpoint)
self._node_type = 0
self._cur_endpoint = cur_endpoint
- gloo = fluid.core.Gloo()
+ gloo = Gloo()
gloo.init(current_id,
len(eplist),
self._hdfs_path.rstrip("/") + "/pserver",
...@@ -605,7 +605,7 @@ class GeneralRoleMaker(RoleMakerBase):
self._prefix)
self._node_type_comm = gloo
- gloo = fluid.core.Gloo()
+ gloo = Gloo()
all_list = worker_endpoints + eplist
gloo.init(
all_list.index(self._cur_endpoint),
......
...@@ -11,3 +11,20 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from paddle.fluid.incubate.fleet.parameter_server import version
from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import TrainerRuntimeConfig
from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import StrategyFactory
__all__ = ['TrainerRuntimeConfig', 'StrategyFactory', 'fleet']
fleet = None
if version.is_transpiler():
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet as fleet_transpiler
fleet = fleet_transpiler
else:
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet as fleet_pslib
fleet = fleet_pslib
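A hedged usage sketch (not part of the diff): the new package-level __init__ resolves a single fleet object at import time, based on the version module generated by setup.py (see the last hunk of this diff).

```python
# Minimal sketch: user code imports one unified entry point; whether it is backed by
# the transpiler or by PSLib is decided by version.is_transpiler() at import time.
from paddle.fluid.incubate.fleet.parameter_server import fleet, StrategyFactory

strategy = StrategyFactory.create_async_strategy()
print(type(fleet))  # transpiler-based Fleet or PSLib-based Fleet, depending on the build
```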
...@@ -27,7 +27,7 @@ from paddle.fluid.executor import Executor
from paddle.fluid.parallel_executor import ParallelExecutor
from paddle.fluid.optimizer import Optimizer
- from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import TrainerRuntimeConfig, DistributedStrategy, SyncStrategy, AsyncStrategy, HalfAsyncStrategy, GeoStrategy, StrategyFactory
+ from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import TrainerRuntimeConfig, DistributedStrategy, SyncStrategy, AsyncStrategy, HalfAsyncStrategy, GeoStrategy, StrategyFactory
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspiler as OriginTranspiler
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig, ServerRuntimeConfig, DistributedMode
......
...@@ -124,10 +124,19 @@ class TrainerRuntimeConfig(object):
return self.display(self.get_communicator_flags())
class PSLibRuntimeConfig(object):
def __init__(self):
self.runtime_configs = {}
def get_runtime_configs(self):
return self.runtime_configs
class DistributedStrategy(object):
def __init__(self):
self._program_config = DistributeTranspilerConfig()
self._trainer_runtime_config = TrainerRuntimeConfig()
self._pslib_runtime_config = PSLibRuntimeConfig()
self._server_runtime_config = ServerRuntimeConfig()
num_threads = int(os.getenv("CPU_NUM", "1"))
...@@ -204,6 +213,12 @@ class DistributedStrategy(object):
"check_trainer_runtime_config must be implemented by derived class. You should use StrategyFactory to create DistributedStrategy."
)
def get_pslib_runtime_config(self):
return self._pslib_runtime_config
def set_pslib_runtime_config(self, config):
self._pslib_runtime_config.runtime_configs = config
def get_server_runtime_config(self):
return self._server_runtime_config
......
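A hedged sketch (not part of the diff) of how the new PSLib accessors are meant to be used; the dictionary key below is illustrative only, not an official PSLib flag.

```python
# Minimal sketch: stash a plain dict of PSLib runtime settings on an async strategy.
from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import StrategyFactory

strategy = StrategyFactory.create_async_strategy()
strategy.set_pslib_runtime_config({"embedding_dim": 8})  # key is illustrative
configs = strategy.get_pslib_runtime_config().get_runtime_configs()
```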
...@@ -18,11 +18,13 @@ from .optimizer_factory import *
from google.protobuf import text_format
import paddle.fluid as fluid
from paddle.fluid.framework import Program
from paddle.fluid.optimizer import Optimizer
from paddle.fluid.incubate.fleet.base.fleet_base import Fleet
from paddle.fluid.incubate.fleet.base.fleet_base import Mode
from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer
from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker
from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import TrainerRuntimeConfig, DistributedStrategy, SyncStrategy, AsyncStrategy, HalfAsyncStrategy, GeoStrategy, StrategyFactory
class PSLib(Fleet):
...@@ -222,7 +224,32 @@ class PSLib(Fleet):
optimizer(DownpourOptimizer): downpour optimizer
"""
self._optimizer = DownpourOptimizer(optimizer, strategy)
if not isinstance(optimizer, Optimizer):
raise ValueError("optimizer must be an instance of Optimizer")
if not fleet._is_initialized:
raise ValueError(
"use fleet.init(role) to initialize the role of current node before optimizer.minimize(loss)"
)
if strategy:
if isinstance(strategy, dict):
self._strategy = strategy
elif isinstance(strategy, DistributedStrategy):
if isinstance(strategy, AsyncStrategy):
self._strategy = strategy.get_pslib_runtime_config(
).get_runtime_configs()
else:
raise TypeError(
"In {} mode, strategy must be an instance of AsyncStrategy, or Dict".
format(fleet._mode))
else:
raise TypeError(
"In {} mode, strategy must be an instance of AsyncStrategy, or Dict".
format(fleet._mode))
else:
self._strategy = {}
self._optimizer = DownpourOptimizer(optimizer, self._strategy)
return self._optimizer
def save_inference_model(self,
...@@ -417,21 +444,6 @@ class PSLib(Fleet):
self._fleet_ptr.clear_model()
self._role_maker._barrier_worker()
def clear_model(self):
"""
clear_model() will be called by user. It will clear sparse model.
Examples:
.. code-block:: python
fleet.clear_model()
"""
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
self._fleet_ptr.clear_model()
self._role_maker._barrier_worker()
def load_one_table(self, table_id, model_path, **kwargs):
"""
load pslib model for one table or load params from paddle model
......
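A hedged sketch (not part of the diff) of the call order the new checks in PSLib.distributed_optimizer enforce: initialize a role first, pass a real Optimizer, and use either an AsyncStrategy or a plain dict as the strategy. This mirrors the unit test added later in this diff.

```python
# Minimal sketch of the expected PSLib call order under the new validation.
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import StrategyFactory

role = role_maker.UserDefinedRoleMaker(
    current_id=0,
    role=role_maker.Role.WORKER,
    worker_num=2,
    server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"])
fleet.init(role)  # skipping this raises ValueError inside distributed_optimizer

optimizer = fluid.optimizer.SGD(0.1)                 # must be an Optimizer instance
strategy = StrategyFactory.create_async_strategy()   # or a plain dict such as {}
optimizer = fleet.distributed_optimizer(optimizer, strategy)
```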
...@@ -13,6 +13,7 @@
"""Definition of Server and Worker."""
from . import ps_pb2 as pslib
from six.moves import reduce
class Server(object):
......
...@@ -28,6 +28,7 @@ list(APPEND MIXED_DIST_TEST_OPS test_communicator_geo)
list(APPEND MIXED_DIST_TEST_OPS test_communicator_half_async)
list(APPEND MIXED_DIST_TEST_OPS test_communicator_sync)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_api_input)
list(APPEND MIXED_DIST_TEST_OPS test_fleet_input_with_psilib)
foreach(TEST_OP ${MIXED_DIST_TEST_OPS})
list(REMOVE_ITEM TEST_OPS ${TEST_OP})
endforeach()
...@@ -282,6 +283,7 @@ if(WITH_DISTRIBUTE)
py_test_modules(test_nce_remote_table_op MODULES test_nce_remote_table_op ENVS ${dist_ENVS})
py_test_modules(test_recv_save_op MODULES test_recv_save_op ENVS ${dist_ENVS})
py_test_modules(test_transpiler_ops MODULES test_transpiler_ops ENVS ${dist_ENVS})
py_test_modules(test_fleet_input_with_psilib MODULES test_fleet_input_with_psilib ENVS ${dist_ENVS})
py_test_modules(test_communicator_async MODULES test_communicator_async ENVS ${dist_ENVS})
py_test_modules(test_communicator_geo MODULES test_communicator_geo ENVS ${dist_ENVS})
py_test_modules(test_communicator_half_async MODULES test_communicator_half_async ENVS ${dist_ENVS} FLAGS_communicator_send_queue_size=1 FLAGS_communicator_max_merge_var_num=1)
......
...@@ -25,7 +25,7 @@ from paddle.fluid.communicator import Communicator
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
- from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
+ from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import StrategyFactory
class TestCommunicator(unittest.TestCase):
......
...@@ -720,33 +720,6 @@ class TestDatasetWithFetchHandler(unittest.TestCase):
except Exception as e:
self.assertTrue(False)
def test_fetch_handler(self):
"""
Test Dataset With Fetch Handler. TestCases.
"""
slots_vars, out = self.net()
files = ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]
dataset = self.get_dataset(slots_vars, files)
exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
fh = fluid.executor.FetchHandler(out.name)
fh.help()
try:
exe.train_from_dataset(
program=fluid.default_main_program(),
dataset=dataset,
fetch_handler=fh)
except ImportError as e:
print("warning: we skip trainer_desc_pb2 import problem in windows")
except RuntimeError as e:
error_msg = "dataset is need and should be initialized"
self.assertEqual(error_msg, cpt.get_exception_message(e))
except Exception as e:
self.assertTrue(False)
class TestDataset2(unittest.TestCase):
""" TestCases for Dataset. """
...@@ -796,6 +769,7 @@ class TestDataset2(unittest.TestCase):
print("warning: no mpi4py")
adam = fluid.optimizer.Adam(learning_rate=0.000005)
try:
fleet.init()
adam = fleet.distributed_optimizer(adam)
adam.minimize([fake_cost], [scope])
except AttributeError as e:
...@@ -858,6 +832,7 @@ class TestDataset2(unittest.TestCase):
print("warning: no mpi4py")
adam = fluid.optimizer.Adam(learning_rate=0.000005)
try:
fleet.init()
adam = fleet.distributed_optimizer(
adam,
strategy={
......
...@@ -37,7 +37,7 @@ import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
- from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
+ from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import StrategyFactory
__all__ = ['FleetDistRunnerBase', 'TestFleetBase', 'runtime_main']
......
...@@ -15,7 +15,7 @@
import unittest
import paddle.fluid as fluid
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig, ServerRuntimeConfig
- from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import TrainerRuntimeConfig, StrategyFactory
+ from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import TrainerRuntimeConfig, StrategyFactory
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import os
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
from six.moves import reduce
import paddle.fluid as fluid
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
from paddle.fluid.incubate.fleet.base.role_maker import UserDefinedRoleMaker
from paddle.fluid.incubate.fleet.base.role_maker import UserDefinedCollectiveRoleMaker
from paddle.fluid.incubate.fleet.base.role_maker import Role
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
from paddle.fluid.incubate.fleet.parameter_server.distributed_strategy import StrategyFactory
from dist_simnet_bow import train_network
class FleetPSLibTest(unittest.TestCase):
def test_transpile(self):
role = role_maker.UserDefinedRoleMaker(
current_id=0,
role=role_maker.Role.SERVER,
worker_num=2,
server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"])
optimizer = fluid.optimizer.SGD(0.1)
# case5
self.assertRaises(Exception, fleet.distributed_optimizer, optimizer,
"Adam")
fleet.init(role)
avg_cost, _, _ = train_network(128, False, True)
# case1
strategy = StrategyFactory.create_async_strategy()
fleet.distributed_optimizer(optimizer, strategy)
# case2
strategy = {}
fleet.distributed_optimizer(optimizer, strategy)
# case3
self.assertRaises(Exception, fleet.distributed_optimizer, optimizer,
"Adam")
# case4
self.assertRaises(Exception, fleet.distributed_optimizer, "Adam",
"Adam")
if __name__ == '__main__':
unittest.main()
import subprocess
import os
import os.path
import errno
import re
import shutil
import sys
...@@ -104,6 +105,35 @@ def mkl():
write_version_py(filename='@PADDLE_BINARY_DIR@/python/paddle/version.py')
def write_distributed_training_mode_py(filename='paddle/fluid/incubate/fleet/parameter_server/version.py'):
cnt = '''from __future__ import print_function
# THIS FILE IS GENERATED FROM PADDLEPADDLE SETUP.PY
from paddle.fluid.incubate.fleet.base.fleet_base import Mode
BUILD_MODE=Mode.%(mode)s
def is_transpiler():
return Mode.TRANSPILER == BUILD_MODE
'''
dirname = os.path.dirname(filename)
try:
os.makedirs(dirname)
except OSError as e:
if e.errno != errno.EEXIST:
raise
with open(filename, 'w') as f:
f.write(cnt % {
'mode': 'PSLIB' if '${WITH_PSLIB}' == 'ON' else 'TRANSPILER'
})
write_distributed_training_mode_py(filename='@PADDLE_BINARY_DIR@/python/paddle/fluid/incubate/fleet/parameter_server/version.py')
packages=['paddle',
'paddle.libs',
'paddle.utils',
......
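For reference, a hedged sketch of what the generated parameter_server/version.py would contain for a TRANSPILER build (WITH_PSLIB=OFF), following the template in write_distributed_training_mode_py above:

```python
from __future__ import print_function
# THIS FILE IS GENERATED FROM PADDLEPADDLE SETUP.PY
from paddle.fluid.incubate.fleet.base.fleet_base import Mode

BUILD_MODE = Mode.TRANSPILER


def is_transpiler():
    return Mode.TRANSPILER == BUILD_MODE
```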