未验证 提交 c14ec878 编写于 作者: T tangwei12 提交者: GitHub

【paddle.fleet】Feature/fleet ps api 2.0 (#25857)

* add paddle.fleet.AsyncOptimizer
Co-authored-by: Ndongdaxiang <dongdaxiang@baidu.com>
上级 3c8daa9b
...@@ -80,7 +80,7 @@ message ExecutionStrategy { ...@@ -80,7 +80,7 @@ message ExecutionStrategy {
} }
message AsyncConfig { message AsyncConfig {
optional int32 k_steps = 1 [ default = 1 ]; optional int32 k_steps = 1 [ default = -1 ];
optional int32 max_merge_var_num = 2 [ default = 1 ]; optional int32 max_merge_var_num = 2 [ default = 1 ];
optional int32 send_queue_size = 3 [ default = 16 ]; optional int32 send_queue_size = 3 [ default = 16 ];
optional bool independent_recv_thread = 4 [ default = false ]; optional bool independent_recv_thread = 4 [ default = false ];
......
...@@ -227,8 +227,11 @@ class DistributedStrategy(object): ...@@ -227,8 +227,11 @@ class DistributedStrategy(object):
def a_sync(self, flag): def a_sync(self, flag):
if isinstance(flag, bool): if isinstance(flag, bool):
self.strategy.a_sync = flag self.strategy.a_sync = flag
self.a_sync_configs = {"k_steps": 0}
else: else:
print("WARNING: a_sync should have value of bool type") raise ValueError(
"The type of `flag` is invalid, expected type is bool, but received %s".
format(type(flag)))
@property @property
def a_sync_configs(self): def a_sync_configs(self):
......
...@@ -189,12 +189,12 @@ class Fleet(object): ...@@ -189,12 +189,12 @@ class Fleet(object):
assert self._runtime_handle is not None assert self._runtime_handle is not None
self._runtime_handle._init_worker() self._runtime_handle._init_worker()
def init_server(self, model_dir=None): def init_server(self, *args, **kwargs):
""" """
init server init server
""" """
assert self._runtime_handle is not None assert self._runtime_handle is not None
self._runtime_handle._init_server() self._runtime_handle._init_server(*args, **kwargs)
def run_server(self): def run_server(self):
""" """
...@@ -291,6 +291,7 @@ class Fleet(object): ...@@ -291,6 +291,7 @@ class Fleet(object):
else: else:
self.origin_startup_program = \ self.origin_startup_program = \
startup_program.clone(for_test=False) startup_program.clone(for_test=False)
context["origin_startup_program"] = startup_program context["origin_startup_program"] = startup_program
context["role_maker"] = self._role_maker context["role_maker"] = self._role_maker
...@@ -329,12 +330,19 @@ class Fleet(object): ...@@ -329,12 +330,19 @@ class Fleet(object):
optimize_ops = [] optimize_ops = []
params_grads = [] params_grads = []
if meta_optimizer: if meta_optimizer:
optimize_ops, params_grads = meta_optimizer.minimize( optimize_ops, params_grads = meta_optimizer.minimize(
loss, loss,
startup_program=startup_program, startup_program=startup_program,
parameter_list=parameter_list, parameter_list=parameter_list,
no_grad_set=no_grad_set) no_grad_set=no_grad_set)
default_program = paddle.default_main_program()
if id(default_program) != id(loss.block.program):
paddle.fluid.framework.switch_main_program(loss.block.program)
else: else:
optimize_ops, params_grads = self.user_defined_optimizer.minimize( optimize_ops, params_grads = self.user_defined_optimizer.minimize(
loss, loss,
...@@ -344,6 +352,7 @@ class Fleet(object): ...@@ -344,6 +352,7 @@ class Fleet(object):
context["program_optimize_ops"] = optimize_ops context["program_optimize_ops"] = optimize_ops
context["program_params_grads"] = params_grads context["program_params_grads"] = params_grads
if graph_optimizer: if graph_optimizer:
optimize_ops, params_grads = graph_optimizer.minimize( optimize_ops, params_grads = graph_optimizer.minimize(
loss, loss,
......
...@@ -12,27 +12,12 @@ ...@@ -12,27 +12,12 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from ..meta_optimizers import AMPOptimizer
from ..meta_optimizers import RecomputeOptimizer
from ..meta_optimizers import GradientMergeOptimizer
from ..meta_optimizers import GraphExecutionOptimizer
from ..meta_optimizers import PipelineOptimizer
from ..meta_optimizers import LocalSGDOptimizer
from ..meta_optimizers import LarsOptimizer
from ..meta_optimizers import DGCOptimizer
__all__ = ["MetaOptimizerFactory"] __all__ = ["MetaOptimizerFactory"]
meta_optimizer_names = [ from ..meta_optimizers import *
"AMPOptimizer",
"RecomputeOptimizer", meta_optimizer_names = list(
"GradientMergeOptimizer", filter(lambda name: name.endswith("Optimizer"), dir()))
"GraphExecutionOptimizer",
"PipelineOptimizer",
"LocalSGDOptimizer",
"LarsOptimizer",
"DGCOptimizer",
]
class MetaOptimizerFactory(object): class MetaOptimizerFactory(object):
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from ..runtime.collective_runtime import CollectiveRuntime from ..runtime.collective_runtime import CollectiveRuntime
from ..runtime.parameter_server_runtime import ParameterServerRuntime
class RuntimeFactory(object): class RuntimeFactory(object):
...@@ -23,3 +24,9 @@ class RuntimeFactory(object): ...@@ -23,3 +24,9 @@ class RuntimeFactory(object):
collective_runtime = CollectiveRuntime() collective_runtime = CollectiveRuntime()
collective_runtime._set_basic_info(context) collective_runtime._set_basic_info(context)
return collective_runtime return collective_runtime
k_steps = context["valid_strategy"].a_sync_configs["k_steps"]
if not context["role_maker"]._is_collective and k_steps >= 0:
ps_runtime = ParameterServerRuntime()
ps_runtime._set_basic_info(context)
return ps_runtime
...@@ -15,17 +15,22 @@ from .amp_optimizer import AMPOptimizer ...@@ -15,17 +15,22 @@ from .amp_optimizer import AMPOptimizer
from .recompute_optimizer import RecomputeOptimizer from .recompute_optimizer import RecomputeOptimizer
from .gradient_merge_optimizer import GradientMergeOptimizer from .gradient_merge_optimizer import GradientMergeOptimizer
from .graph_execution_optimizer import GraphExecutionOptimizer from .graph_execution_optimizer import GraphExecutionOptimizer
from .async_optimizer import AsyncMetaOptimizer
from .pipeline_optimizer import PipelineOptimizer from .pipeline_optimizer import PipelineOptimizer
from .localsgd_optimizer import LocalSGDOptimizer from .localsgd_optimizer import LocalSGDOptimizer
from .lars_optimizer import LarsOptimizer from .lars_optimizer import LarsOptimizer
from .async_graph_execution_optimizer import AsyncGraphExecutionOptimizer
from .dgc_optimizer import DGCOptimizer from .dgc_optimizer import DGCOptimizer
__all__ = [ __all__ = [
'AMPOptimizer', 'AMPOptimizer',
'RecomputeOptimizer', 'RecomputeOptimizer',
'GradientMergeOptimizer', 'GradientMergeOptimizer',
'AsyncMetaOptimizer',
'GraphExecutionOptimizer',
'PipelineOptimizer', 'PipelineOptimizer',
'LocalSGDOptimizer', 'LocalSGDOptimizer',
'LarsOptimizer', 'LarsOptimizer',
'AsyncGraphExecutionOptimizer',
'DGCOptimizer', 'DGCOptimizer',
] ]
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
from paddle import fluid
from paddle.fluid import compiler
from .async_optimizer import AsyncMetaOptimizer
class AsyncGraphExecutionOptimizer(AsyncMetaOptimizer):
def __init__(self, optimizer):
super(AsyncGraphExecutionOptimizer, self).__init__(optimizer)
self.inner_opt = optimizer
# we do not allow meta optimizer to be inner optimizer currently
self.meta_optimizers_white_list = []
def _can_apply(self):
k_steps = self.user_defined_strategy.a_sync_configs["k_steps"]
if k_steps < 0:
return False
if self.role_maker.is_server():
return False
return True
def _is_graph_out(self):
return True
def _try_to_compile(self, main_program, loss):
dist_strategy = self._get_distributed_strategy()
build_strategy = dist_strategy.get_build_strategy()
exec_strategy = dist_strategy.get_execute_strategy()
self._compiled_program = compiler.CompiledProgram(main_program)
self._compiled_program.with_data_parallel(
loss_name=loss.name,
build_strategy=build_strategy,
exec_strategy=exec_strategy,
share_vars_from=None)
return self._compiled_program
def minimize(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None):
program = loss.block.program
compiled_program = self._try_to_compile(program, loss)
program._graph = compiled_program
# just return self.optimizer_ops and self.param_grads
return None, None
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
from paddle import fluid
from .meta_optimizer_base import MetaOptimizerBase
class AsyncMetaOptimizer(MetaOptimizerBase):
def __init__(self, optimizer):
super(AsyncMetaOptimizer, self).__init__(optimizer)
self.inner_opt = optimizer
# we do not allow meta optimizer to be inner optimizer currently
self.meta_optimizers_white_list = []
def _is_graph_out(self):
return False
def _can_apply(self):
if self.role_maker._is_collective:
return False
k_steps = self.user_defined_strategy.a_sync_configs["k_steps"]
return True if k_steps >= 0 else False
def _get_distributed_strategy(self):
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
k_steps = self.user_defined_strategy.a_sync_configs["k_steps"]
strategy = None
if not self.user_defined_strategy.a_sync and k_steps == 0:
strategy = StrategyFactory.create_sync_strategy()
if self.user_defined_strategy.a_sync and k_steps == 0:
strategy = StrategyFactory.create_async_strategy()
if self.user_defined_strategy.a_sync and k_steps > 0:
strategy = StrategyFactory.create_geo_strategy(k_steps)
if not strategy:
raise ValueError("k_steps must be invalid value, please check")
return strategy
def _build_trainer_programs(self, compiled_config):
from paddle.fluid.incubate.fleet.parameter_server.ir import trainer_pass as worker
_main = compiled_config.origin_main_program.clone()
_startup = compiled_config.origin_startup_program.clone()
if not compiled_config.is_geo_mode():
# for main program
_main = worker.delete_optimizer_pass(_main, compiled_config)
_main = worker.distributed_ops_pass(_main, compiled_config)
_main = worker.append_send_ops_pass(_main, compiled_config)
# for startup program
_startup = worker.fake_init_ops_pass(_startup, compiled_config)
_startup = worker.init_from_server_pass(_startup, compiled_config)
_startup = worker.delet_extra_optimizes_pass(_startup,
compiled_config)
else:
_main = worker.append_send_ops_pass(_main, compiled_config)
_startup = _startup
return _main, _startup
def _build_pserver_programs(self, compiled_config):
from paddle.fluid.incubate.fleet.parameter_server.ir import pserver_pass as server
_main = fluid.Program()
_startup = fluid.Program()
if not compiled_config.is_geo_mode():
_main = server.add_listen_and_serv_pass(_main, compiled_config)
_main = server.add_rpc_global_flags_pass(_main, compiled_config)
_main = server.add_optimizer_pass(_main, compiled_config)
_main = server.large_scale_sparse_pass(_main, _main,
compiled_config, False)
_startup = server.build_pserver_startup_program_pass(
_startup, _main, compiled_config)
_startup = server.large_scale_sparse_pass(_startup, _main,
compiled_config, True)
if not compiled_config.is_sync_mode():
_main = server.delete_unused_in_main_pass(_main,
compiled_config)
_startup = server.delete_unused_in_startup_pass(_startup, _main,
compiled_config)
else:
_main = server.add_listen_and_serv_pass(_main, compiled_config)
_main = server.add_rpc_global_flags_pass(_main, compiled_config)
_main = server.add_geo_optimizer_pass(_main, compiled_config)
_main = server.large_scale_sparse_pass(_main, _main,
compiled_config, False)
_startup = server.build_pserver_startup_program_pass(
_startup, _main, compiled_config)
_startup = server.large_scale_sparse_pass(_startup, _main,
compiled_config, True)
_startup = server.delete_unused_in_startup_pass(_startup, _main,
compiled_config)
return _main, _startup
def minimize_impl(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None):
self.inner_opt.minimize(loss, startup_program, parameter_list,
no_grad_set)
strategy = self._get_distributed_strategy()
_origin_main_program = loss.block.program
_origin_startup_program = startup_program
from paddle.fluid.incubate.fleet.parameter_server.ir import public as public
compiled_config = public.CompileTimeStrategy(_origin_main_program,
_origin_startup_program,
strategy, self.role_maker)
main_program, startup_program = \
self._build_trainer_programs(compiled_config) if self.role_maker.is_worker() \
else self._build_pserver_programs(compiled_config)
loss.block.program = main_program
fluid.framework.switch_startup_program(startup_program)
return None, None
def _disable_strategy(self, dist_strategy):
self.user_defined_strategy.a_sync_configs["k_steps"] = -1
...@@ -13,5 +13,6 @@ ...@@ -13,5 +13,6 @@
# limitations under the License. # limitations under the License.
from .collective_runtime import CollectiveRuntime from .collective_runtime import CollectiveRuntime
from .parameter_server_runtime import ParameterServerRuntime
__all__ = ["CollectiveRuntime"] __all__ = ["CollectiveRuntime," "ParameterServerRuntime", ]
...@@ -30,7 +30,7 @@ class CollectiveRuntime(RuntimeBase): ...@@ -30,7 +30,7 @@ class CollectiveRuntime(RuntimeBase):
"You should not call 'run_worker' method for collective mode.") "You should not call 'run_worker' method for collective mode.")
pass pass
def _init_server(self): def _init_server(self, *args, **kwargs):
logging.warn( logging.warn(
"You should not call 'init_server' method for collective mode.") "You should not call 'init_server' method for collective mode.")
pass pass
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import logging
import warnings
import paddle.fluid as fluid
from paddle.fluid import core
from .runtime_base import RuntimeBase
class ParameterServerRuntime(RuntimeBase):
def __init__(self):
super(ParameterServerRuntime, self).__init__()
self._communicator = None
def _set_basic_info(self, context):
self.context = context
self.role_maker = context["role_maker"]
self.origin_main_program = context["origin_main_program"]
self.origin_startup_program = context["origin_startup_program"]
self.async_strategy = self._get_distributed_strategy()
self.compiled_strategy = self.build_compiled_startegy()
def _get_distributed_strategy(self):
strategy = None
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
dist_strategy = self.context["valid_strategy"]
k_steps = dist_strategy.a_sync_configs["k_steps"]
if not dist_strategy.a_sync and k_steps == 0:
strategy = StrategyFactory.create_sync_strategy()
if dist_strategy.a_sync and k_steps == 0:
strategy = StrategyFactory.create_async_strategy()
if dist_strategy.a_sync and k_steps > 0:
strategy = StrategyFactory.create_geo_strategy(k_steps)
if not strategy:
raise ValueError("k_steps must be invalid value, please check")
return strategy
def build_compiled_startegy(self):
from paddle.fluid.incubate.fleet.parameter_server.ir.public import CompileTimeStrategy
compiled_config = CompileTimeStrategy(
self.origin_main_program, self.origin_main_program,
self.async_strategy, self.role_maker)
return compiled_config
def _load_sparse_params(self, dirname, varnames):
from paddle.fluid.communicator import LargeScaleKV
from paddle.fluid.incubate.fleet.parameter_server.ir.public import _get_varname_parts
scale_kv = LargeScaleKV()
for varname in varnames:
origin_varname, _, _ = _get_varname_parts(varname)
sparse_dir = os.path.join(dirname, origin_varname, varname)
scale_kv.load(varname, sparse_dir)
@staticmethod
def __exclude_vars(exclude_var_names=[]):
def is_valid(var):
if var.name in exclude_var_names:
return False
from paddle.fluid.incubate.fleet.parameter_server.ir.public import _get_varname_parts
origin_varname, _, _ = _get_varname_parts(var.name)
if origin_varname.endswith("@GRAD"):
return False
if origin_varname == "learning_rate_0":
return False
if var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \
var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \
var.desc.type() == core.VarDesc.VarType.READER:
return False
return var.persistable
return is_valid
def _init_worker(self):
def sync_strategy_envs():
kwargs = {}
kwargs["pserver_endpoints"] = self.role_maker.get_pserver_endpoints(
)
kwargs["trainer_id"] = self.role_maker.worker_index()
return kwargs
def geo_strategy_envs():
from paddle.fluid.incubate.fleet.parameter_server.ir.public import get_sparse_tablenames
def get_sparse_attrs():
opt_init_map = {}
opt_init_map["gaussian_random"] = ["seed", "mean", "std"]
opt_init_map["fill_constant"] = ["value"]
opt_init_map["uniform_random"] = ["seed", "min", "max"]
opt_init_map[
"truncated_gaussian_random"] = ["seed", "mean", "std"]
dist_varnames = get_sparse_tablenames(self.origin_main_program,
True)
sparse_varnames = get_sparse_tablenames(
self.origin_main_program, False)
if len(dist_varnames) != 0:
raise ValueError(
"GeoStrategy can not support large scale embeding now, please use fluid.layers.embedding"
)
init_attrs = []
for value_name in sparse_varnames:
value_var = self.origin_main_program.global_block().vars[
value_name]
value_attr = [
value_name,
",".join([str(dim) for dim in value_var.shape])
]
for op in self.origin_startup_program.global_block().ops:
if op.type in opt_init_map.keys(
) and value_name == op.output("Out")[0]:
init_attr = [op.type]
for attr in opt_init_map[op.type]:
init_attr.append(str(op.attr(attr)))
value_attr.append("&".join(init_attr))
init_attrs.append(":".join(value_attr))
break
return "#".join(init_attrs)
kwargs = {}
kwargs["trainers"] = self.role_maker.worker_num()
kwargs["sparse_attrs"] = get_sparse_attrs()
return kwargs
from paddle.fluid.incubate.fleet.parameter_server.ir.public import _get_lr_ops
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import \
SyncStrategy, GeoStrategy
trainer_config = self.async_strategy.get_trainer_runtime_config()
lrs = _get_lr_ops(self.origin_main_program)
if len(lrs) > 0:
kwargs = {"need_global_step": "1"}
else:
kwargs = {"need_global_step": "0"}
if isinstance(self.async_strategy, GeoStrategy):
geo_kwargs = geo_strategy_envs()
kwargs.update(geo_kwargs)
if isinstance(self.async_strategy, SyncStrategy):
sync_kwargs = sync_strategy_envs()
kwargs.update(sync_kwargs)
kwargs = kwargs if kwargs else None
send_ctx = self.compiled_strategy.get_communicator_send_context()
if self.compiled_strategy.is_geo_mode():
recv_ctx = self.compiled_strategy.get_communicator_recv_context(
recv_type=4)
else:
recv_ctx = self.compiled_strategy.get_communicator_recv_context(
recv_type=1)
from paddle.fluid.communicator import Communicator
self._communicator = Communicator(
trainer_config.mode, kwargs,
trainer_config.get_communicator_flags())
self._communicator.init_with_ctx(send_ctx, recv_ctx)
if not self._communicator.is_running():
self._communicator.start()
else:
warnings.warn("communicator has been initialized, skip")
def _init_server(self, *args, **kwargs):
if len(args) > 1:
raise ValueError("init server can only accept 1 args: `dirname`")
elif len(args) == 1:
model_dirname = args[0]
else:
model_dirname = None
executor = fluid.Executor(fluid.CPUPlace())
executor.run(fluid.default_startup_program())
if not model_dirname:
return
if not os.path.isdir(model_dirname):
raise ValueError("There is no directory named '%s'", model_dirname)
sparse_varnames = self.compiled_strategy.get_sparse_varname_on_ps(True)
distribtued_varnames = self.compiled_strategy.get_sparse_varname_on_ps(
False)
remaining_vars = list(
filter(
ParameterServerRuntime.__exclude_vars(sparse_varnames +
distribtued_varnames),
fluid.default_main_program().list_vars()))
fluid.io.load_vars(
executor,
main_program=fluid.default_main_program(),
dirname=model_dirname,
vars=remaining_vars)
self._load_sparse_params(
dirname=model_dirname, varnames=sparse_varnames)
# todo(tangwei12) load distributed vars
# self._load_sparse_params(dirname=model_dir, varnames=distribtued_varnames)
def _run_server(self):
executor = fluid.Executor(fluid.CPUPlace())
executor.run(fluid.default_main_program())
def _stop_worker(self):
self._communicator.stop()
executor = fluid.Executor(fluid.CPUPlace())
executor.close()
...@@ -25,7 +25,7 @@ class RuntimeBase(object): ...@@ -25,7 +25,7 @@ class RuntimeBase(object):
def _run_worker(self): def _run_worker(self):
pass pass
def _init_server(self): def _init_server(self, *args, **kwargs):
pass pass
def _run_server(self): def _run_server(self):
......
...@@ -26,32 +26,44 @@ import paddle ...@@ -26,32 +26,44 @@ import paddle
import paddle.fluid as fluid import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet import paddle.fleet as fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
class TestCommunicatorGeoEnd2End(unittest.TestCase): class TestCommunicatorGeoEnd2End(unittest.TestCase):
def net(self): def net(self):
x = fluid.layers.data(name='x', shape=[13], dtype='float32') x = fluid.layers.data(name='x', shape=[13], dtype='float32')
y_predict = fluid.layers.fc(input=x, size=1, act=None) x1 = fluid.layers.data(name='x1', shape=[1], dtype='int64', lod_level=1)
emb = fluid.layers.embedding(
input=x1,
size=[10000, 10],
param_attr=fluid.ParamAttr(
name="embedding",
initializer=fluid.initializer.Constant(value=0.01)),
is_sparse=True)
pool = fluid.layers.sequence_pool(input=emb, pool_type="sum")
z = fluid.layers.concat(input=[x, pool], axis=1)
y_predict = fluid.layers.fc(input=z, size=1, act=None)
y = fluid.layers.data(name='y', shape=[1], dtype='float32') y = fluid.layers.data(name='y', shape=[1], dtype='float32')
cost = fluid.layers.square_error_cost(input=y_predict, label=y) cost = fluid.layers.square_error_cost(input=y_predict, label=y)
avg_cost = fluid.layers.mean(cost) avg_cost = fluid.layers.mean(cost)
return avg_cost, x, y return avg_cost, x, x1, y
def fake_reader(self): def fake_reader(self):
def reader(): def reader():
for i in range(10000): for i in range(10000):
x = numpy.random.random((1, 13)).astype('float32') x = numpy.random.random((1, 13)).astype('float32')
z = numpy.random.randint(0, 9999, (1, 1)).astype('int64')
y = numpy.random.randint(0, 2, (1, 1)).astype('int64') y = numpy.random.randint(0, 2, (1, 1)).astype('int64')
yield x, y yield x, z, y
return reader return reader
def run_pserver(self, role, strategy): def run_pserver(self, role, strategy):
fleet.init(role) fleet.init(role)
avg_cost, x, y = self.net() avg_cost, x, z, y = self.net()
optimizer = fluid.optimizer.SGD(0.01) optimizer = fluid.optimizer.SGD(0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy) optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(avg_cost) optimizer.minimize(avg_cost)
...@@ -64,33 +76,41 @@ class TestCommunicatorGeoEnd2End(unittest.TestCase): ...@@ -64,33 +76,41 @@ class TestCommunicatorGeoEnd2End(unittest.TestCase):
exe = fluid.Executor(place) exe = fluid.Executor(place)
fleet.init(role) fleet.init(role)
avg_cost, x, y = self.net() avg_cost, x, z, y = self.net()
optimizer = fluid.optimizer.SGD(0.01) optimizer = fluid.optimizer.SGD(0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy) optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(avg_cost) optimizer.minimize(avg_cost)
fleet.init_worker() fleet.init_worker()
exe.run(fleet.startup_program) exe.run(fluid.default_startup_program())
train_reader = paddle.batch(self.fake_reader(), batch_size=24) train_reader = paddle.batch(self.fake_reader(), batch_size=24)
feeder = fluid.DataFeeder(place=place, feed_list=[x, y]) feeder = fluid.DataFeeder(place=place, feed_list=[x, z, y])
for batch_id, data in enumerate(train_reader()): for batch_id, data in enumerate(train_reader()):
exe.run(fleet.main_program, feed=feeder.feed(data), fetch_list=[]) exe.run(fluid.default_main_program(),
feed=feeder.feed(data),
fetch_list=[])
fleet.stop_worker() fleet.stop_worker()
def run_ut(self): def run_ut(self):
training_role = os.getenv("TRAINING_ROLE", "TRAINER") training_role = os.getenv("TRAINING_ROLE", "TRAINER")
role = role_maker.UserDefinedRoleMaker( os.environ["PADDLE_PSERVER_NUMS"] = "1"
current_id=0, os.environ["PADDLE_TRAINERS_NUM"] = "1"
role=role_maker.Role.WORKER os.environ["POD_IP"] = "127.0.0.1"
if training_role == "TRAINER" else role_maker.Role.SERVER, os.environ["PADDLE_PORT"] = "36001"
worker_num=1, os.environ["PADDLE_TRAINER_ID"] = "0"
server_endpoints=["127.0.0.1:18099"]) os.environ["PADDLE_TRAINERS_NUM"] = "1"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
"127.0.0.1:36001"
strategy = StrategyFactory.create_geo_strategy(10) role = role_maker.PaddleCloudRoleMaker()
strategy = paddle.fleet.DistributedStrategy()
strategy.a_sync = True
strategy.a_sync_configs = {"k_steps": 100}
if training_role == "TRAINER": if training_role == "TRAINER":
self.run_trainer(role, strategy) self.run_trainer(role, strategy)
...@@ -116,8 +136,7 @@ import paddle.fluid as fluid ...@@ -116,8 +136,7 @@ import paddle.fluid as fluid
from paddle.fluid.communicator import Communicator from paddle.fluid.communicator import Communicator
import paddle.fluid.incubate.fleet.base.role_maker as role_maker import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet import paddle.fleet as fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
from test_communicator_geo import TestCommunicatorGeoEnd2End from test_communicator_geo import TestCommunicatorGeoEnd2End
......
...@@ -16,16 +16,13 @@ from __future__ import print_function ...@@ -16,16 +16,13 @@ from __future__ import print_function
import unittest import unittest
import time import time
import threading
import numpy
import os
import paddle import paddle
import paddle.fluid as fluid import paddle.fluid as fluid
from paddle.fluid.communicator import Communicator
import paddle.fluid.incubate.fleet.base.role_maker as role_maker import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet import paddle.fleet as fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
class TestCommunicator(unittest.TestCase): class TestCommunicator(unittest.TestCase):
...@@ -39,19 +36,24 @@ class TestCommunicator(unittest.TestCase): ...@@ -39,19 +36,24 @@ class TestCommunicator(unittest.TestCase):
return avg_cost return avg_cost
def test_communicator_sync(self): def test_communicator_sync(self):
role = role_maker.UserDefinedRoleMaker( os.environ["TRAINING_ROLE"] = "TRAINER"
current_id=0, os.environ["PADDLE_PSERVER_NUMS"] = "2"
role=role_maker.Role.WORKER, os.environ["PADDLE_TRAINERS_NUM"] = "2"
worker_num=2, os.environ["POD_IP"] = "127.0.0.1"
server_endpoints=["127.0.0.1:6001", "127.0.0.1:6002"]) os.environ["PADDLE_PORT"] = "36001"
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
"127.0.0.1:36001,127.0.0.2:36001"
fleet.init(role) fleet.init(role_maker.PaddleCloudRoleMaker())
avg_cost = self.net() avg_cost = self.net()
optimizer = fluid.optimizer.SGD(0.01) optimizer = fluid.optimizer.SGD(0.01)
strategy = StrategyFactory.create_sync_strategy() strategy = paddle.fleet.DistributedStrategy()
strategy._program_config.wait_port = False strategy.a_sync = False
optimizer = fleet.distributed_optimizer(optimizer, strategy) optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(avg_cost) optimizer.minimize(avg_cost)
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import unittest
import paddle
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
def setUp(self):
os.environ["PADDLE_PSERVER_NUMS"] = "2"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
"127.0.0.1:36001,127.0.0.2:36001"
def test_a_sync_optimizer_trainer(self):
os.environ["TRAINING_ROLE"] = "TRAINER"
import paddle.fleet as fleet
main_program = paddle.fluid.Program()
startup_program = paddle.fluid.Program()
paddle.fluid.framework.switch_main_program(main_program)
paddle.fluid.framework.switch_startup_program(startup_program)
fleet.init(role_maker.PaddleCloudRoleMaker())
input_x = paddle.fluid.layers.data(
name="x", shape=[32], dtype='float32')
input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64')
fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh')
fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh')
prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax')
cost = paddle.fluid.layers.cross_entropy(
input=prediction, label=input_y)
avg_cost = paddle.fluid.layers.mean(x=cost)
strategy = paddle.fleet.DistributedStrategy()
strategy.a_sync = True
optimizer = paddle.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(avg_cost)
prog = paddle.fluid.default_main_program()
self.assertNotEqual(prog.global_block().ops[-1].type, "send_barrier")
sends = 0
sgds = 0
for op in prog.global_block().ops:
if op.type == "send":
sends += 1
if op.type == "sgd":
sgds += 1
self.assertEqual(sends, 7)
self.assertEqual(sgds, 0)
fleet.init_worker()
time.sleep(8)
fleet.stop_worker()
def test_a_sync_optimizer_pserver(self):
os.environ["TRAINING_ROLE"] = "PSERVER"
import paddle.fleet as fleet
main_program = paddle.fluid.Program()
startup_program = paddle.fluid.Program()
paddle.fluid.framework.switch_main_program(main_program)
paddle.fluid.framework.switch_startup_program(startup_program)
fleet.init(role_maker.PaddleCloudRoleMaker())
input_x = paddle.fluid.layers.data(
name="x", shape=[32], dtype='float32')
input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64')
fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh')
fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh')
prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax')
cost = paddle.fluid.layers.cross_entropy(
input=prediction, label=input_y)
avg_cost = paddle.fluid.layers.mean(x=cost)
strategy = paddle.fleet.DistributedStrategy()
strategy.a_sync = True
optimizer = paddle.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(avg_cost)
prog = paddle.fluid.default_main_program()
self.assertEqual(prog.global_block().ops[0].type, "listen_and_serv")
fleet.init_server()
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle
import os
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import time
class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
def setUp(self):
os.environ["PADDLE_PSERVER_NUMS"] = "2"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
"127.0.0.1:36001,127.0.0.2:36001"
def test_a_sync_optimizer_trainer(self):
os.environ["TRAINING_ROLE"] = "TRAINER"
import paddle.fleet as fleet
main_program = paddle.fluid.Program()
startup_program = paddle.fluid.Program()
paddle.fluid.framework.switch_main_program(main_program)
paddle.fluid.framework.switch_startup_program(startup_program)
fleet.init(role_maker.PaddleCloudRoleMaker())
input_x = paddle.fluid.layers.data(
name="x", shape=[32], dtype='float32')
input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64')
fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh')
fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh')
prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax')
cost = paddle.fluid.layers.cross_entropy(
input=prediction, label=input_y)
avg_cost = paddle.fluid.layers.mean(x=cost)
strategy = paddle.fleet.DistributedStrategy()
strategy.a_sync = True
strategy.a_sync_configs = {"k_steps": 100}
optimizer = paddle.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(avg_cost)
prog = paddle.fluid.default_main_program()
self.assertEqual(prog.global_block().ops[-1].type, "send")
sends = 0
sgds = 0
for op in prog.global_block().ops:
if op.type == "send":
sends += 1
if op.type == "sgd":
sgds += 1
self.assertEqual(sends, 1)
self.assertEqual(sgds, 6)
def test_a_sync_optimizer_pserver(self):
os.environ["TRAINING_ROLE"] = "PSERVER"
import paddle.fleet as fleet
main_program = paddle.fluid.Program()
startup_program = paddle.fluid.Program()
paddle.fluid.framework.switch_main_program(main_program)
paddle.fluid.framework.switch_startup_program(startup_program)
fleet.init(role_maker.PaddleCloudRoleMaker())
input_x = paddle.fluid.layers.data(
name="x", shape=[32], dtype='float32')
input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64')
fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh')
fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh')
prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax')
cost = paddle.fluid.layers.cross_entropy(
input=prediction, label=input_y)
avg_cost = paddle.fluid.layers.mean(x=cost)
strategy = paddle.fleet.DistributedStrategy()
strategy.a_sync = True
strategy.a_sync_configs = {"k_steps": 100}
optimizer = paddle.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(avg_cost)
prog = paddle.fluid.default_main_program()
self.assertEqual(prog.global_block().ops[0].type, "listen_and_serv")
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle
import os
import paddle.fleet as fleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import time
class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
def setUp(self):
os.environ["PADDLE_PSERVER_NUMS"] = "2"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "6007"
os.environ["TRAINING_ROLE"] = "TRAINER"
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
"127.0.0.1:36001,127.0.0.2:36001"
def test_gradient_merge_optimizer(self):
fleet.init(role_maker.PaddleCloudRoleMaker())
input_x = paddle.fluid.layers.data(
name="x", shape=[32], dtype='float32')
input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64')
fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh')
fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh')
prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax')
cost = paddle.fluid.layers.cross_entropy(
input=prediction, label=input_y)
avg_cost = paddle.fluid.layers.mean(x=cost)
strategy = paddle.fleet.DistributedStrategy()
strategy.a_sync = False
optimizer = paddle.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(avg_cost)
prog = paddle.fluid.default_main_program()
self.assertEqual(prog.global_block().ops[-1].type, "send_barrier")
sends = 0
sgds = 0
for op in prog.global_block().ops:
if op.type == "send":
sends += 1
if op.type == "sgd":
sgds += 1
self.assertEqual(sends, 6)
self.assertEqual(sgds, 0)
fleet.init_worker()
time.sleep(8)
fleet.stop_worker()
if __name__ == "__main__":
unittest.main()
...@@ -19,10 +19,10 @@ import unittest ...@@ -19,10 +19,10 @@ import unittest
import tempfile import tempfile
import shutil import shutil
import paddle
import paddle.fluid as fluid import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet import paddle.fleet as fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
# For Net # For Net
base_lr = 0.2 base_lr = 0.2
...@@ -149,40 +149,41 @@ class TestPSPassWithBow(unittest.TestCase): ...@@ -149,40 +149,41 @@ class TestPSPassWithBow(unittest.TestCase):
return [avg_cost, acc, cos_q_pt] return [avg_cost, acc, cos_q_pt]
def test(self): def test(self):
endpoints = ["127.0.0.1:36004"] os.environ["PADDLE_PSERVER_NUMS"] = "2"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
role = role_maker.UserDefinedRoleMaker( os.environ["POD_IP"] = "127.0.0.1"
current_id=0, os.environ["PADDLE_PORT"] = "36001"
role=role_maker.Role.SERVER, os.environ["PADDLE_TRAINER_ID"] = "0"
worker_num=2, os.environ["PADDLE_TRAINERS_NUM"] = "2"
server_endpoints=endpoints) os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
"127.0.0.1:36001,127.0.0.2:36001"
os.environ["TRAINING_ROLE"] = "PSERVER"
role = role_maker.PaddleCloudRoleMaker()
fleet.init(role) fleet.init(role)
loss, acc, _ = self.net() loss, acc, _ = self.net()
optimizer = fluid.optimizer.SGD(base_lr)
strategy = StrategyFactory.create_async_strategy() strategy = paddle.fleet.DistributedStrategy()
optimizer = fleet.distributed_optimizer(optimizer, strategy) strategy.a_sync = True
optimizer = paddle.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(loss) optimizer.minimize(loss)
fleet.startup_program_bak = fleet.startup_program model_dir = tempfile.mkdtemp()
fleet.startup_program = None
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
fleet.init_server() fleet.init_server(os.path.join(model_dir, "temp"), "xxxx")
model_dir = tempfile.mkdtemp()
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
fleet.init_server(os.path.join(model_dir, "temp")) fleet.init_server(os.path.join(model_dir, "temp"))
fleet.startup_program = fleet.startup_program_bak
fleet.init_server() fleet.init_server()
from paddle.fluid.communicator import LargeScaleKV from paddle.fluid.communicator import LargeScaleKV
kv = LargeScaleKV() kv = LargeScaleKV()
kv.save("__emb__", os.path.join(model_dir, "__emb__", "__emb__")) kv.save("__emb__.block0",
os.path.join(model_dir, "__emb__", "__emb__.block0"))
fleet.main_program = fluid.Program() fluid.framework.switch_main_program(fluid.Program())
fleet.init_server(model_dir) fleet.init_server(model_dir)
shutil.rmtree(model_dir) shutil.rmtree(model_dir)
......
...@@ -197,8 +197,9 @@ class TestStrategyConfig(unittest.TestCase): ...@@ -197,8 +197,9 @@ class TestStrategyConfig(unittest.TestCase):
self.assertEqual(strategy.a_sync, True) self.assertEqual(strategy.a_sync, True)
strategy.a_sync = False strategy.a_sync = False
self.assertEqual(strategy.a_sync, False) self.assertEqual(strategy.a_sync, False)
with self.assertRaises(ValueError):
strategy.a_sync = "True" strategy.a_sync = "True"
self.assertEqual(strategy.a_sync, False)
def test_a_sync_configs(self): def test_a_sync_configs(self):
strategy = paddle.fleet.DistributedStrategy() strategy = paddle.fleet.DistributedStrategy()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册