未验证 提交 43622a20 编写于 作者: W wangzhen38 提交者: GitHub

[RM FLUID] trainer_pass&heter_trainer_pass (#50610)

* [RM FLUID] trainer_pass&heter_trainer_pass
* [RM FLUID] rm distributed_strategy
上级 47306c58
...@@ -96,7 +96,7 @@ class ParameterServerOptimizer(MetaOptimizerBase): ...@@ -96,7 +96,7 @@ class ParameterServerOptimizer(MetaOptimizerBase):
return strategy return strategy
def _build_trainer_programs(self, compiled_config): def _build_trainer_programs(self, compiled_config):
from paddle.fluid.incubate.fleet.parameter_server.ir import ( from paddle.incubate.fleet.parameter_server.ir import (
trainer_pass as worker, trainer_pass as worker,
) )
...@@ -106,7 +106,7 @@ class ParameterServerOptimizer(MetaOptimizerBase): ...@@ -106,7 +106,7 @@ class ParameterServerOptimizer(MetaOptimizerBase):
use_ps_gpu = self.user_defined_strategy.a_sync_configs["use_ps_gpu"] use_ps_gpu = self.user_defined_strategy.a_sync_configs["use_ps_gpu"]
if not compiled_config.is_geo_mode(): if not compiled_config.is_geo_mode():
from paddle.fluid.incubate.fleet.parameter_server.ir.public import ( from paddle.incubate.fleet.parameter_server.ir.public import (
_add_lr_decay_table_pass, _add_lr_decay_table_pass,
) )
...@@ -150,7 +150,7 @@ class ParameterServerOptimizer(MetaOptimizerBase): ...@@ -150,7 +150,7 @@ class ParameterServerOptimizer(MetaOptimizerBase):
compiled_config.set_origin_ps_startup_program(_startup) compiled_config.set_origin_ps_startup_program(_startup)
# for heter program # for heter program
if self.role_maker._is_heter_parameter_server_mode: if self.role_maker._is_heter_parameter_server_mode:
from paddle.fluid.incubate.fleet.parameter_server.ir import ( from paddle.incubate.fleet.parameter_server.ir import (
heter_trainer_pass as heter_worker, heter_trainer_pass as heter_worker,
) )
...@@ -191,13 +191,13 @@ class ParameterServerOptimizer(MetaOptimizerBase): ...@@ -191,13 +191,13 @@ class ParameterServerOptimizer(MetaOptimizerBase):
_main = paddle.static.Program() _main = paddle.static.Program()
_startup = paddle.static.Program() _startup = paddle.static.Program()
from paddle.fluid.incubate.fleet.parameter_server.ir import ( from paddle.incubate.fleet.parameter_server.ir import (
pserver_pass as server, pserver_pass as server,
) )
if not compiled_config.is_geo_mode(): if not compiled_config.is_geo_mode():
from paddle.fluid.incubate.fleet.parameter_server.ir.public import ( from paddle.incubate.fleet.parameter_server.ir.public import (
_get_optimize_ops, _get_optimize_ops,
) )
...@@ -209,7 +209,7 @@ class ParameterServerOptimizer(MetaOptimizerBase): ...@@ -209,7 +209,7 @@ class ParameterServerOptimizer(MetaOptimizerBase):
if len(ops) == 0: if len(ops) == 0:
return _main, _startup return _main, _startup
from paddle.fluid.incubate.fleet.parameter_server.ir.public import ( from paddle.incubate.fleet.parameter_server.ir.public import (
_add_lr_decay_table_pass, _add_lr_decay_table_pass,
) )
...@@ -299,9 +299,7 @@ class ParameterServerOptimizer(MetaOptimizerBase): ...@@ -299,9 +299,7 @@ class ParameterServerOptimizer(MetaOptimizerBase):
free = get_sys_free_mem() free = get_sys_free_mem()
from paddle.fluid.incubate.fleet.parameter_server.ir import ( from paddle.incubate.fleet.parameter_server.ir import vars_metatools
vars_metatools,
)
processed_var_names = set(["@EMPTY@"]) processed_var_names = set(["@EMPTY@"])
param_memory_size = 0 param_memory_size = 0
...@@ -371,9 +369,7 @@ class ParameterServerOptimizer(MetaOptimizerBase): ...@@ -371,9 +369,7 @@ class ParameterServerOptimizer(MetaOptimizerBase):
_origin_main_program = loss.block.program _origin_main_program = loss.block.program
_origin_startup_program = startup_program _origin_startup_program = startup_program
from paddle.fluid.incubate.fleet.parameter_server.ir import ( from paddle.incubate.fleet.parameter_server.ir import public as public
public as public,
)
compiled_config = public.CompileTimeStrategy( compiled_config = public.CompileTimeStrategy(
_origin_main_program, _origin_main_program,
...@@ -409,14 +405,14 @@ class ParameterServerOptimizer(MetaOptimizerBase): ...@@ -409,14 +405,14 @@ class ParameterServerOptimizer(MetaOptimizerBase):
} }
else: else:
loss.block.program = main_program loss.block.program = main_program
fluid.framework.switch_startup_program(startup_program) paddle.framework.switch_startup_program(startup_program)
elif self.role_maker._is_server(): elif self.role_maker._is_server():
main_program, startup_program = self._build_pserver_programs( main_program, startup_program = self._build_pserver_programs(
compiled_config compiled_config
) )
loss.block.program = main_program loss.block.program = main_program
fluid.framework.switch_startup_program(startup_program) paddle.framework.switch_startup_program(startup_program)
return None, None return None, None
def _disable_strategy(self, dist_strategy): def _disable_strategy(self, dist_strategy):
......
...@@ -123,7 +123,7 @@ class Hogwild(DeviceWorker): ...@@ -123,7 +123,7 @@ class Hogwild(DeviceWorker):
hogwild.stat_var_names.extend([i]) hogwild.stat_var_names.extend([i])
downpour.stat_var_names.extend([i]) downpour.stat_var_names.extend([i])
from paddle.fluid.incubate.fleet.parameter_server import version from paddle.incubate.fleet.parameter_server import version
if ( if (
version.is_transpiler() version.is_transpiler()
...@@ -271,7 +271,7 @@ class DownpourLite(DeviceWorker): ...@@ -271,7 +271,7 @@ class DownpourLite(DeviceWorker):
for i in opt_info["stat_var_names"]: for i in opt_info["stat_var_names"]:
downpour.stat_var_names.extend([i]) downpour.stat_var_names.extend([i])
from paddle.fluid.incubate.fleet.parameter_server import version from paddle.incubate.fleet.parameter_server import version
if ( if (
version.is_transpiler() version.is_transpiler()
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Convert the static program to distributed data-parallelism programs.
"""
import os
import sys
import warnings
import paddle
from paddle.framework import core
from paddle.static import (
default_main_program,
default_startup_program,
Program,
Executor,
)
from paddle.fluid.compiler import CompiledProgram
from paddle.fluid.parallel_executor import ParallelExecutor
from paddle.fluid.optimizer import Optimizer
from paddle.fluid.transpiler.distribute_transpiler import (
DistributeTranspilerConfig,
)
from paddle.fluid.incubate.fleet.base.fleet_base import Fleet
from paddle.fluid.incubate.fleet.base.mode import Mode
from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker
from paddle.fluid.incubate.fleet.parameter_server import version
from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
get_sparse_tablenames,
)
from paddle.fluid.incubate.fleet.parameter_server.ir.public import _get_lr_ops
from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
_has_global_step,
)
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
TrainerRuntimeConfig,
DistributedStrategy,
SyncStrategy,
AsyncStrategy,
HalfAsyncStrategy,
GeoStrategy,
StrategyFactory,
)
from paddle.fluid.incubate.fleet.parameter_server.mode import PSMode
from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer
from paddle.fluid.incubate.fleet.parameter_server.ir import (
trainer_pass as worker,
)
from paddle.fluid.incubate.fleet.parameter_server.ir import (
pserver_pass as server,
)
from paddle.fluid.incubate.fleet.parameter_server.ir import public as public
class FleetTranspiler(Fleet):
    """
    A subclass for compatibility with fluid.transpiler.DistributeTranspiler.

    Implements the transpiler-based parameter-server flavour of ``Fleet``:
    trainer processes exchange gradients/parameters with pserver processes
    through a ``Communicator``, and both roles run programs rewritten by the
    ir passes imported at module level (``worker``, ``server``, ``public``).
    """
    def __init__(self):
        super().__init__(Mode.TRANSPILER)
        # Inner mode is decided by how the wheel was built: transpiler-based
        # parameter server vs the PSLIB runtime.
        self._inner_mode = None
        if version.is_transpiler():
            self._inner_mode = PSMode.TRANSPILER
        else:
            self._inner_mode = PSMode.PSLIB
        self._strategy = None
        self._transpiler = None
        # Untouched copies of the user's programs; populated by
        # ParameterServerOptimizer.minimize().
        self._origin_main_program = None
        self._origin_startup_program = None
        self._communicator = None
        # Role-specific programs produced by the trainer/pserver build passes.
        self.startup_program = None
        self.main_program = None
        self._opt_info = None
        self._local_ip = 0
        self._fleet_ptr = None
        self._main_programs = []
        self._scopes = []
        # Client-to-client RPC tuning: timeouts in milliseconds, retry count.
        self._client2client_request_timeout_ms = 500000
        self._client2client_connect_timeout_ms = 10000
        self._client2client_max_retry = 3
    def init(self, role_maker=None):
        # Default to an MPI symmetric role maker when none is supplied.
        if role_maker is None:
            role_maker = MPISymetricRoleMaker()
        super().init(role_maker)
        if self._fleet_ptr is None:
            self._fleet_ptr = core.Fleet()
    def _init_transpiler_worker(self):
        """
        Prepare this process to run as a trainer (worker):
        first, wait for all parameter servers to launch completely,
        second, build communicator send/recv contexts from the compiled
        config, third, start the communicator (exactly once).
        Returns:
            None
        """
        def sync_strategy_envs():
            # Extra kwargs required only by the fully-synchronous strategy.
            kwargs = {}
            kwargs[
                "pserver_endpoints"
            ] = self._role_maker.get_pserver_endpoints()
            kwargs["trainer_id"] = self._role_maker.worker_index()
            return kwargs
        def geo_strategy_envs():
            def get_sparse_attrs():
                # Map initializer op type -> attribute names that must be
                # serialized so pservers can re-create the initializer.
                opt_init_map = {}
                opt_init_map["gaussian_random"] = ["seed", "mean", "std"]
                opt_init_map["fill_constant"] = ["value"]
                opt_init_map["uniform_random"] = ["seed", "min", "max"]
                opt_init_map["truncated_gaussian_random"] = [
                    "seed",
                    "mean",
                    "std",
                ]
                dist_varnames = get_sparse_tablenames(
                    self._origin_main_program, True
                )
                sparse_varnames = get_sparse_tablenames(
                    self._origin_main_program, False
                )
                # GEO mode cannot handle distributed (large-scale) embeddings.
                if len(dist_varnames) != 0:
                    raise ValueError(
                        "GeoStrategy can not support large scale embeding now, please use fluid.layers.embedding"
                    )
                init_attrs = []
                for value_name in sparse_varnames:
                    value_var = self._origin_main_program.global_block().vars[
                        value_name
                    ]
                    value_attr = [
                        value_name,
                        ",".join([str(dim) for dim in value_var.shape]),
                    ]
                    # Find the startup op that initializes this variable and
                    # encode its attrs as "name:shape:op&attr&attr...".
                    for op in self._origin_startup_program.global_block().ops:
                        if (
                            op.type in opt_init_map.keys()
                            and value_name == op.output("Out")[0]
                        ):
                            init_attr = [op.type]
                            for attr in opt_init_map[op.type]:
                                init_attr.append(str(op.attr(attr)))
                            value_attr.append("&".join(init_attr))
                            init_attrs.append(":".join(value_attr))
                            break
                return "#".join(init_attrs)
            kwargs = {}
            kwargs["trainers"] = self.worker_num()
            kwargs["sparse_attrs"] = get_sparse_attrs()
            return kwargs
        # if MPISymetricRoleMaker is defined
        # we suppose a user wants to submit job on mpi cluster
        if isinstance(self._role_maker, MPISymetricRoleMaker):
            # check whether server has been initialized
            # NOTE(review): `wait_server_ready` is not among this module's
            # visible imports — confirm it is in scope before this runs.
            wait_server_ready(self.server_endpoints(to_string=False))
        trainer_config = self._strategy.get_trainer_runtime_config()
        print(trainer_config)
        lrs = _has_global_step(_get_lr_ops(self._origin_main_program))
        if lrs > 0:
            kwargs = {"need_global_step": "1"}
        else:
            kwargs = {"need_global_step": "0"}
        if isinstance(self._strategy, GeoStrategy):
            geo_kwargs = geo_strategy_envs()
            kwargs.update(geo_kwargs)
        if isinstance(self._strategy, SyncStrategy):
            sync_kwargs = sync_strategy_envs()
            kwargs.update(sync_kwargs)
        kwargs = kwargs if kwargs else None
        # NOTE(review): mixes the module-level `fleet` singleton and `self`
        # for compiled_config — they are the same object in practice, but
        # consider using `self` consistently.
        send_ctx = fleet.compiled_config.get_communicator_send_context()
        if self.compiled_config.is_geo_mode():
            # recv_type=4: GEO-mode recv context (dense + sparse merged).
            recv_ctx = fleet.compiled_config.get_communicator_recv_context(
                recv_type=4
            )
        else:
            # recv_type=1: dense-only recv context.
            recv_ctx = fleet.compiled_config.get_communicator_recv_context(
                recv_type=1
            )
        from paddle.distributed.communicator import Communicator
        self._communicator = Communicator(
            trainer_config.mode, kwargs, trainer_config.get_communicator_flags()
        )
        self._communicator.init_with_ctx(send_ctx, recv_ctx)
        # The communicator may only be started once per process.
        if not self._communicator.is_running():
            self._communicator.start()
        else:
            raise ValueError(
                "Communicator can only be inited once, please check"
            )
    def init_worker(self):
        """
        `init_worker` has many functions to do before training:
        first, wait for all parameter servers to launch completely,
        second, run the executor to initialize the startup program,
        third, wait for all workers to initialize completely.
        Returns:
            None
        """
        if self._inner_mode == PSMode.TRANSPILER:
            self._init_transpiler_worker()
        else:
            raise NotImplementedError("add implement later")
    def _init_transpiler_server(self, model_dir=None):
        # Run the pserver startup program, then (optionally) reload dense
        # and sparse parameters from `model_dir` for incremental training.
        if not self.startup_program:
            raise ValueError(
                "startup_program is None, need invoke DistributedOptimizer.minimize first"
            )
        self._executor.run(self.startup_program)
        if model_dir:
            if not os.path.isdir(model_dir):
                raise ValueError("There is no directory named '%s'", model_dir)
            sparse_varnames = self.compiled_config.get_sparse_varname_on_ps(
                True
            )
            distribtued_varnames = (
                self.compiled_config.get_sparse_varname_on_ps(False)
            )
            # Dense vars = everything persistable except sparse/distributed
            # tables (those are loaded through the large-scale KV below).
            remaining_vars = list(
                filter(
                    FleetTranspiler.__exclude_vars(
                        sparse_varnames + distribtued_varnames
                    ),
                    self.main_program.list_vars(),
                )
            )
            # NOTE(review): verify `paddle.static.load_vars` exists with this
            # signature — historically this API lived in `fluid.io.load_vars`.
            paddle.static.load_vars(
                self._executor,
                main_program=self.main_program,
                dirname=model_dir,
                vars=remaining_vars,
            )
            self._load_sparse_params(
                dirname=model_dir, varnames=sparse_varnames
            )
        # todo(tangwei12) load distributed vars
        # self._load_sparse_params(dirname=model_dir, varnames=distribtued_varnames)
    def init_server(self, model_dir=None, **kwargs):
        """
        `init_server` has many functions to do before starting a pserver:
        first, run the executor to initialize the startup program,
        second, if `model_dir` is not empty, it will load parameters from it for increment training.
        Args:
            model_dir(str): The directory path.
        Returns:
            None
        """
        if self._inner_mode == PSMode.TRANSPILER:
            self._init_transpiler_server(model_dir)
        else:
            raise NotImplementedError("add implement later")
    def run_server(self):
        """
        `run_server` executes the executor to start the pserver main program.
        Returns:
            None
        """
        if self._inner_mode == PSMode.TRANSPILER:
            if not self.main_program:
                raise ValueError(
                    "main_program is None, need invoke DistributedOptimizer.minimize first"
                )
            self._executor.run(self.main_program)
        else:
            raise NotImplementedError("add implement later")
    def stop_worker(self):
        """
        Close this executor.
        For the distributed training, this method would free the resource on PServers related to
        the current Trainer.
        Returns:
            None
        """
        if self._inner_mode == PSMode.TRANSPILER:
            self._communicator.stop()
            if isinstance(self._role_maker, MPISymetricRoleMaker):
                self._role_maker._finalize()
            self._executor.close()
        else:
            raise NotImplementedError("add implement later")
    def distributed_optimizer(self, optimizer, strategy=None):
        """
        Optimizer for distributed training.
        For the distributed training, this method would rebuild a new instance of DistributedOptimizer.
        Which has basic Optimizer function and special features for distributed training.
        Args:
            optimizer(Optimizer): The executor to run for init server.
            strategy(DistributeTranspilerConfig): Extra properties for distributed optimizer.
        Returns:
            TranspilerOptimizer: subclass of DistributedOptimizer.
        """
        if not isinstance(optimizer, Optimizer):
            raise ValueError("optimizer must be an instance of Optimizer")
        if not self._is_initialized:
            raise ValueError(
                "fleet.init(role) to initialize before optimizer.minimize(loss)"
            )
        if not strategy:
            _strategy = StrategyFactory.create_async_strategy()
        # NOTE(review): when `strategy` is None/falsy the isinstance chain
        # below still runs and hits the final `raise TypeError`, making the
        # default above unreachable — confirm whether the next `if` was
        # intended to be an `elif` off the falsy check.
        if isinstance(strategy, DistributedStrategy):
            _strategy = strategy
        elif isinstance(strategy, DistributeTranspilerConfig):
            # Translate the legacy transpiler config into a strategy object.
            if strategy.sync_mode:
                _strategy = SyncStrategy()
            else:
                if strategy.runtime_split_send_recv:
                    if strategy.geo_sgd_mode:
                        _strategy = GeoStrategy(strategy.geo_sgd_need_push_nums)
                    elif strategy.half_async:
                        _strategy = HalfAsyncStrategy()
                    else:
                        _strategy = AsyncStrategy()
                else:
                    _strategy = HalfAsyncStrategy()
                    # for half_async compatibility
                    strategy.half_async = True
                    strategy.runtime_split_send_recv = True
            _strategy.set_program_config(strategy)
        elif isinstance(strategy, dict):
            # Dict-shaped runtime config is only meaningful for PSLIB.
            if self._inner_mode != PSMode.PSLIB:
                raise TypeError("Dict strategy can only be used at PSLIB Mode")
            _strategy = StrategyFactory.create_async_strategy()
            _strategy.set_pslib_runtime_config(strategy)
        else:
            raise TypeError(
                "strategy must be an instance of DistributeTranspilerConfig, DistributedStrategy"
            )
        self._strategy = _strategy
        self._optimizer = ParameterServerOptimizer(optimizer, _strategy)
        return self._optimizer
    def save_inference_model(
        self,
        executor,
        dirname,
        feeded_var_names,
        target_vars,
        main_program=None,
        export_for_deployment=True,
    ):
        """
        Prune the given `main_program` to build a new program especially for inference,
        and then save it and all related parameters to given `dirname` by the `executor`.
        """
        if self._inner_mode == PSMode.PSLIB:
            raise NotImplementedError("add implement later")
        if isinstance(executor, ParallelExecutor):
            raise TypeError(
                "in fleet.save_inference_model() function, executor must be as Executor type, ParallelExecutor is not allowed"
            )
        if not isinstance(executor, Executor):
            raise TypeError(
                "in fleet.save_inference_model() function, executor must be as Executor type"
            )
        # Todo(MrChengmo): support recv&save GPU-Kernel for ps-gpu model save
        # NOTE(review): `save_executor` is computed but the calls below pass
        # `executor` — presumably `save_executor` was intended; confirm.
        if not isinstance(executor.place, paddle.CPUPlace):
            save_executor = Executor(paddle.CPUPlace())
        else:
            save_executor = executor
        if main_program is not None:
            if isinstance(main_program, CompiledProgram):
                raise TypeError(
                    "in fleet.save_inference_model() function, main_program must be as Program type, CompiledProgram is not allowed"
                )
            # NOTE(review): the positional argument layout follows the legacy
            # fluid.io.save_inference_model signature — verify it matches the
            # paddle.static.save_inference_model API actually imported here.
            paddle.static.save_inference_model(
                dirname,
                feeded_var_names,
                target_vars,
                executor,
                main_program,
                None,
                None,
                export_for_deployment,
            )
        else:
            paddle.static.save_inference_model(
                dirname,
                feeded_var_names,
                target_vars,
                executor,
                self._origin_main_program,
                None,
                None,
                export_for_deployment,
                True,
            )
            # Re-read the serialized program so distributed parameter info
            # can be copied in before persisting the variables themselves.
            model_basename = "__model__"
            model_filename = os.path.join(dirname, model_basename)
            with open(model_filename, "rb") as f:
                program_desc_str = f.read()
            program = Program.parse_from_string(program_desc_str)
            program._copy_dist_param_info_from(self.main_program)
            self.save_persistables(executor, dirname, program)
    def _load_sparse_params(self, dirname, varnames):
        # Load sparse tables from disk into the large-scale KV storage.
        from paddle.distributed.communicator import LargeScaleKV
        scale_kv = LargeScaleKV()
        for varname in varnames:
            origin_varname, _, _ = public._get_varname_parts(varname)
            sparse_dir = os.path.join(dirname, origin_varname, varname)
            scale_kv.load(varname, sparse_dir)
    def _get_optimizer_status(self, op, param_name):
        """
        Return (reshaped_names, origin_names): the optimizer accumulator
        variable names derived from `param_name` for optimizer op type `op`.
        "Reshaped" accumulators are sharded like the parameter; "origin"
        accumulators (e.g. beta pow accumulators) keep their full shape.
        Raises ValueError for unsupported optimizer types.
        """
        supported_opts = [
            "sgd",
            "adam",
            "adagrad",
            "adamax",
            "momentum",
            "lars_momentum",
            "rmsprop",
            "decayed_adagrad",
            "ftrl",
        ]
        reshaped_val_map = {}
        reshaped_val_map["sgd"] = []
        reshaped_val_map["adam"] = ["moment1_0", "moment2_0"]
        reshaped_val_map["adagrad"] = ["moment_0"]
        reshaped_val_map["adamax"] = ["moment_0", "inf_norm_0"]
        reshaped_val_map["momentum"] = ["velocity_0"]
        reshaped_val_map["lars_momentum"] = ["velocity_0"]
        reshaped_val_map["rmsprop"] = [
            "momentum_0",
            "mean_square_0",
            "mean_grad_0",
        ]
        reshaped_val_map["decayed_adagrad"] = ["moment_0"]
        reshaped_val_map["ftrl"] = ["squared_0", "linear_0"]
        orishaped_val_map = {}
        orishaped_val_map["adam"] = ["beta1_pow_acc_0", "beta2_pow_acc_0"]
        orishaped_val_map["adamax"] = ["beta1_pow_acc_0"]
        if op not in supported_opts:
            raise ValueError(
                "fleet can not support optimizer: {}, only this can be supported: {}".format(
                    op, supported_opts
                )
            )
        reshaped_names = [
            param_name + "_" + val for val in reshaped_val_map[op]
        ]
        if op not in orishaped_val_map:
            origin_names = []
        else:
            origin_names = [
                param_name + "_" + val for val in orishaped_val_map[op]
            ]
        return reshaped_names, origin_names
    def _get_optimizer_op(self, param_name):
        # Find the optimize op in the original main program that updates
        # `param_name`. Returns None implicitly if no match is found.
        opts = public._get_optimize_ops(self._origin_main_program)
        for op in opts:
            if (
                "Param" in op.input_names
                and "LearningRate" in op.input_names
                and op.input("Param")[0] == param_name
            ):
                return op
    def _save_dense_params(self, executor, dirname, context, main_program):
        """
        Pull dense parameters (and their optimizer accumulators) from the
        pservers via `recv_save` ops and write them under `dirname`.
        Returns the list of saved local variable names.
        """
        self._communicator.recv()
        prog = Program()
        block = prog.global_block()
        local_vars = []
        for name, var_ctx in context.items():
            if len(var_ctx.origin_varnames()) != 1:
                raise ValueError("Dense can not support split now.")
            varname = var_ctx.origin_varnames()[0]
            local_vars.append(varname)
            optimizer = self._get_optimizer_op(varname)
            reshaped_varnames, origin_varnames = self._get_optimizer_status(
                optimizer.type, varname
            )
            for var_name in [varname] + reshaped_varnames + origin_varnames:
                var = self._origin_main_program.global_block().vars[var_name]
                block.append_op(
                    type='recv_save',
                    attrs={
                        "trainer_id": self._role_maker.worker_index(),
                        "shape": var.shape,
                        "slice_shapes": [",".join([str(i) for i in var.shape])],
                        "slice_varnames": [var.name],
                        "remote_varnames": [var.name],
                        "is_sparse": False,
                        "endpoints": var_ctx.split_endpoints(),
                        "file_path": os.path.join(dirname, var.name),
                    },
                )
        executor.run(prog)
        return local_vars
    def _save_sparse_params(self, executor, dirname, context, main_program):
        """
        Pull sparse tables (sharded across pservers by row sections) and
        their optimizer accumulators via `recv_save` ops, writing each to
        `dirname`. Returns the context keys (saved table names).
        """
        prog = Program()
        block = prog.global_block()
        local_vars = []
        for name, var_ctx in context.items():
            # NOTE(review): this error message appears copy-pasted from the
            # dense path even though this is the sparse saver.
            if len(var_ctx.origin_varnames()) != 1:
                raise ValueError("Dense can not support split now.")
            varname = var_ctx.origin_varnames()[0]
            local_vars.append(varname)
            optimizer = self._get_optimizer_op(varname)
            reshaped_varnames, origin_varnames = self._get_optimizer_status(
                optimizer.type, varname
            )
            var = self._origin_main_program.global_block().vars[varname]
            # Each shard keeps the trailing dims; only dim 0 is sectioned.
            slice_shapes = []
            dims1 = ",".join([str(i) for i in var.shape[1:]])
            for section in var_ctx.sections():
                slice_shapes.append(str(section) + dims1)
            block.append_op(
                type='recv_save',
                attrs={
                    "trainer_id": self._role_maker.worker_index(),
                    "shape": var.shape,
                    "slice_shapes": slice_shapes,
                    "slice_varnames": var_ctx.split_varnames(),
                    "remote_varnames": var_ctx.split_varnames(),
                    "is_sparse": True,
                    "endpoints": var_ctx.split_endpoints(),
                    "pserver_num": len(
                        self._role_maker.get_pserver_endpoints()
                    ),
                    "file_path": os.path.join(dirname, var.name),
                },
            )
            # Sharded optimizer accumulators follow the same sectioning.
            for reshaped_varname in reshaped_varnames:
                var = self._origin_main_program.global_block().vars[
                    reshaped_varname
                ]
                slice_varnames = []
                remote_varnames = []
                for i in range(len(var_ctx.split_varnames())):
                    slice_varnames.append(
                        "{}.block{}".format(reshaped_varname, i)
                    )
                    remote_varnames.append(reshaped_varname)
                block.append_op(
                    type='recv_save',
                    attrs={
                        "trainer_id": self._role_maker.worker_index(),
                        "shape": var.shape,
                        "slice_shapes": slice_shapes,
                        "slice_varnames": slice_varnames,
                        "remote_varnames": remote_varnames,
                        "is_sparse": True,
                        "endpoints": var_ctx.split_endpoints(),
                        "pserver_num": len(
                            self._role_maker.get_pserver_endpoints()
                        ),
                        "file_path": os.path.join(dirname, var.name),
                    },
                )
            # Full-shape accumulators are fetched from a single endpoint.
            for origin_varname in origin_varnames:
                var = self._origin_main_program.global_block().vars[
                    origin_varname
                ]
                block.append_op(
                    type='recv_save',
                    attrs={
                        "trainer_id": self._role_maker.worker_index(),
                        "shape": var.shape,
                        "slice_shapes": [",".join([str(i) for i in var.shape])],
                        "slice_varnames": [origin_varname],
                        "remote_varnames": [origin_varname],
                        "is_sparse": False,
                        "endpoints": var_ctx.split_endpoints()[:1],
                        "file_path": os.path.join(dirname, var.name),
                    },
                )
        executor.run(prog)
        return context.keys()
    def _save_distributed_params(
        self, executor, dirname, context, main_program
    ):
        # Ask pservers to checkpoint distributed (large-scale) tables
        # themselves via `checkpoint_notify` ops; returns saved table names.
        prog = Program()
        block = prog.global_block()
        for name, var_ctx in context.items():
            block.append_op(
                type='checkpoint_notify',
                attrs={
                    "varname": name,
                    "is_slice": True,
                    "slice_varnames": var_ctx.split_varnames(),
                    "remote_varnames": var_ctx.split_varnames(),
                    "endpoints": var_ctx.split_endpoints(),
                    "dirname": dirname,
                },
            )
        executor.run(prog)
        return context.keys()
    def _save_distributed_persistables(self, executor, dirname, main_program):
        # recv_type: 1 = dense, 2 = sparse, 3 = distributed tables.
        dense_ctx = fleet.compiled_config.get_communicator_recv_context(
            recv_type=1
        )
        sparse_ctx = fleet.compiled_config.get_communicator_recv_context(
            recv_type=2
        )
        distributed_ctx = fleet.compiled_config.get_communicator_recv_context(
            recv_type=3
        )
        recv_dense_varnames = self._save_dense_params(
            executor, dirname, dense_ctx, main_program
        )
        recv_sparse_varnames = self._save_sparse_params(
            executor, dirname, sparse_ctx, main_program
        )
        recv_distributed_varnames = self._save_distributed_params(
            executor, dirname, distributed_ctx, main_program
        )
        saved_varnames = (
            recv_dense_varnames
            + list(recv_sparse_varnames)
            + list(recv_distributed_varnames)
        )
        # Whatever was not fetched from pservers is saved locally.
        remaining_vars = list(
            filter(
                FleetTranspiler.__exclude_vars(saved_varnames),
                main_program.list_vars(),
            )
        )
        # NOTE(review): verify `paddle.static.save_vars` exists with this
        # signature — historically this API lived in `fluid.io.save_vars`.
        paddle.static.save_vars(
            executor,
            main_program=main_program,
            dirname=dirname,
            vars=remaining_vars,
        )
    def save_persistables(self, executor, dirname, main_program=None, **kwargs):
        """
        This function filters out all variables with `persistable==True` from the
        give `main_program` and then saves these variables to the folder `dirname`
        or file `filename`.
        The `dirname` is used to specify the folder where persistable variables
        are going to be saved. If you would like to save variables in separate
        files, set `filename` None;
        if you would like to save all variables in a
        single file, use `filename` to specify the file name.
        """
        if self._inner_mode == PSMode.PSLIB:
            raise NotImplementedError("add implement later")
        if isinstance(executor, ParallelExecutor):
            raise TypeError(
                "in fleet.save_persistables() function, executor must be as Executor type, ParallelExecutor is not allowed"
            )
        if not isinstance(executor, Executor):
            raise TypeError(
                "in fleet.save_persistables() function, executor must be as Executor type"
            )
        # Todo(MrChengmo): support recv&save GPU-Kernel for ps-gpu model save
        # Force saving through a CPU executor: the recv_save kernels run on CPU.
        if not isinstance(executor.place, paddle.CPUPlace):
            save_executor = Executor(paddle.CPUPlace())
        else:
            save_executor = executor
        if main_program is None:
            main_program = self.main_program
        if isinstance(main_program, CompiledProgram):
            raise TypeError(
                "in fleet.save_persistables() function, main_program must be as Program type, CompiledProgram is not allowed"
            )
        self._save_distributed_persistables(
            save_executor, dirname, main_program
        )
    @staticmethod
    def __exclude_vars(exclude_var_names=[]):
        # Returns a predicate selecting persistable vars that should be
        # saved/loaded locally (i.e. not gradients, not the LR counter,
        # not feed/fetch/reader plumbing, not in `exclude_var_names`).
        # NOTE: the mutable default list is never mutated here, so it is
        # harmless, but a tuple default would be safer.
        def is_valid(var):
            if var.name in exclude_var_names:
                return False
            origin_varname, _, _ = public._get_varname_parts(var.name)
            if origin_varname.endswith("@GRAD"):
                return False
            if origin_varname == "learning_rate_0":
                return False
            if (
                var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH
                or var.desc.type() == core.VarDesc.VarType.FETCH_LIST
                or var.desc.type() == core.VarDesc.VarType.READER
            ):
                return False
            return var.persistable
        return is_valid
# fleet is a global instance for parameter server.
# ParameterServerOptimizer below reads/writes this singleton (origin
# programs, compiled_config, role queries) rather than taking it as an
# argument.
fleet = FleetTranspiler()
class ParameterServerOptimizer(DistributedOptimizer):
    """
    DistributedOptimizer is a wrapper for paddle.fluid.optimizer
    A user should pass a paddle.fluid.optimizer to DistributedOptimizer
    minimize() function is implemented.
    DistributedOptimizer is the starting point for a user who wants to
    run distributed training. The optimized information will be stored in
    Fleet() instance who holds the global information about current distributed
    training.
    Args:
        optimizer(Optimizer): subclass of Optimizer.
        strategy(DistributeTranspilerConfig): instance of DistributeTranspilerConfig.
    Returns:
        None
    """
    def __init__(self, optimizer, strategy, mode=PSMode.TRANSPILER):
        super().__init__(optimizer, strategy)
        self._mode = mode
        if self._mode == PSMode.PSLIB:
            # PSLIB wraps the user optimizer into a Distributed* class that
            # must be resolvable in this module's globals().
            self._optimizer_name = "Distributed%s" % optimizer.type.capitalize()
            if optimizer.type != "adam":
                # NOTE(review): `sys.stderr` is passed as a *value* to print
                # (it gets printed), not as the `file=` keyword — presumably
                # `file=sys.stderr` was intended.
                print(
                    "Currently, distributed optimizer only support Adam"
                    "Will config built-in adam for you."
                    "We will support more functions in DistributedOptimizer",
                    sys.stderr,
                )
                self._optimizer_name = "DistributedAdam"
            self._optimizer = globals()[self._optimizer_name](optimizer)
        else:
            self._optimizer = optimizer
        self._window = 1
        self.type = "downpour"
        # Suffixes of data-norm statistic variables (and their gradients).
        self.data_norm_name = [
            ".batch_size",
            ".batch_square_sum",
            ".batch_sum",
            ".batch_size@GRAD",
            ".batch_square_sum@GRAD",
            ".batch_sum@GRAD",
        ]
    def backward(
        self,
        loss,
        startup_program=None,
        parameter_list=None,
        no_grad_set=None,
        callbacks=None,
    ):
        # Not supported: use minimize() directly.
        raise NotImplementedError()
    def apply_gradients(self, params_grads):
        # Not supported: use minimize() directly.
        raise NotImplementedError()
    def _build_trainer_programs(self, compiled_config):
        """
        Rewrite clones of the original programs into trainer-side programs
        by applying the worker ir passes; returns (main, startup).
        """
        _main = fleet._origin_main_program.clone()
        _startup = fleet._origin_startup_program.clone()
        if not compiled_config.is_geo_mode():
            # for main program
            _main = worker.delete_optimizer_pass(_main, compiled_config)
            _main = worker.distributed_ops_pass(_main, compiled_config)
            _main = worker.append_send_ops_pass(_main, compiled_config)
            # for startup program
            _startup = worker.fake_init_ops_pass(_startup, compiled_config)
            _startup = worker.init_from_server_pass(_startup, compiled_config)
            _startup = worker.delet_extra_optimizes_pass(
                _startup, compiled_config
            )
        else:
            # GEO mode keeps the local optimizer; only send ops are appended.
            _main = worker.append_send_ops_pass(_main, compiled_config)
            # NOTE(review): self-assignment is a no-op, kept for symmetry.
            _startup = _startup
        return _main, _startup
    def _build_pserver_programs(self, compiled_config):
        """
        Build pserver-side (main, startup) programs from scratch by applying
        the server ir passes; GEO mode uses the geo optimizer pass instead.
        """
        _main = paddle.static.Program()
        _startup = paddle.static.Program()
        if not compiled_config.is_geo_mode():
            _main = server.add_listen_and_serv_pass(_main, compiled_config)
            _main = server.add_rpc_global_flags_pass(_main, compiled_config)
            _main = server.add_optimizer_pass(_main, compiled_config)
            _main = server.large_scale_sparse_pass(
                _main, _main, compiled_config, False
            )
            _startup = server.build_pserver_startup_program_pass(
                _startup, _main, compiled_config
            )
            _startup = server.large_scale_sparse_pass(
                _startup, _main, compiled_config, True
            )
            # Sync mode must keep every op; only async variants may prune.
            if not compiled_config.is_sync_mode():
                _main = server.delete_unused_in_main_pass(
                    _main, compiled_config
                )
                _startup = server.delete_unused_in_startup_pass(
                    _startup, _main, compiled_config
                )
        else:
            _main = server.add_listen_and_serv_pass(_main, compiled_config)
            _main = server.add_rpc_global_flags_pass(_main, compiled_config)
            _main = server.add_geo_optimizer_pass(_main, compiled_config)
            _main = server.large_scale_sparse_pass(
                _main, _main, compiled_config, False
            )
            _startup = server.build_pserver_startup_program_pass(
                _startup, _main, compiled_config
            )
            _startup = server.large_scale_sparse_pass(
                _startup, _main, compiled_config, True
            )
            _startup = server.delete_unused_in_startup_pass(
                _startup, _main, compiled_config
            )
        return _main, _startup
    def minimize(
        self,
        losses,
        scopes=None,
        startup_programs=None,
        parameter_list=None,
        no_grad_set=None,
    ):
        """
        Run the inner optimizer's minimize on `losses`, snapshot the origin
        programs onto the global `fleet`, then build role-specific programs
        (trainer or pserver) from the compiled strategy.
        Raises:
            ValueError: if `losses` is a list (multi-loss not implemented).
        """
        if isinstance(losses, list):
            raise ValueError("need implement later")
        self._optimizer.minimize(
            losses, startup_programs, parameter_list, no_grad_set
        )
        fleet._origin_main_program = default_main_program().clone(
            for_test=False
        )
        fleet._origin_startup_program = default_startup_program().clone(
            for_test=False
        )
        compiled_config = public.CompileTimeStrategy(
            fleet._origin_main_program,
            fleet._origin_startup_program,
            self._strategy,
            fleet._role_maker,
        )
        fleet.compiled_config = compiled_config
        fleet.main_program, fleet.startup_program = (
            self._build_trainer_programs(compiled_config)
            if fleet.is_worker()
            else self._build_pserver_programs(compiled_config)
        )
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Public API of this strategy module: runtime config plus the strategy
# class hierarchy and its factory.
__all__ = [
    "TrainerRuntimeConfig",
    "DistributedStrategy",
    "SyncStrategy",
    "AsyncStrategy",
    "HalfAsyncStrategy",
    "GeoStrategy",
    "StrategyFactory",
]
import os
import paddle.fluid as fluid
from paddle.fluid.transpiler.distribute_transpiler import (
DistributeTranspilerConfig,
ServerRuntimeConfig,
)
from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode
class TrainerRuntimeConfig:
    """Holds communicator/RPC runtime flags for a parameter-server trainer.

    Every flag is seeded from an environment variable (``FLAGS_*``) with a
    default derived from ``CPU_NUM``; the subset that actually takes effect
    depends on the distributed mode (see ``get_communicator_flags``).
    """

    def __init__(self):
        # The distributed mode is assigned later by the concrete strategy
        # (SyncStrategy, AsyncStrategy, HalfAsyncStrategy, GeoStrategy).
        self.mode = None
        num_threads = os.getenv("CPU_NUM", "1")

        self.runtime_configs = {}
        self.runtime_configs['communicator_max_merge_var_num'] = os.getenv(
            "FLAGS_communicator_max_merge_var_num", num_threads
        )
        self.runtime_configs['communicator_send_queue_size'] = os.getenv(
            "FLAGS_communicator_send_queue_size", num_threads
        )
        self.runtime_configs[
            'communicator_independent_recv_thread'
        ] = os.getenv("FLAGS_communicator_independent_recv_thread", "1")
        self.runtime_configs[
            'communicator_min_send_grad_num_before_recv'
        ] = os.getenv(
            "FLAGS_communicator_min_send_grad_num_before_recv", num_threads
        )
        self.runtime_configs['communicator_thread_pool_size'] = os.getenv(
            "FLAGS_communicator_thread_pool_size", "5"
        )
        self.runtime_configs['communicator_send_wait_times'] = os.getenv(
            "FLAGS_communicator_send_wait_times", "5"
        )
        self.runtime_configs['communicator_is_sgd_optimizer'] = os.getenv(
            "FLAGS_communicator_is_sgd_optimizer", "1"
        )

        # not used
        self.runtime_configs['rpc_deadline'] = os.getenv(
            "FLAGS_rpc_deadline", "180000"
        )
        self.runtime_configs['rpc_retry_times'] = os.getenv(
            "FLAGS_rpc_retry_times", "3"
        )

    def get_communicator_flags(self):
        """Return the mode-relevant flags as a ``{name: str-value}`` dict.

        For SYNC / HALF_ASYNC modes, ``communicator_max_merge_var_num`` and
        ``communicator_send_queue_size`` are forced to ``CPU_NUM`` (with a
        printed warning) because the communicator requires them to match.

        Raises:
            ValueError: if ``self.mode`` is not a recognized DistributedMode.
        """
        need_keys = []
        num_threads = os.getenv("CPU_NUM", "1")
        mode_str = ""
        if self.mode is None or self.mode == DistributedMode.ASYNC:
            # Fully async uses every flag, including the unused RPC ones.
            need_keys = self.runtime_configs.keys()
            mode_str = "async"
        elif (
            self.mode == DistributedMode.SYNC
            or self.mode == DistributedMode.HALF_ASYNC
        ):
            mode_str = "sync or half_async"
            need_keys = [
                'communicator_max_merge_var_num',
                'communicator_send_wait_times',
                'communicator_thread_pool_size',
                'communicator_send_queue_size',
            ]
        elif self.mode == DistributedMode.GEO:
            mode_str = "GEO"
            need_keys = [
                'communicator_thread_pool_size',
                'communicator_send_wait_times',
                'communicator_max_merge_var_num',
                'communicator_send_queue_size',
            ]
        else:
            raise ValueError("Unsupported Mode")

        if (
            self.mode == DistributedMode.SYNC
            or self.mode == DistributedMode.HALF_ASYNC
        ):
            max_merge_var_num = self.runtime_configs[
                'communicator_max_merge_var_num'
            ]
            send_queue_size = self.runtime_configs[
                'communicator_send_queue_size'
            ]
            if max_merge_var_num != num_threads:
                # BUG FIX: warning text said "fored" instead of "forced".
                print(
                    'WARNING: In {} mode, communicator_max_merge_var_num '
                    'must be equal to CPU_NUM. But received, '
                    'communicator_max_merge_var_num = {}, CPU_NUM = '
                    '{}. communicator_max_merge_var_num will be forced to {}.'.format(
                        mode_str, max_merge_var_num, num_threads, num_threads
                    )
                )
                self.runtime_configs[
                    'communicator_max_merge_var_num'
                ] = num_threads
            if send_queue_size != num_threads:
                # BUG FIX: warning text said "fored" instead of "forced".
                print(
                    'WARNING: In {} mode, communicator_send_queue_size '
                    'must be equal to CPU_NUM. But received, '
                    'communicator_send_queue_size = {}, CPU_NUM = '
                    '{}. communicator_send_queue_size will be forced to {}.'.format(
                        mode_str, send_queue_size, num_threads, num_threads
                    )
                )
                self.runtime_configs[
                    'communicator_send_queue_size'
                ] = num_threads

        # Stringify every selected flag; the communicator consumes strings.
        return {key: str(self.runtime_configs[key]) for key in need_keys}

    def display(self, configs):
        """Render *configs* as a fixed-width two-column table string."""
        length = 50
        h_format = "{:^45s}{:<5s}\n"
        l_format = "{:<45s}{:<5s}\n"
        border = "=" * length
        line = "-" * length
        draws = border + "\n"
        draws += h_format.format("TrainerRuntimeConfig Overview", "Value")
        draws += line + "\n"
        for k, v in configs.items():
            draws += l_format.format(k, v)
        draws += border
        return "\n{}\n".format(draws)

    def __repr__(self):
        return self.display(self.get_communicator_flags())
class PSLibRuntimeConfig:
    """Minimal container for PSLib runtime configuration values."""

    def __init__(self):
        # No defaults here: PSLib configs are injected wholesale via
        # DistributedStrategy.set_pslib_runtime_config().
        self.runtime_configs = {}

    def get_runtime_configs(self):
        """Return the underlying config mapping (not a copy)."""
        return self.runtime_configs
class DistributedStrategy:
    """Base container for the four parameter-server config objects.

    Bundles a transpiler config, a trainer runtime config, a server runtime
    config, and Fluid execution/build strategies. The ``check_*`` hooks are
    abstract; concrete subclasses (created through ``StrategyFactory``)
    override them to enforce mode-specific settings.
    """

    def __init__(self):
        self._program_config = DistributeTranspilerConfig()
        self._trainer_runtime_config = TrainerRuntimeConfig()
        self._pslib_runtime_config = PSLibRuntimeConfig()
        self._server_runtime_config = ServerRuntimeConfig()

        num_threads = int(os.getenv("CPU_NUM", "1"))
        self._execute_strategy = fluid.ExecutionStrategy()
        self._build_strategy = fluid.BuildStrategy()
        self._execute_strategy.num_threads = num_threads
        if num_threads > 1:
            # With multiple exec threads, Reduce distributes gradient
            # aggregation across them instead of replicating it.
            self._build_strategy.reduce_strategy = (
                fluid.BuildStrategy.ReduceStrategy.Reduce
            )
        self.debug_opt = None
        self.use_ps_gpu = False

    def set_debug_opt(self, opt_info):
        self.debug_opt = opt_info

    def get_debug_opt(self):
        """Normalize ``debug_opt`` into a plain dict of dump options.

        Returns an empty dict when no debug options were set.
        """
        opt_info = {}
        if self.debug_opt is not None and isinstance(self.debug_opt, dict):
            opt_info["dump_slot"] = bool(self.debug_opt.get("dump_slot", 0))
            opt_info["dump_converter"] = str(
                self.debug_opt.get("dump_converter", "")
            )
            opt_info["dump_fields"] = self.debug_opt.get("dump_fields", [])
            opt_info["dump_file_num"] = self.debug_opt.get("dump_file_num", 16)
            opt_info["dump_fields_path"] = self.debug_opt.get(
                "dump_fields_path", ""
            )
            opt_info["dump_param"] = self.debug_opt.get("dump_param", [])
        return opt_info

    def _update_from_dict(self, target, config, type_name):
        """Copy ``config`` entries onto ``target``'s existing attributes.

        Raises:
            ValueError: if ``config`` contains a key ``target`` lacks;
                ``type_name`` names the target type in the message.
        """
        for key in config:
            if hasattr(target, key):
                setattr(target, key, config[key])
            else:
                raise ValueError(
                    "{} doesn't have key: {}".format(type_name, key)
                )

    def get_program_config(self):
        return self._program_config

    def set_program_config(self, config):
        """Replace or update the transpiler config, then re-run its check."""
        if isinstance(config, DistributeTranspilerConfig):
            self._program_config = config
        elif isinstance(config, dict):
            self._update_from_dict(
                self._program_config, config, "DistributeTranspilerConfig"
            )
        else:
            raise TypeError(
                "program_config only accept input type: dict or DistributeTranspilerConfig"
            )
        self.check_program_config()

    def check_program_config(self):
        raise NotImplementedError(
            "check_program_config must be implemented by derived class. You should use StrategyFactory to create DistributedStrategy."
        )

    def get_trainer_runtime_config(self):
        return self._trainer_runtime_config

    def set_trainer_runtime_config(self, config):
        """Replace or update the trainer runtime config, then re-check it.

        Dict updates are validated against the existing flag names rather
        than attributes, because flags live in ``runtime_configs``.
        """
        if isinstance(config, TrainerRuntimeConfig):
            self._trainer_runtime_config = config
        elif isinstance(config, dict):
            for key, value in config.items():
                if key in self._trainer_runtime_config.runtime_configs:
                    self._trainer_runtime_config.runtime_configs[key] = value
                else:
                    raise ValueError(
                        "TrainerRuntimeConfig doesn't have key: {}".format(key)
                    )
        else:
            raise TypeError(
                "trainer_runtime_config only accept input type: dict or TrainerRuntimeConfig"
            )
        self.check_trainer_runtime_config()

    def check_trainer_runtime_config(self):
        raise NotImplementedError(
            "check_trainer_runtime_config must be implemented by derived class. You should use StrategyFactory to create DistributedStrategy."
        )

    def get_pslib_runtime_config(self):
        return self._pslib_runtime_config

    def set_pslib_runtime_config(self, config):
        # PSLib configs are opaque to this class; store them as-is.
        self._pslib_runtime_config.runtime_configs = config

    def get_server_runtime_config(self):
        return self._server_runtime_config

    def set_server_runtime_config(self, config):
        """Replace or update the server runtime config, then re-check it."""
        if isinstance(config, ServerRuntimeConfig):
            self._server_runtime_config = config
        elif isinstance(config, dict):
            self._update_from_dict(
                self._server_runtime_config, config, "ServerRuntimeConfig"
            )
        else:
            raise TypeError(
                "server_runtime_config only accept input type: dict or ServerRuntimeConfig"
            )
        self.check_server_runtime_config()

    def check_server_runtime_config(self):
        raise NotImplementedError(
            "check_server_runtime_config must be implemented by derived class. You should use StrategyFactory to create DistributedStrategy."
        )

    def get_execute_strategy(self):
        return self._execute_strategy

    def set_execute_strategy(self, config):
        """Replace or update the execution strategy, then re-check it."""
        if isinstance(config, fluid.ExecutionStrategy):
            self._execute_strategy = config
        elif isinstance(config, dict):
            self._update_from_dict(
                self._execute_strategy, config, "ExecutionStrategy"
            )
        else:
            raise TypeError(
                "execute_strategy only accept input type: dict or ExecutionStrategy"
            )
        self.check_execute_strategy()

    def check_execute_strategy(self):
        raise NotImplementedError(
            "check_execute_strategy must be implemented by derived class. You should use StrategyFactory to create DistributedStrategy."
        )

    def get_build_strategy(self):
        return self._build_strategy

    def set_build_strategy(self, config):
        """Replace or update the build strategy, then re-check it."""
        if isinstance(config, fluid.BuildStrategy):
            self._build_strategy = config
        elif isinstance(config, dict):
            self._update_from_dict(
                self._build_strategy, config, "BuildStrategy"
            )
        else:
            raise TypeError(
                "build_strategy only accept input type: dict or BuildStrategy"
            )
        self.check_build_strategy()

    def check_build_strategy(self):
        raise NotImplementedError(
            "check_build_strategy must be implemented by derived class. You should use StrategyFactory to create DistributedStrategy."
        )
class SyncStrategy(DistributedStrategy):
    """Preset strategy for synchronous distributed training."""

    def __init__(self):
        super().__init__()
        # Apply every preset so all base configs match sync-mode semantics.
        self.check_program_config()
        self.check_trainer_runtime_config()
        self.check_server_runtime_config()
        self.check_build_strategy()
        self.check_execute_strategy()

    def check_trainer_runtime_config(self):
        # The communicator runs in SYNC mode.
        self._trainer_runtime_config.mode = DistributedMode.SYNC

    def check_program_config(self):
        # Sync training rides on the half-async runtime path with the
        # "completely not async" flag set.
        program_config = self._program_config
        program_config.sync_mode = False
        program_config.runtime_split_send_recv = True
        program_config.half_async = True
        program_config.completely_not_async = True

    def check_server_runtime_config(self):
        # No server-side overrides are needed for sync mode.
        pass

    def check_execute_strategy(self):
        # Thread barrier keeps worker threads in lockstep.
        self._execute_strategy.use_thread_barrier = True

    def check_build_strategy(self):
        self._build_strategy.async_mode = True
class AsyncStrategy(DistributedStrategy):
    """Preset strategy for fully asynchronous distributed training."""

    def __init__(self):
        super().__init__()
        # Apply every preset so all base configs match async-mode semantics.
        self.check_program_config()
        self.check_trainer_runtime_config()
        self.check_server_runtime_config()
        self.check_build_strategy()
        self.check_execute_strategy()

    def check_trainer_runtime_config(self):
        # The communicator runs in ASYNC mode.
        self._trainer_runtime_config.mode = DistributedMode.ASYNC

    def check_program_config(self):
        program_config = self._program_config
        program_config.sync_mode = False
        program_config.runtime_split_send_recv = True

    def check_server_runtime_config(self):
        # No server-side overrides are needed for async mode.
        pass

    def check_execute_strategy(self):
        # Default execution strategy is fine for async mode.
        pass

    def check_build_strategy(self):
        self._build_strategy.async_mode = True
class HalfAsyncStrategy(DistributedStrategy):
    """Preset strategy for half-asynchronous distributed training."""

    def __init__(self):
        super().__init__()
        # Apply every preset so all base configs match half-async semantics.
        self.check_program_config()
        self.check_trainer_runtime_config()
        self.check_server_runtime_config()
        self.check_build_strategy()
        self.check_execute_strategy()

    def check_trainer_runtime_config(self):
        # The communicator runs in HALF_ASYNC mode.
        self._trainer_runtime_config.mode = DistributedMode.HALF_ASYNC

    def check_program_config(self):
        program_config = self._program_config
        program_config.sync_mode = False
        program_config.runtime_split_send_recv = True
        program_config.half_async = True

    def check_server_runtime_config(self):
        # No server-side overrides are needed for half-async mode.
        pass

    def check_execute_strategy(self):
        # Thread barrier keeps worker threads in lockstep.
        self._execute_strategy.use_thread_barrier = True

    def check_build_strategy(self):
        self._build_strategy.async_mode = True
class GeoStrategy(DistributedStrategy):
    """Preset strategy for GEO-SGD distributed training.

    Args:
        update_frequency (int): number of local steps accumulated before
            pushing updates to the parameter servers. Default 100.
    """

    def __init__(self, update_frequency=100):
        super().__init__()
        self._program_config.geo_sgd_need_push_nums = update_frequency
        # Apply every preset so all base configs match GEO semantics.
        self.check_program_config()
        self.check_trainer_runtime_config()
        self.check_server_runtime_config()
        self.check_build_strategy()
        self.check_execute_strategy()

    def check_program_config(self):
        program_config = self._program_config
        program_config.sync_mode = False
        program_config.runtime_split_send_recv = True
        program_config.geo_sgd_mode = True

    def check_trainer_runtime_config(self):
        runtime = self._trainer_runtime_config
        runtime.mode = DistributedMode.GEO
        # Queue size and merge count both track the GEO push interval.
        push_nums = self._program_config.geo_sgd_need_push_nums
        runtime.runtime_configs['communicator_send_queue_size'] = push_nums
        runtime.runtime_configs['communicator_max_merge_var_num'] = push_nums

    def check_server_runtime_config(self):
        # No server-side overrides are needed for GEO mode.
        pass

    def check_execute_strategy(self):
        # Default execution strategy is fine for GEO mode.
        pass

    def check_build_strategy(self):
        self._build_strategy.async_mode = True
class StrategyFactory:
    """Factory that builds fully-checked DistributedStrategy presets."""

    def __init__(self):
        # BUG FIX: this was misspelled ``__init_`` and therefore was never
        # invoked as the constructor.
        pass

    @staticmethod
    def create_sync_strategy():
        """Return a preset for synchronous training."""
        return SyncStrategy()

    @staticmethod
    def create_half_async_strategy():
        """Return a preset for half-asynchronous training."""
        return HalfAsyncStrategy()

    @staticmethod
    def create_async_strategy():
        """Return a preset for fully asynchronous training."""
        return AsyncStrategy()

    @staticmethod
    def create_geo_strategy(update_frequency=100):
        """Return a GEO-SGD preset pushing every *update_frequency* steps."""
        return GeoStrategy(update_frequency)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import warnings
import paddle.framework.core as core
import paddle
from paddle.fluid.incubate.fleet.parameter_server.ir.trainer_pass import (
find_heter_ops,
)
from paddle.fluid.incubate.fleet.parameter_server.ir.trainer_pass import (
union_forward_gradient_op,
)
from paddle.fluid.incubate.fleet.parameter_server.ir.trainer_pass import (
create_heter_program,
)
from paddle.fluid.incubate.fleet.parameter_server.ir.trainer_pass import (
create_trainer_program,
)
from paddle.fluid.incubate.fleet.parameter_server.ir.trainer_pass import (
find_block_joints,
)
from paddle.fluid.incubate.fleet.parameter_server.ir.trainer_pass import (
find_op_input_output,
)
from paddle.fluid.incubate.fleet.parameter_server.ir.trainer_pass import (
get_vars_name_in_block,
)
def split_heter_worker_ops_pass(program, config, stage_id, device):
    """
    split heter worker program from origin-program
    1. find heter op (located on different device)
    2. find input&output of every heter-block
    3. create heter worker program, add listen&serv op

    Args:
        program: the origin Program to split.
        config: compile-time strategy / role configuration.
        stage_id: pipeline stage index of this heter worker.
        device: device string this heter worker runs on.

    Returns:
        The newly built heter worker Program, or the (possibly rewritten)
        origin program unchanged when no heterogeneous ops exist.
    """
    # Ops without an explicit heterogeneous placement stay on CPU.
    # (Fixed local-variable typo: was ``default_deveice``.)
    default_device = "cpu"
    program, heter_ops, _, program_block_ops = find_heter_ops(
        program, default_device
    )
    if not heter_ops:
        warnings.warn(
            "Currently running in Heter Parameter Server mode, but no OP running on heterogeneous devices, Please check your code."
        )
        return program

    program_block_ops = union_forward_gradient_op(program_block_ops)
    block_vars_detail = find_block_joints(program, program_block_ops, heter_ops)
    heter_program = paddle.static.Program()
    create_heter_program(
        program,
        config,
        heter_program,
        program_block_ops,
        heter_ops,
        block_vars_detail,
        device,
        stage_id,
    )
    return heter_program
def split_trainer_ops_pass(program, config, default_device="cpu"):
    """
    split cpu-trainer program from origin-program
    1. find heter op (located on different device)
    2. find input&output of every heter-block
    3. create cpu-trainer program, add send&recv op
    """
    # Todo: support user define default_device (MrChengmo)
    program, heter_ops, default_ops, program_block_ops = find_heter_ops(
        program, default_device
    )
    # Merge each forward block with its matching gradient block, then map
    # out the variables that join neighbouring blocks together.
    program_block_ops = union_forward_gradient_op(program_block_ops)
    block_vars_detail = find_block_joints(program, program_block_ops, heter_ops)

    # Build the CPU-side trainer on a clone so the origin stays untouched.
    trainer_program = program.clone()
    create_trainer_program(
        trainer_program, program, config, program_block_ops, block_vars_detail
    )
    return trainer_program
...@@ -24,7 +24,7 @@ main_program = default_main_program() ...@@ -24,7 +24,7 @@ main_program = default_main_program()
class TestFleetPS(unittest.TestCase): class TestFleetPS(unittest.TestCase):
def test_version(self): def test_version(self):
from paddle.fluid.incubate.fleet.parameter_server import version from paddle.incubate.fleet.parameter_server import version
transpiler = version.is_transpiler() transpiler = version.is_transpiler()
self.assertEqual(transpiler, True) self.assertEqual(transpiler, True)
......
...@@ -68,6 +68,7 @@ from ..fluid.framework import in_dygraph_mode # noqa: F401 ...@@ -68,6 +68,7 @@ from ..fluid.framework import in_dygraph_mode # noqa: F401
from ..fluid.framework import _global_flags # noqa: F401 from ..fluid.framework import _global_flags # noqa: F401
from ..fluid.framework import _apply_pass # noqa: F401 from ..fluid.framework import _apply_pass # noqa: F401
from ..fluid.framework import switch_main_program from ..fluid.framework import switch_main_program
from ..fluid.framework import switch_startup_program
from ..fluid.framework import _set_expected_place # noqa: F401 from ..fluid.framework import _set_expected_place # noqa: F401
from ..fluid.framework import Block, Program # noqa: F401 from ..fluid.framework import Block, Program # noqa: F401
from ..fluid.framework import IrGraph # noqa: F401 from ..fluid.framework import IrGraph # noqa: F401
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
...@@ -39,12 +39,12 @@ from paddle.fluid.incubate.fleet.base.fleet_base import Fleet ...@@ -39,12 +39,12 @@ from paddle.fluid.incubate.fleet.base.fleet_base import Fleet
from paddle.fluid.incubate.fleet.base.mode import Mode from paddle.fluid.incubate.fleet.base.mode import Mode
from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker
from paddle.fluid.incubate.fleet.parameter_server import version from paddle.incubate.fleet.parameter_server import version
from paddle.fluid.incubate.fleet.parameter_server.ir.public import ( from paddle.incubate.fleet.parameter_server.ir.public import (
get_sparse_tablenames, get_sparse_tablenames,
) )
from paddle.fluid.incubate.fleet.parameter_server.ir.public import _get_lr_ops from paddle.incubate.fleet.parameter_server.ir.public import _get_lr_ops
from paddle.fluid.incubate.fleet.parameter_server.ir.public import ( from paddle.incubate.fleet.parameter_server.ir.public import (
_has_global_step, _has_global_step,
) )
from paddle.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import ( from paddle.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
...@@ -60,16 +60,16 @@ from paddle.incubate.fleet.parameter_server.distribute_transpiler.distributed_st ...@@ -60,16 +60,16 @@ from paddle.incubate.fleet.parameter_server.distribute_transpiler.distributed_st
from paddle.distributed.fleet.base.private_helper_function import ( from paddle.distributed.fleet.base.private_helper_function import (
wait_server_ready, wait_server_ready,
) )
from paddle.fluid.incubate.fleet.parameter_server.mode import PSMode from paddle.incubate.fleet.parameter_server.mode import PSMode
from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer
from paddle.fluid.incubate.fleet.parameter_server.ir import ( from paddle.incubate.fleet.parameter_server.ir import (
trainer_pass as worker, trainer_pass as worker,
) )
from paddle.fluid.incubate.fleet.parameter_server.ir import ( from paddle.incubate.fleet.parameter_server.ir import (
pserver_pass as server, pserver_pass as server,
) )
from paddle.fluid.incubate.fleet.parameter_server.ir import public as public from paddle.incubate.fleet.parameter_server.ir import public as public
class FleetTranspiler(Fleet): class FleetTranspiler(Fleet):
......
...@@ -25,11 +25,11 @@ __all__ = [ ...@@ -25,11 +25,11 @@ __all__ = [
import os import os
import paddle.fluid as fluid import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode
from paddle.fluid.transpiler.distribute_transpiler import ( from paddle.fluid.transpiler.distribute_transpiler import (
DistributeTranspilerConfig, DistributeTranspilerConfig,
ServerRuntimeConfig, ServerRuntimeConfig,
) )
from paddle.incubate.fleet.parameter_server.mode import DistributedMode
class TrainerRuntimeConfig: class TrainerRuntimeConfig:
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
import warnings import warnings
import paddle import paddle
from paddle.fluid.incubate.fleet.parameter_server.ir.trainer_pass import ( from paddle.incubate.fleet.parameter_server.ir.trainer_pass import (
create_heter_program, create_heter_program,
create_trainer_program, create_trainer_program,
find_block_joints, find_block_joints,
......
...@@ -26,8 +26,8 @@ from paddle.fluid.incubate.fleet.parameter_server.ir.public import ( ...@@ -26,8 +26,8 @@ from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
_get_optimize_ops, _get_optimize_ops,
get_sparse_tablenames, get_sparse_tablenames,
) )
from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode
from paddle.framework import core from paddle.framework import core
from paddle.incubate.fleet.parameter_server.mode import DistributedMode
OP_NAME_SCOPE = "op_namescope" OP_NAME_SCOPE = "op_namescope"
CLIP_OP_NAME_SCOPE = "gradient_clip" CLIP_OP_NAME_SCOPE = "gradient_clip"
......
...@@ -293,7 +293,7 @@ os.environ['CUDA_CACHE_MAXSIZE'] = '805306368' ...@@ -293,7 +293,7 @@ os.environ['CUDA_CACHE_MAXSIZE'] = '805306368'
write_cuda_env_config_py(filename='@PADDLE_BINARY_DIR@/python/paddle/cuda_env.py') write_cuda_env_config_py(filename='@PADDLE_BINARY_DIR@/python/paddle/cuda_env.py')
def write_distributed_training_mode_py(filename='paddle/fluid/incubate/fleet/parameter_server/version.py'): def write_distributed_training_mode_py(filename='paddle/incubate/fleet/parameter_server/version.py'):
cnt = ''' cnt = '''
# THIS FILE IS GENERATED FROM PADDLEPADDLE SETUP.PY # THIS FILE IS GENERATED FROM PADDLEPADDLE SETUP.PY
...@@ -320,7 +320,7 @@ def is_transpiler(): ...@@ -320,7 +320,7 @@ def is_transpiler():
'mode': 'PSLIB' if '${WITH_PSLIB}' == 'ON' else 'TRANSPILER' 'mode': 'PSLIB' if '${WITH_PSLIB}' == 'ON' else 'TRANSPILER'
}) })
write_distributed_training_mode_py(filename='@PADDLE_BINARY_DIR@/python/paddle/fluid/incubate/fleet/parameter_server/version.py') write_distributed_training_mode_py(filename='@PADDLE_BINARY_DIR@/python/paddle/incubate/fleet/parameter_server/version.py')
packages=['paddle', packages=['paddle',
...@@ -405,11 +405,10 @@ packages=['paddle', ...@@ -405,11 +405,10 @@ packages=['paddle',
'paddle.fluid.incubate.fleet', 'paddle.fluid.incubate.fleet',
'paddle.fluid.incubate.checkpoint', 'paddle.fluid.incubate.checkpoint',
'paddle.fluid.incubate.fleet.base', 'paddle.fluid.incubate.fleet.base',
'paddle.fluid.incubate.fleet.parameter_server',
'paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler',
'paddle.fluid.incubate.fleet.parameter_server.ir',
'paddle.fluid.incubate.fleet.collective', 'paddle.fluid.incubate.fleet.collective',
'paddle.fluid.incubate.fleet.utils', 'paddle.fluid.incubate.fleet.utils',
'paddle.fluid.incubate.fleet.parameter_server.ir',
'paddle.fluid.incubate.fleet.parameter_server',
'paddle.amp', 'paddle.amp',
'paddle.cost_model', 'paddle.cost_model',
'paddle.hapi', 'paddle.hapi',
...@@ -437,8 +436,10 @@ packages=['paddle', ...@@ -437,8 +436,10 @@ packages=['paddle',
'paddle.incubate.distributed.models', 'paddle.incubate.distributed.models',
'paddle.incubate.distributed.models.moe', 'paddle.incubate.distributed.models.moe',
'paddle.incubate.distributed.models.moe.gate', 'paddle.incubate.distributed.models.moe.gate',
'paddle.incubate.fleet.parameter_server',
'paddle.incubate.fleet.parameter_server.distribute_transpiler', 'paddle.incubate.fleet.parameter_server.distribute_transpiler',
'paddle.incubate.fleet.parameter_server.pslib', 'paddle.incubate.fleet.parameter_server.pslib',
'paddle.incubate.fleet.parameter_server.ir',
'paddle.quantization', 'paddle.quantization',
'paddle.quantization.quanters', 'paddle.quantization.quanters',
'paddle.quantization.observers', 'paddle.quantization.observers',
......
...@@ -572,7 +572,7 @@ os.environ['CUDA_CACHE_MAXSIZE'] = '805306368' ...@@ -572,7 +572,7 @@ os.environ['CUDA_CACHE_MAXSIZE'] = '805306368'
def write_parameter_server_version_py( def write_parameter_server_version_py(
filename='paddle/fluid/incubate/fleet/parameter_server/version.py', filename='paddle/incubate/fleet/parameter_server/version.py',
): ):
cnt = ''' cnt = '''
...@@ -1298,11 +1298,10 @@ def get_setup_parameters(): ...@@ -1298,11 +1298,10 @@ def get_setup_parameters():
'paddle.fluid.incubate.fleet', 'paddle.fluid.incubate.fleet',
'paddle.fluid.incubate.checkpoint', 'paddle.fluid.incubate.checkpoint',
'paddle.fluid.incubate.fleet.base', 'paddle.fluid.incubate.fleet.base',
'paddle.fluid.incubate.fleet.parameter_server',
'paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler',
'paddle.fluid.incubate.fleet.parameter_server.ir',
'paddle.fluid.incubate.fleet.collective', 'paddle.fluid.incubate.fleet.collective',
'paddle.fluid.incubate.fleet.utils', 'paddle.fluid.incubate.fleet.utils',
'paddle.fluid.incubate.fleet.parameter_server',
'paddle.fluid.incubate.fleet.parameter_server.ir',
'paddle.amp', 'paddle.amp',
'paddle.cost_model', 'paddle.cost_model',
'paddle.hapi', 'paddle.hapi',
...@@ -1330,7 +1329,9 @@ def get_setup_parameters(): ...@@ -1330,7 +1329,9 @@ def get_setup_parameters():
'paddle.incubate.distributed.models', 'paddle.incubate.distributed.models',
'paddle.incubate.distributed.models.moe', 'paddle.incubate.distributed.models.moe',
'paddle.incubate.distributed.models.moe.gate', 'paddle.incubate.distributed.models.moe.gate',
'paddle.incubate.fleet.parameter_server',
'paddle.incubate.fleet.parameter_server.distribute_transpiler', 'paddle.incubate.fleet.parameter_server.distribute_transpiler',
'paddle.incubate.fleet.parameter_server.ir',
'paddle.incubate.fleet.parameter_server.pslib', 'paddle.incubate.fleet.parameter_server.pslib',
'paddle.quantization', 'paddle.quantization',
'paddle.quantization.quanters', 'paddle.quantization.quanters',
...@@ -1457,7 +1458,7 @@ def main(): ...@@ -1457,7 +1458,7 @@ def main():
filename='{}/python/paddle/cuda_env.py'.format(paddle_binary_dir) filename='{}/python/paddle/cuda_env.py'.format(paddle_binary_dir)
) )
write_parameter_server_version_py( write_parameter_server_version_py(
filename='{}/python/paddle/fluid/incubate/fleet/parameter_server/version.py'.format( filename='{}/python/paddle/incubate/fleet/parameter_server/version.py'.format(
paddle_binary_dir paddle_binary_dir
) )
) )
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册