Unverified · Commit 81113b53 authored by wangzhen38, committed by GitHub

[mv fluid] ps related (#50376)

Parent cd54cfab
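This commit moves the parameter-server fleet modules out of the paddle.fluid namespace and into paddle.incubate; as the hunks below show, call sites only need their import prefix updated. A minimal migration sketch of the path change, using only names that appear in the diff:

# Old import path (removed by this commit):
#   from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
#   from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
#       StrategyFactory,
#   )

# New import path (introduced by this commit):
from paddle.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
    StrategyFactory,
)

strategy = StrategyFactory.create_async_strategy()
# The fleet APIs themselves are unchanged; only the "fluid" segment drops out of the path.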
......@@ -74,7 +74,7 @@ class ParameterServerOptimizer(MetaOptimizerBase):
}
def _get_distributed_strategy(self):
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
from paddle.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
StrategyFactory,
)
......
......@@ -51,7 +51,7 @@ class ParameterServerRuntime(RuntimeBase):
def _get_distributed_strategy(self):
strategy = None
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
from paddle.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
StrategyFactory,
)
......@@ -239,14 +239,14 @@ class ParameterServerRuntime(RuntimeBase):
kwargs["sparse_attrs"] = get_sparse_attrs()
return kwargs
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
GeoStrategy,
SyncStrategy,
)
from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
_get_lr_ops,
_has_global_step,
)
from paddle.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
GeoStrategy,
SyncStrategy,
)
trainer_config = self.async_strategy.get_trainer_runtime_config()
print(trainer_config)
......
......@@ -692,7 +692,7 @@ class TheOnePSRuntime(RuntimeBase):
def _get_distributed_strategy(self):
strategy = None
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
from paddle.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
StrategyFactory,
)
......@@ -731,7 +731,7 @@ class TheOnePSRuntime(RuntimeBase):
return compiled_config
def _init_worker(self):
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
from paddle.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
SyncStrategy,
)
......
......@@ -19,10 +19,10 @@ import time
import paddle
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import (
from paddle.incubate.fleet.parameter_server.distribute_transpiler import (
fleet,
)
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
from paddle.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
StrategyFactory,
)
......
......@@ -63,7 +63,7 @@ class FleetUtil:
fleet = fleet_pslib
elif mode == "transpiler":
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import (
from paddle.incubate.fleet.parameter_server.distribute_transpiler import (
fleet as fleet_transpiler,
)
......
......@@ -18,16 +18,14 @@ import unittest
import paddle
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import (
fleet,
)
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
StrategyFactory,
)
from paddle.fluid.transpiler.distribute_transpiler import (
DistributeTranspilerConfig,
ServerRuntimeConfig,
)
from paddle.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
StrategyFactory,
)
class TestStrategyFactor(unittest.TestCase):
......
......@@ -16,9 +16,7 @@ import logging
# import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import paddle.distributed.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import (
fleet,
)
from paddle.incubate.fleet.parameter_server.distribute_transpiler import fleet
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
......
......@@ -18,9 +18,7 @@ from utils import gen_data
import paddle
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.base import role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import (
fleet,
)
from paddle.incubate.fleet.parameter_server.distribute_transpiler import fleet
input_x = paddle.static.data(name="x", shape=[-1, 32], dtype='float32')
input_y = paddle.static.data(name="y", shape=[-1, 1], dtype='int64')
......
......@@ -233,7 +233,7 @@ def get_user_defined_strategy(config):
def get_distributed_strategy(user_defined_strategy): # pslib
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
from paddle.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
StrategyFactory,
)
......
......@@ -1113,7 +1113,7 @@ class TestDataset2(unittest.TestCase):
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import (
from paddle.incubate.fleet.parameter_server.distribute_transpiler import (
fleet,
)
......
......@@ -25,13 +25,11 @@ from paddle.fluid.incubate.fleet.base.role_maker import (
UserDefinedRoleMaker,
)
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer
from paddle.fluid.incubate.fleet.parameter_server import TranspilerOptimizer
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import (
fleet,
)
from paddle.fluid.transpiler.distribute_transpiler import (
DistributeTranspilerConfig,
)
from paddle.incubate.fleet.parameter_server import TranspilerOptimizer
from paddle.incubate.fleet.parameter_server.distribute_transpiler import fleet
class DistributeTranspilerConfigTest(unittest.TestCase):
......
......@@ -17,10 +17,8 @@ import unittest
import paddle
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import (
fleet,
)
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
from paddle.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
StrategyFactory,
)
......
......@@ -39,7 +39,7 @@ class TestCloudRoleMaker2(unittest.TestCase):
GeneralRoleMaker,
RoleMakerBase,
)
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import (
from paddle.incubate.fleet.parameter_server.distribute_transpiler import (
fleet,
)
......
......@@ -169,7 +169,7 @@ class DistributeTranspilerConfig:
We can use bandwidth efficiently when data size is larger than 2MB. If you
want to change it, please be sure you have read the slice_variable function. You can find
the definition of slice_variable in
https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/fluid/transpiler/distribute_transpiler.py
https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/transpiler/distribute_transpiler.py
.
Examples:
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__all__ = [
"TrainerRuntimeConfig",
"DistributedStrategy",
"SyncStrategy",
"AsyncStrategy",
"HalfAsyncStrategy",
"GeoStrategy",
"StrategyFactory",
]
import os
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode
from paddle.fluid.transpiler.distribute_transpiler import (
DistributeTranspilerConfig,
ServerRuntimeConfig,
)
class TrainerRuntimeConfig:
def __init__(self):
self.mode = None
num_threads = os.getenv("CPU_NUM", "1")
self.runtime_configs = {}
self.runtime_configs['communicator_max_merge_var_num'] = os.getenv(
"FLAGS_communicator_max_merge_var_num", num_threads
)
self.runtime_configs['communicator_send_queue_size'] = os.getenv(
"FLAGS_communicator_send_queue_size", num_threads
)
self.runtime_configs[
'communicator_independent_recv_thread'
] = os.getenv("FLAGS_communicator_independent_recv_thread", "1")
self.runtime_configs[
'communicator_min_send_grad_num_before_recv'
] = os.getenv(
"FLAGS_communicator_min_send_grad_num_before_recv", num_threads
)
self.runtime_configs['communicator_thread_pool_size'] = os.getenv(
"FLAGS_communicator_thread_pool_size", "5"
)
self.runtime_configs['communicator_send_wait_times'] = os.getenv(
"FLAGS_communicator_send_wait_times", "5"
)
self.runtime_configs['communicator_is_sgd_optimizer'] = os.getenv(
"FLAGS_communicator_is_sgd_optimizer", "1"
)
# not used
self.runtime_configs['rpc_deadline'] = os.getenv(
"FLAGS_rpc_deadline", "180000"
)
self.runtime_configs['rpc_retry_times'] = os.getenv(
"FLAGS_rpc_retry_times", "3"
)
def get_communicator_flags(self):
need_keys = []
num_threads = os.getenv("CPU_NUM", "1")
mode_str = ""
if self.mode is None or self.mode == DistributedMode.ASYNC:
need_keys = self.runtime_configs.keys()
mode_str = "async"
elif (
self.mode == DistributedMode.SYNC
or self.mode == DistributedMode.HALF_ASYNC
):
mode_str = "sync or half_async"
need_keys = [
'communicator_max_merge_var_num',
'communicator_send_wait_times',
'communicator_thread_pool_size',
'communicator_send_queue_size',
]
elif self.mode == DistributedMode.GEO:
mode_str = "GEO"
need_keys = [
'communicator_thread_pool_size',
'communicator_send_wait_times',
'communicator_max_merge_var_num',
'communicator_send_queue_size',
]
else:
raise ValueError("Unsupported Mode")
if (
self.mode == DistributedMode.SYNC
or self.mode == DistributedMode.HALF_ASYNC
):
max_merge_var_num = self.runtime_configs[
'communicator_max_merge_var_num'
]
send_queue_size = self.runtime_configs[
'communicator_send_queue_size'
]
if max_merge_var_num != num_threads:
print(
'WARNING: In {} mode, communicator_max_merge_var_num '
'must be equal to CPU_NUM. But received, '
'communicator_max_merge_var_num = {}, CPU_NUM = '
'{}. communicator_max_merge_var_num will be forced to {}.'.format(
mode_str, max_merge_var_num, num_threads, num_threads
)
)
self.runtime_configs[
'communicator_max_merge_var_num'
] = num_threads
if send_queue_size != num_threads:
print(
'WARNING: In {} mode, communicator_send_queue_size '
'must be equal to CPU_NUM. But received, '
'communicator_send_queue_size = {}, CPU_NUM = '
'{}. communicator_send_queue_size will be forced to {}.'.format(
mode_str, send_queue_size, num_threads, num_threads
)
)
self.runtime_configs[
'communicator_send_queue_size'
] = num_threads
return dict((key, str(self.runtime_configs[key])) for key in need_keys)
def display(self, configs):
raw0, raw1, length = 45, 5, 50
h_format = "{:^45s}{:<5s}\n"
l_format = "{:<45s}{:<5s}\n"
border = "".join(["="] * length)
line = "".join(["-"] * length)
draws = ""
draws += border + "\n"
draws += h_format.format("TrainerRuntimeConfig Overview", "Value")
draws += line + "\n"
for k, v in configs.items():
draws += l_format.format(k, v)
draws += border
_str = "\n{}\n".format(draws)
return _str
def __repr__(self):
return self.display(self.get_communicator_flags())
class PSLibRuntimeConfig:
def __init__(self):
self.runtime_configs = {}
def get_runtime_configs(self):
return self.runtime_configs
class DistributedStrategy:
def __init__(self):
self._program_config = DistributeTranspilerConfig()
self._trainer_runtime_config = TrainerRuntimeConfig()
self._pslib_runtime_config = PSLibRuntimeConfig()
self._server_runtime_config = ServerRuntimeConfig()
num_threads = int(os.getenv("CPU_NUM", "1"))
self._execute_strategy = fluid.ExecutionStrategy()
self._build_strategy = fluid.BuildStrategy()
self._execute_strategy.num_threads = num_threads
if num_threads > 1:
self._build_strategy.reduce_strategy = (
fluid.BuildStrategy.ReduceStrategy.Reduce
)
self.debug_opt = None
self.use_ps_gpu = False
def set_debug_opt(self, opt_info):
self.debug_opt = opt_info
def get_debug_opt(self):
opt_info = dict()
if self.debug_opt is not None and isinstance(self.debug_opt, dict):
opt_info["dump_slot"] = bool(self.debug_opt.get("dump_slot", 0))
opt_info["dump_converter"] = str(
self.debug_opt.get("dump_converter", "")
)
opt_info["dump_fields"] = self.debug_opt.get("dump_fields", [])
opt_info["dump_file_num"] = self.debug_opt.get("dump_file_num", 16)
opt_info["dump_fields_path"] = self.debug_opt.get(
"dump_fields_path", ""
)
opt_info["dump_param"] = self.debug_opt.get("dump_param", [])
return opt_info
def get_program_config(self):
return self._program_config
def set_program_config(self, config):
if isinstance(config, DistributeTranspilerConfig):
self._program_config = config
elif isinstance(config, dict):
for key in config:
if hasattr(self._program_config, key):
setattr(self._program_config, key, config[key])
else:
raise ValueError(
"DistributeTranspilerConfig doesn't have key: {}".format(
key
)
)
else:
raise TypeError(
"program_config only accept input type: dict or DistributeTranspilerConfig"
)
self.check_program_config()
def check_program_config(self):
raise NotImplementedError(
"check_program_config must be implemented by derived class. You should use StrategyFactory to create DistributedStrategy."
)
def get_trainer_runtime_config(self):
return self._trainer_runtime_config
def set_trainer_runtime_config(self, config):
if isinstance(config, TrainerRuntimeConfig):
self._trainer_runtime_config = config
elif isinstance(config, dict):
for key, value in config.items():
if key in self._trainer_runtime_config.runtime_configs:
self._trainer_runtime_config.runtime_configs[key] = value
else:
raise ValueError(
"TrainerRuntimeConfig doesn't have key: {}".format(key)
)
else:
raise TypeError(
"trainer_runtime_config only accept input type: dict or TrainerRuntimeConfig"
)
self.check_trainer_runtime_config()
def check_trainer_runtime_config(self):
raise NotImplementedError(
"check_trainer_runtime_config must be implemented by derived class. You should use StrategyFactory to create DistributedStrategy."
)
def get_pslib_runtime_config(self):
return self._pslib_runtime_config
def set_pslib_runtime_config(self, config):
self._pslib_runtime_config.runtime_configs = config
def get_server_runtime_config(self):
return self._server_runtime_config
def set_server_runtime_config(self, config):
if isinstance(config, ServerRuntimeConfig):
self._server_runtime_config = config
elif isinstance(config, dict):
for key in config:
if hasattr(self._server_runtime_config, key):
setattr(self._server_runtime_config, key, config[key])
else:
raise ValueError(
"ServerRuntimeConfig doesn't have key: {}".format(key)
)
else:
raise TypeError(
"server_runtime_config only accept input type: dict or ServerRuntimeConfig"
)
self.check_server_runtime_config()
def check_server_runtime_config(self):
raise NotImplementedError(
"check_server_runtime_config must be implemented by derived class. You should use StrategyFactory to create DistributedStrategy."
)
def get_execute_strategy(self):
return self._execute_strategy
def set_execute_strategy(self, config):
if isinstance(config, fluid.ExecutionStrategy):
self._execute_strategy = config
elif isinstance(config, dict):
for key in config:
if hasattr(self._execute_strategy, key):
setattr(self._execute_strategy, key, config[key])
else:
raise ValueError(
"ExecutionStrategy doesn't have key: {}".format(key)
)
else:
raise TypeError(
"execute_strategy only accept input type: dict or ExecutionStrategy"
)
self.check_execute_strategy()
def check_execute_strategy(self):
raise NotImplementedError(
"check_execute_strategy must be implemented by derived class. You should use StrategyFactory to create DistributedStrategy."
)
def get_build_strategy(self):
return self._build_strategy
def set_build_strategy(self, config):
if isinstance(config, fluid.BuildStrategy):
self._build_strategy = config
elif isinstance(config, dict):
for key in config:
if hasattr(self._build_strategy, key):
setattr(self._build_strategy, key, config[key])
else:
raise ValueError(
"BuildStrategy doesn't have key: {}".format(key)
)
else:
raise TypeError(
"build_strategy only accept input type: dict or BuildStrategy"
)
self.check_build_strategy()
def check_build_strategy(self):
raise NotImplementedError(
"check_build_strategy must be implemented by derived class. You should use StrategyFactory to create DistributedStrategy."
)
class SyncStrategy(DistributedStrategy):
def __init__(self):
super().__init__()
self.check_program_config()
self.check_trainer_runtime_config()
self.check_server_runtime_config()
self.check_build_strategy()
self.check_execute_strategy()
def check_trainer_runtime_config(self):
self._trainer_runtime_config.mode = DistributedMode.SYNC
def check_program_config(self):
self._program_config.sync_mode = False
self._program_config.runtime_split_send_recv = True
self._program_config.half_async = True
self._program_config.completely_not_async = True
def check_server_runtime_config(self):
pass
def check_execute_strategy(self):
self._execute_strategy.use_thread_barrier = True
def check_build_strategy(self):
self._build_strategy.async_mode = True
class AsyncStrategy(DistributedStrategy):
def __init__(self):
super().__init__()
self.check_program_config()
self.check_trainer_runtime_config()
self.check_server_runtime_config()
self.check_build_strategy()
self.check_execute_strategy()
def check_trainer_runtime_config(self):
self._trainer_runtime_config.mode = DistributedMode.ASYNC
def check_program_config(self):
self._program_config.sync_mode = False
self._program_config.runtime_split_send_recv = True
def check_server_runtime_config(self):
pass
def check_execute_strategy(self):
pass
def check_build_strategy(self):
self._build_strategy.async_mode = True
class HalfAsyncStrategy(DistributedStrategy):
def __init__(self):
super().__init__()
self.check_program_config()
self.check_trainer_runtime_config()
self.check_server_runtime_config()
self.check_build_strategy()
self.check_execute_strategy()
def check_trainer_runtime_config(self):
self._trainer_runtime_config.mode = DistributedMode.HALF_ASYNC
def check_program_config(self):
self._program_config.sync_mode = False
self._program_config.runtime_split_send_recv = True
self._program_config.half_async = True
def check_server_runtime_config(self):
pass
def check_execute_strategy(self):
self._execute_strategy.use_thread_barrier = True
def check_build_strategy(self):
self._build_strategy.async_mode = True
class GeoStrategy(DistributedStrategy):
def __init__(self, update_frequency=100):
super().__init__()
self._program_config.geo_sgd_need_push_nums = update_frequency
self.check_program_config()
self.check_trainer_runtime_config()
self.check_server_runtime_config()
self.check_build_strategy()
self.check_execute_strategy()
def check_program_config(self):
self._program_config.sync_mode = False
self._program_config.runtime_split_send_recv = True
self._program_config.geo_sgd_mode = True
def check_trainer_runtime_config(self):
self._trainer_runtime_config.mode = DistributedMode.GEO
self._trainer_runtime_config.runtime_configs[
'communicator_send_queue_size'
] = self._program_config.geo_sgd_need_push_nums
self._trainer_runtime_config.runtime_configs[
'communicator_max_merge_var_num'
] = self._program_config.geo_sgd_need_push_nums
def check_server_runtime_config(self):
pass
def check_execute_strategy(self):
pass
def check_build_strategy(self):
self._build_strategy.async_mode = True
class StrategyFactory:
def __init__(self):
pass
@staticmethod
def create_sync_strategy():
return SyncStrategy()
@staticmethod
def create_half_async_strategy():
return HalfAsyncStrategy()
@staticmethod
def create_async_strategy():
return AsyncStrategy()
@staticmethod
def create_geo_strategy(update_frequency=100):
return GeoStrategy(update_frequency)
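The strategy classes above are meant to be created through StrategyFactory and then tuned with the set_*_config setters, which accept either the matching config object or a plain dict (unknown keys raise ValueError, wrong container types raise TypeError). A minimal sketch of that flow, assuming the new paddle.incubate import path added by this commit:

from paddle.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
    StrategyFactory,
)

# Geo-SGD: push accumulated updates to the pservers every 400 steps.
strategy = StrategyFactory.create_geo_strategy(update_frequency=400)

# Dict-style overrides are validated against the underlying config objects.
strategy.set_trainer_runtime_config({"communicator_thread_pool_size": "10"})
strategy.set_program_config({"min_block_size": 81920})  # existing DistributeTranspilerConfig field

# The flags actually handed to the communicator depend on the mode (GEO here),
# as selected inside TrainerRuntimeConfig.get_communicator_flags().
print(strategy.get_trainer_runtime_config())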
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import warnings
import paddle
from paddle.fluid.incubate.fleet.parameter_server.ir.trainer_pass import (
create_heter_program,
create_trainer_program,
find_block_joints,
find_heter_ops,
union_forward_gradient_op,
)
def split_heter_worker_ops_pass(program, config, stage_id, device):
"""
split the heter worker program from the origin program:
1. find heter ops (located on a different device)
2. find the inputs & outputs of every heter block
3. create the heter worker program and add the listen&serv op
"""
default_deveice = "cpu"
program, heter_ops, _, program_block_ops = find_heter_ops(
program, default_deveice
)
if len(heter_ops) == 0:
warnings.warn(
"Currently running in Heter Parameter Server mode, but no OP running on heterogeneous devices, Please check your code."
)
return program
program_block_ops = union_forward_gradient_op(program_block_ops)
block_vars_detail = find_block_joints(program, program_block_ops, heter_ops)
heter_program = paddle.static.Program()
create_heter_program(
program,
config,
heter_program,
program_block_ops,
heter_ops,
block_vars_detail,
device,
stage_id,
)
return heter_program
def split_trainer_ops_pass(program, config, default_device="cpu"):
"""
split the cpu-trainer program from the origin program:
1. find heter ops (located on a different device)
2. find the inputs & outputs of every heter block
3. create the cpu-trainer program and add send & recv ops
"""
# TODO: support user-defined default_device (MrChengmo)
default_device_ = default_device
program, heter_ops, default_ops, program_block_ops = find_heter_ops(
program, default_device_
)
program_block_ops = union_forward_gradient_op(program_block_ops)
block_vars_detail = find_block_joints(program, program_block_ops, heter_ops)
trainer_program = program.clone()
create_trainer_program(
trainer_program, program, config, program_block_ops, block_vars_detail
)
return trainer_program
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class PSDispatcher:
"""
PSDispatcher is the base class for dispatching vars
into different pserver instances.
You need to implement the `dispatch` interface.
"""
def __init__(self, pserver_endpoints):
self._eps = pserver_endpoints
self._step = 0
@property
def eps(self):
return self._eps
def reset(self):
"""
reset the step counter, set it zero.
"""
self._step = 0
def dispatch(self, varlist):
"""
Args:
varlist(list): a list of Variables
Returns:
a list of pserver endpoints, one for each variable in varlist
"""
raise NotImplementedError("Interface has not been implemented.")
class HashName(PSDispatcher):
"""
Hash variable names to several endpoints using python
"hash()" function.
Args:
pserver_endpoints (list): list of endpoint(ip:port).
Examples:
.. code-block:: python
pserver_endpoints = ["127.0.0.1:6007", "127.0.0.1:6008"]
vars = ["var1","var2","var3","var4","var5"]
hash_name = HashName(pserver_endpoints)
hash_name.dispatch(vars)
"""
def __init__(self, pserver_endpoints):
super().__init__(pserver_endpoints)
def _hash_block(self, block_str, total):
return hash(block_str) % total
def dispatch(self, varlist):
"""
use the `HashName` strategy to dispatch variables across the parameter servers.
Args:
varlist (list): a list of Variables
"""
eplist = []
for var in varlist:
server_id = self._hash_block(var.name(), len(self._eps))
server_for_param = self._eps[server_id]
eplist.append(server_for_param)
return eplist
class RoundRobin(PSDispatcher):
"""
Distribute variables to several endpoints using
RoundRobin <https://en.wikipedia.org/wiki/Round-robin_scheduling> method.
Args:
pserver_endpoints (list): list of endpoint(ip:port).
Examples:
.. code-block:: python
pserver_endpoints = ["127.0.0.1:6007", "127.0.0.1:6008"]
vars = ["var1","var2","var3","var4","var5"]
rr = RoundRobin(pserver_endpoints)
rr.dispatch(vars)
"""
def __init__(self, pserver_endpoints):
super().__init__(pserver_endpoints)
def dispatch(self, varlist):
"""
use the `RoundRobin` strategy to dispatch variables across the parameter servers.
Args:
varlist (list): a list of Variables
"""
eplist = []
for var in varlist:
server_for_param = self._eps[self._step]
eplist.append(server_for_param)
self._step += 1
if self._step >= len(self._eps):
self._step = 0
return eplist
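Both dispatchers return a list of endpoints aligned with the input variables: HashName buckets by variable name, RoundRobin simply cycles through the endpoints. A small self-contained sketch, with HashName and RoundRobin assumed in scope (e.g. defined as above); HashName.dispatch() calls name() on each item, so a tiny stand-in class replaces a real Variable here:

class _FakeVar:
    # minimal stand-in for a Variable; only name() is needed by HashName.dispatch()
    def __init__(self, name):
        self._name = name

    def name(self):
        return self._name


endpoints = ["127.0.0.1:6007", "127.0.0.1:6008"]
varlist = [_FakeVar("fc_w"), _FakeVar("fc_b"), _FakeVar("emb_0")]

hashed = HashName(endpoints).dispatch(varlist)   # the same name always maps to the same endpoint
rr = RoundRobin(endpoints)
cycled = rr.dispatch(varlist)                    # 6007, 6008, 6007
rr.reset()                                       # next dispatch starts from the first endpoint again
print(hashed, cycled)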
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class UnionFind:
"""Union-find data structure.
Union-find is a data structure that keeps track of a set of elements partitioned
into a number of disjoint (non-overlapping) subsets.
Reference:
https://en.wikipedia.org/wiki/Disjoint-set_data_structure
Args:
elements(list): The initial list of elements.
"""
def __init__(self, elements=None):
self._parents = [] # index -> parent index
self._index = {} # element -> index
self._curr_idx = 0
if not elements:
elements = []
for ele in elements:
self._parents.append(self._curr_idx)
self._index.update({ele: self._curr_idx})
self._curr_idx += 1
def find(self, x):
# Find the root index of the given element x,
# performing path compression while finding the root index.
if x not in self._index:
return -1
idx = self._index[x]
while idx != self._parents[idx]:
t = self._parents[idx]
self._parents[idx] = self._parents[t]
idx = t
return idx
def union(self, x, y):
# Union the sets containing the two given elements.
x_root = self.find(x)
y_root = self.find(y)
if x_root == y_root:
return
self._parents[x_root] = y_root
def is_connected(self, x, y):
# If two given elements have the same root index,
# then they are connected.
return self.find(x) == self.find(y)
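A quick usage sketch of the UnionFind class above: elements are registered at construction time, union() merges their sets, and is_connected() tests whether two elements ended up in the same set.

uf = UnionFind(["a", "b", "c", "d"])

uf.union("a", "b")
uf.union("c", "d")

print(uf.is_connected("a", "b"))  # True: merged above
print(uf.is_connected("a", "c"))  # False: still in different sets
print(uf.find("zzz"))             # -1 for elements that were never registered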
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from functools import reduce
from paddle.framework import core
from paddle.framework.io import Variable
dtype_to_size = {
core.VarDesc.VarType.FP16: 2,
core.VarDesc.VarType.FP32: 4,
core.VarDesc.VarType.FP64: 8,
core.VarDesc.VarType.INT16: 2,
core.VarDesc.VarType.INT32: 4,
core.VarDesc.VarType.INT64: 8,
core.VarDesc.VarType.BOOL: 1,
core.VarDesc.VarType.UINT8: 1,
}
class VarBlock:
def __init__(self, varname, offset, size):
self.varname = varname
# NOTE: real offset is offset * size
self.offset = offset
self.size = size
def __str__(self):
return "%s:%d:%d" % (self.varname, self.offset, self.size)
def create_var_struct(var):
if var.type == core.VarDesc.VarType.SELECTED_ROWS:
lod_level = None
elif var.type == core.VarDesc.VarType.LOD_TENSOR:
lod_level = var.lod_level
else:
raise ValueError("can only support SELECTED_ROWS/LOD_TENSOR now")
return VarStruct(
var.name, var.shape, var.dtype, var.type, lod_level, var.persistable
)
class VarStruct:
"""
record a subset of a Variable's properties in python.
"""
def __init__(self, name, shape, dtype, type, lod_level, persistable):
self.name = name
self.shape = shape
self.dtype = dtype
self.type = type
self.lod_level = lod_level
self.persistable = persistable
self.m_size = 1
self.m_size = reduce(lambda x, y: x * y, shape)
self.m_size *= dtype_to_size[dtype]
def __str__(self):
return "N: {}, S: {}, D: {}, T: {}, LL: {}, P: {}, M: {}".format(
self.name,
self.shape,
self.dtype,
self.type,
self.lod_level,
self.persistable,
self.m_size,
)
class VarDistributed:
"""
a class to record how a var is distributed on parameter servers.
the class records the relationship between the origin var and the slice var,
as well as the slice var's properties, such as type/shape/offset/endpoint.
"""
def __init__(
self,
origin_var,
slice_var,
is_slice=None,
block_id=None,
offset=None,
vtype=None,
endpoint=None,
):
"""
Args:
origin_var(Variable|VarStruct): origin var properties
slice_var(Variable|VarStruct): slice var properties
is_slice(bool|None): whether the var is a slice; a var counts as a slice when the slice flag is set and its block size is larger than 8192.
block_id(int|None): the block number of the slice var.
offset(int|None): if the slice var is sliced, offset is the numel before the var.
vtype(str|None): a tag, such as Optimizer/Param/RemotePrefetch.
endpoint(str|None): which parameter server the slice var is on, such as "127.0.0.1:1001"
"""
if isinstance(origin_var, Variable):
self.origin = create_var_struct(origin_var)
else:
self.origin = origin_var
if isinstance(slice_var, Variable):
self.slice = create_var_struct(slice_var)
else:
self.slice = slice_var
if self.equal(self.origin, self.slice):
self.is_slice = False
self.block_id = 0
self.offset = 0
else:
self.is_slice = True
self.block_id = 0
self.offset = 0
if is_slice is not None:
self.is_slice = is_slice
if block_id is not None:
self.block_id = block_id
if offset is not None:
self.offset = offset
self.vtype = vtype
self.endpoint = endpoint
@staticmethod
def equal(var1, var2):
"""
whether the two vars are equal.
Returns:
bool: True if equal, otherwise False
"""
assert isinstance(var1, VarStruct) and isinstance(var2, VarStruct)
return (
var1.name == var2.name
and var1.type == var2.type
and var1.shape == var2.shape
and var1.dtype == var2.dtype
and var1.lod_level == var2.lod_level
and var1.persistable == var2.persistable
)
def __str__(self):
origin_var_str = (
"{name} : fluid.{type}.shape{shape}.astype({dtype})".format(
name=self.origin.name,
type=self.origin.type,
shape=self.origin.shape,
dtype=self.origin.dtype,
)
)
slice_var_str = (
"{name} : fluid.{type}.shape{shape}.astype({dtype})"
".slice({is_slice}).block({block_id}).offset({offset})".format(
name=self.slice.name,
type=self.slice.type,
shape=self.slice.shape,
dtype=self.slice.dtype,
is_slice=self.is_slice,
block_id=self.block_id,
offset=self.offset,
)
)
return "var owned: {}, origin var: ( {} ), slice var: ( {} ), endpoint: {} ".format(
self.vtype, origin_var_str, slice_var_str, self.endpoint
)
class VarsDistributed:
"""
a collection of VarDistributed objects, with methods for finding distributed vars.
through this class, we can get an overview of the distributed parameters on parameter servers.
it centralizes the information so developers can conveniently manage and query how variables are distributed.
other modules, such as io.py, can also use it to find variables.
"""
def __init__(self):
self.distributed_vars = []
def add_distributed_var(
self,
origin_var,
slice_var,
is_slice=None,
block_id=None,
offset=None,
vtype=None,
endpoint=None,
):
"""
add a distributed var to this collection.
Args:
origin_var(Variable|VarStruct): origin var properties
slice_var(Variable|VarStruct): slice var properties
is_slice(bool|None): whether the var is a slice; a var counts as a slice when the slice flag is set and its block size is larger than 8192.
block_id(int|None): the block number of the slice var.
offset(int|None): if the slice var is sliced, offset is the numel before the var.
vtype(str|None): a tag, such as Optimizer/Param/RemotePrefetch.
endpoint(str|None): which parameter server the slice var is on, such as "127.0.0.1:1001"
Returns:
None
"""
self.distributed_vars.append(
VarDistributed(
origin_var,
slice_var,
is_slice,
block_id,
offset,
vtype,
endpoint,
)
)
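The helpers above only record metadata about how a parameter is split across pservers; nothing is allocated. A hedged sketch of registering one sliced parameter, built directly from VarStruct so the example stays self-contained (the names, shapes, and endpoint are made up for illustration):

from paddle.framework import core

# A 100x8 FP32 parameter and the 50x8 slice that lives on one pserver.
origin = VarStruct(
    name="fc_w", shape=(100, 8),
    dtype=core.VarDesc.VarType.FP32, type=core.VarDesc.VarType.LOD_TENSOR,
    lod_level=0, persistable=True,
)
slice_part = VarStruct(
    name="fc_w.block0", shape=(50, 8),
    dtype=core.VarDesc.VarType.FP32, type=core.VarDesc.VarType.LOD_TENSOR,
    lod_level=0, persistable=True,
)

dist_vars = VarsDistributed()
dist_vars.add_distributed_var(
    origin_var=origin, slice_var=slice_part,
    is_slice=True, block_id=0, offset=0,
    vtype="Param", endpoint="127.0.0.1:6007",
)
print(dist_vars.distributed_vars[0])  # prints origin/slice shapes and the endpoint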
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class PSMode:
"""
There are various modes for fleet, each designed for a different kind of model.
"""
TRANSPILER = 1
PSLIB = 2
class DistributedMode:
SYNC = 0
ASYNC = 1
HALF_ASYNC = 2
GEO = 3
......@@ -438,6 +438,7 @@ packages=['paddle',
'paddle.incubate.distributed.models',
'paddle.incubate.distributed.models.moe',
'paddle.incubate.distributed.models.moe.gate',
'paddle.incubate.fleet.parameter_server.distribute_transpiler',
'paddle.quantization',
'paddle.quantization.quanters',
'paddle.sparse',
......
......@@ -1324,6 +1324,7 @@ def get_setup_parameters():
'paddle.incubate.distributed.models',
'paddle.incubate.distributed.models.moe',
'paddle.incubate.distributed.models.moe.gate',
'paddle.incubate.fleet.parameter_server.distribute_transpiler',
'paddle.quantization',
'paddle.quantization.quanters',
'paddle.sparse',
......
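Both setup scripts register the relocated distribute_transpiler package so it ships with the wheel. A quick post-install sanity check (a hedged sketch; it assumes a Paddle build that contains this commit):

import importlib

mod = importlib.import_module(
    "paddle.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy"
)
print(mod.StrategyFactory)  # should resolve without falling back to paddle.fluid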