Unverified commit 1882a74f, authored by wangzhen38 and committed by GitHub

[RM FLUID] rm ps ir (#50691)

Parent e84fa263
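Most hunks in this diff apply the same migration: symbols that were imported through `paddle.fluid.incubate.fleet.parameter_server.ir` now resolve under `paddle.incubate.fleet.parameter_server.ir`. A minimal before/after sketch, assuming a Paddle build that ships the relocated package:

# removed by this commit:
#   from paddle.fluid.incubate.fleet.parameter_server.ir.public import CompileTimeStrategy
# replacement used throughout the diff below:
from paddle.incubate.fleet.parameter_server.ir.public import CompileTimeStrategy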
......@@ -16,8 +16,7 @@ import os
import warnings
import paddle
import paddle.fluid as fluid
from paddle.fluid import core
from paddle.framework import core
from paddle.static import (
CompiledProgram,
Executor,
......@@ -73,7 +72,7 @@ class ParameterServerRuntime(RuntimeBase):
return strategy
def build_compiled_startegy(self):
from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
from paddle.incubate.fleet.parameter_server.ir.public import (
CompileTimeStrategy,
)
......@@ -102,7 +101,7 @@ class ParameterServerRuntime(RuntimeBase):
if main_program is None:
main_program = self.origin_main_program
from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
from paddle.incubate.fleet.parameter_server.ir.public import (
_get_varname_parts,
)
......@@ -111,7 +110,7 @@ class ParameterServerRuntime(RuntimeBase):
origin_varname, _, _ = _get_varname_parts(each_var.name)
new_var = fluid.io._clone_var_in_block_(load_block, each_var)
new_var = paddle.static.io._clone_var_in_block(load_block, each_var)
var_path = os.path.join(dirname, origin_varname)
if not os.path.exists(var_path):
raise ValueError(
......@@ -138,7 +137,7 @@ class ParameterServerRuntime(RuntimeBase):
def _load_distributed_params(self, dirname, varnames):
from paddle.distributed.communicator import LargeScaleKV
from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
from paddle.incubate.fleet.parameter_server.ir.public import (
_get_varname_parts,
)
......@@ -154,7 +153,7 @@ class ParameterServerRuntime(RuntimeBase):
if var.name in exclude_var_names:
return False
from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
from paddle.incubate.fleet.parameter_server.ir.public import (
_get_varname_parts,
)
......@@ -185,7 +184,7 @@ class ParameterServerRuntime(RuntimeBase):
return kwargs
def geo_strategy_envs():
from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
from paddle.incubate.fleet.parameter_server.ir.public import (
get_sparse_tablenames,
)
......@@ -239,14 +238,14 @@ class ParameterServerRuntime(RuntimeBase):
kwargs["sparse_attrs"] = get_sparse_attrs()
return kwargs
from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
_get_lr_ops,
_has_global_step,
)
from paddle.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
GeoStrategy,
SyncStrategy,
)
from paddle.incubate.fleet.parameter_server.ir.public import (
_get_lr_ops,
_has_global_step,
)
trainer_config = self.async_strategy.get_trainer_runtime_config()
print(trainer_config)
......@@ -475,7 +474,7 @@ class ParameterServerRuntime(RuntimeBase):
return reshaped_names, origin_names
def _get_optimizer_op(self, param_name):
from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
from paddle.incubate.fleet.parameter_server.ir.public import (
_get_optimize_ops,
)
......
......@@ -36,7 +36,7 @@ PSERVER_SAVE_SUFFIX = ".shard"
def parse_table_class(varname, o_main_program):
from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
from paddle.incubate.fleet.parameter_server.ir.public import (
is_distributed_sparse_op,
is_sparse_op,
)
......@@ -247,7 +247,7 @@ class CommonAccessor:
self.opt_init_map = opt_init_map
def parse_entry(self, varname, o_main_program):
from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
from paddle.incubate.fleet.parameter_server.ir.public import (
is_distributed_sparse_op,
is_sparse_op,
)
......@@ -304,7 +304,7 @@ class CommonAccessor:
compiled_strategy,
adam_d2sum,
):
from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
from paddle.incubate.fleet.parameter_server.ir.public import (
_get_optimize_ops,
)
......@@ -716,7 +716,7 @@ class TheOnePSRuntime(RuntimeBase):
return strategy
def build_compiled_startegy(self):
from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
from paddle.incubate.fleet.parameter_server.ir.public import (
CompileTimeStrategy,
)
......@@ -1191,7 +1191,7 @@ class TheOnePSRuntime(RuntimeBase):
proto_txt, string_hosts, role_id, trainers, self._server_sub_program
)
from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
from paddle.incubate.fleet.parameter_server.ir.public import (
get_sparse_tablenames,
)
......@@ -1252,7 +1252,7 @@ class TheOnePSRuntime(RuntimeBase):
if var.name in exclude_var_names:
return False
from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
from paddle.incubate.fleet.parameter_server.ir.public import (
_get_varname_parts,
)
......@@ -1283,7 +1283,7 @@ class TheOnePSRuntime(RuntimeBase):
def _save_sparse_params(
self, executor, dirname, context, main_program, mode
):
from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
from paddle.incubate.fleet.parameter_server.ir.public import (
get_sparse_tablenames,
)
......@@ -1479,7 +1479,7 @@ class TheOnePSRuntime(RuntimeBase):
self._ps_inference_save_persistables(*args, **kwargs)
def _load_sparse_params(self, dirname, context, main_program, mode):
from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
from paddle.incubate.fleet.parameter_server.ir.public import (
get_sparse_tablenames,
)
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class PSDispatcher:
"""
PSDispatcher is the base class for dispatching vars
into different pserver instances.
Subclasses must implement the `dispatch` interface.
"""
def __init__(self, pserver_endpoints):
self._eps = pserver_endpoints
self._step = 0
@property
def eps(self):
return self._eps
def reset(self):
"""
Reset the step counter to zero.
"""
self._step = 0
def dispatch(self, varlist):
"""
Args:
varlist(list): a list of Variables
Returns:
a list of pserver endpoints, one for each variable in `varlist`
"""
raise NotImplementedError("Interface has not been implemented.")
class HashName(PSDispatcher):
"""
Hash variable names to several endpoints using Python's
built-in `hash()` function.
Args:
pserver_endpoints (list): list of endpoints (ip:port).
Examples:
.. code-block:: python
pserver_endpoints = ["127.0.0.1:6007", "127.0.0.1:6008"]
vars = ["var1","var2","var3","var4","var5"]
hasher = HashName(pserver_endpoints)
hasher.dispatch(vars)
"""
def __init__(self, pserver_endpoints):
super().__init__(pserver_endpoints)
def _hash_block(self, block_str, total):
return hash(block_str) % total
def dispatch(self, varlist):
"""
Use the `HashName` method to dispatch variables across the parameter servers.
Args:
varlist (list): a list of Variables
"""
eplist = []
for var in varlist:
server_id = self._hash_block(var.name(), len(self._eps))
server_for_param = self._eps[server_id]
eplist.append(server_for_param)
return eplist
class RoundRobin(PSDispatcher):
"""
Distribute variables to several endpoints using the
round-robin <https://en.wikipedia.org/wiki/Round-robin_scheduling> method.
Args:
pserver_endpoints (list): list of endpoints (ip:port).
Examples:
.. code-block:: python
pserver_endpoints = ["127.0.0.1:6007", "127.0.0.1:6008"]
vars = ["var1","var2","var3","var4","var5"]
rr = RoundRobin(pserver_endpoints)
rr.dispatch(vars)
"""
def __init__(self, pserver_endpoints):
super().__init__(pserver_endpoints)
def dispatch(self, varlist):
"""
Use the `RoundRobin` method to dispatch variables across the parameter servers.
Args:
varlist (list): a list of Variables
"""
eplist = []
for var in varlist:
server_for_param = self._eps[self._step]
eplist.append(server_for_param)
self._step += 1
if self._step >= len(self._eps):
self._step = 0
return eplist
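A small usage sketch for the two dispatchers defined above; `_Var` is a hypothetical stand-in for a Paddle Variable, added only because `HashName.dispatch` calls `var.name()`:

class _Var:
    # hypothetical stand-in: only the name() accessor used by HashName is provided
    def __init__(self, name):
        self._name = name

    def name(self):
        return self._name

endpoints = ["127.0.0.1:6007", "127.0.0.1:6008"]
varlist = [_Var("w@GRAD"), _Var("b@GRAD"), _Var("emb@GRAD")]

hasher = HashName(endpoints)
print(hasher.dispatch(varlist))  # one endpoint per var, chosen by hash(name) % len(endpoints)

rr = RoundRobin(endpoints)
print(rr.dispatch(varlist))      # endpoints assigned in rotation: 6007, 6008, 6007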
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class UnionFind:
"""Union-find data structure.
Union-find is a data structure that keeps track of a set of elements partitioned
into a number of disjoint (non-overlapping) subsets.
Reference:
https://en.wikipedia.org/wiki/Disjoint-set_data_structure
Args:
elements(list): The initial list of elements.
"""
def __init__(self, elements=None):
self._parents = [] # index -> parent index
self._index = {} # element -> index
self._curr_idx = 0
if not elements:
elements = []
for ele in elements:
self._parents.append(self._curr_idx)
self._index.update({ele: self._curr_idx})
self._curr_idx += 1
def find(self, x):
# Find the root index of the given element x,
# compressing the path while searching for the root.
if x not in self._index:
return -1
idx = self._index[x]
while idx != self._parents[idx]:
t = self._parents[idx]
self._parents[idx] = self._parents[t]
idx = t
return idx
def union(self, x, y):
# Union the sets containing the two given elements.
x_root = self.find(x)
y_root = self.find(y)
if x_root == y_root:
return
self._parents[x_root] = y_root
def is_connected(self, x, y):
# If two given elements have the same root index,
# then they are connected.
return self.find(x) == self.find(y)
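A short sketch of how the union-find above behaves; the element names are hypothetical:

uf = UnionFind(["w", "w@GRAD", "b", "b@GRAD"])
uf.union("w", "w@GRAD")
uf.union("b", "b@GRAD")
print(uf.is_connected("w", "w@GRAD"))  # True: both now share the same root
print(uf.is_connected("w", "b"))       # False: the two groups were never unioned
print(uf.find("missing"))              # -1: unknown elements return -1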
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from functools import reduce
from paddle.framework.io import Variable
from paddle.framework import core
dtype_to_size = {
core.VarDesc.VarType.FP16: 2,
core.VarDesc.VarType.FP32: 4,
core.VarDesc.VarType.FP64: 8,
core.VarDesc.VarType.INT16: 2,
core.VarDesc.VarType.INT32: 4,
core.VarDesc.VarType.INT64: 8,
core.VarDesc.VarType.BOOL: 1,
core.VarDesc.VarType.UINT8: 1,
}
class VarBlock:
def __init__(self, varname, offset, size):
self.varname = varname
# NOTE: real offset is offset * size
self.offset = offset
self.size = size
def __str__(self):
return "%s:%d:%d" % (self.varname, self.offset, self.size)
def create_var_struct(var):
if var.type == core.VarDesc.VarType.SELECTED_ROWS:
lod_level = None
elif var.type == core.VarDesc.VarType.LOD_TENSOR:
lod_level = var.lod_level
else:
raise ValueError("can only support SELECTED_ROWS/LOD_TENSOR now")
return VarStruct(
var.name, var.shape, var.dtype, var.type, lod_level, var.persistable
)
class VarStruct:
"""
Records a subset of a Variable's properties in Python.
"""
def __init__(self, name, shape, dtype, type, lod_level, persistable):
self.name = name
self.shape = shape
self.dtype = dtype
self.type = type
self.lod_level = lod_level
self.persistable = persistable
self.m_size = 1
self.m_size = reduce(lambda x, y: x * y, shape)
self.m_size *= dtype_to_size[dtype]
def __str__(self):
return "N: {}, S: {}, D: {}, T: {}, LL: {}, P: {}, M: {}".format(
self.name,
self.shape,
self.dtype,
self.type,
self.lod_level,
self.persistable,
self.m_size,
)
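For reference, `m_size` is simply the numel multiplied by the element width from `dtype_to_size`; a sketch assuming a Paddle installation that provides `core.VarDesc.VarType` (the variable name is hypothetical):

# a (1000, 64) FP32 tensor occupies 1000 * 64 * 4 bytes
vs = VarStruct(
    name="embedding_0.w_0",
    shape=(1000, 64),
    dtype=core.VarDesc.VarType.FP32,
    type=core.VarDesc.VarType.LOD_TENSOR,
    lod_level=0,
    persistable=True,
)
print(vs.m_size)  # 256000
print(vs)         # "N: embedding_0.w_0, S: (1000, 64), ... M: 256000"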
class VarDistributed:
"""
A class that records how a var is distributed on parameter servers.
It tracks the relationship between the origin var and its slice var,
as well as the slice var's properties, such as type/shape/offset/endpoint.
"""
def __init__(
self,
origin_var,
slice_var,
is_slice=None,
block_id=None,
offset=None,
vtype=None,
endpoint=None,
):
"""
Args:
origin_var(Variable|VarStruct): origin var properties
slice_var(Variable|VarStruct): slice var properties
is_slice(bool|None): whether the var is a slice; the judgement is based on slice_var being True/False and on its block size exceeding 8192.
block_id(int|None): the block index of the slice var.
offset(int|None): if the var is sliced, the numel that precedes this slice.
vtype(str|None): a tag, such as Optimizer/Param/RemotePrefetch.
endpoint(str|None): which parameter server the slice var is on, such as "127.0.0.1:1001".
"""
if isinstance(origin_var, Variable):
self.origin = create_var_struct(origin_var)
else:
self.origin = origin_var
if isinstance(slice_var, Variable):
self.slice = create_var_struct(slice_var)
else:
self.slice = slice_var
if self.equal(self.origin, self.slice):
self.is_slice = False
self.block_id = 0
self.offset = 0
else:
self.is_slice = True
self.block_id = 0
self.offset = 0
if is_slice is not None:
self.is_slice = is_slice
if block_id is not None:
self.block_id = block_id
if offset is not None:
self.offset = offset
self.vtype = vtype
self.endpoint = endpoint
@staticmethod
def equal(var1, var2):
"""
Whether the two vars are equal.
Returns:
bool: True if equal, otherwise False.
"""
assert isinstance(var1, VarStruct) and isinstance(var2, VarStruct)
return (
var1.name == var2.name
and var1.type == var2.type
and var1.shape == var2.shape
and var1.dtype == var2.dtype
and var1.lod_level == var2.lod_level
and var1.persistable == var2.persistable
)
def __str__(self):
origin_var_str = (
"{name} : fluid.{type}.shape{shape}.astype({dtype})".format(
i="{",
e="}",
name=self.origin.name,
type=self.origin.type,
shape=self.origin.shape,
dtype=self.origin.dtype,
)
)
slice_var_str = (
"{name} : fluid.{type}.shape{shape}.astype({dtype})"
".slice({is_slice}).block({block_id}).offset({offset})".format(
i="{",
e="}",
name=self.slice.name,
type=self.slice.type,
shape=self.slice.shape,
dtype=self.slice.dtype,
is_slice=self.is_slice,
block_id=self.block_id,
offset=self.offset,
)
)
return "var owned: {}, origin var: ( {} ), slice var: ( {} ), endpoint: {} ".format(
self.vtype, origin_var_str, slice_var_str, self.endpoint
)
class VarsDistributed:
"""
A collection of VarDistributed objects with methods for finding distributed vars.
Through this class, we can get an overview of the distributed parameters on parameter servers.
It centralizes distribution information so developers can conveniently manage and query how variables are distributed.
Other modules, such as io.py, can also use it to look up variables.
"""
def __init__(self):
self.distributed_vars = []
def add_distributed_var(
self,
origin_var,
slice_var,
is_slice=None,
block_id=None,
offset=None,
vtype=None,
endpoint=None,
):
"""
Add a distributed var to this collection.
Args:
origin_var(Variable|VarStruct): origin var properties
slice_var(Variable|VarStruct): slice var properties
is_slice(bool|None): whether the var is a slice; the judgement is based on slice_var being True/False and on its block size exceeding 8192.
block_id(int|None): the block index of the slice var.
offset(int|None): if the var is sliced, the numel that precedes this slice.
vtype(str|None): a tag, such as Optimizer/Param/RemotePrefetch.
endpoint(str|None): which parameter server the slice var is on, such as "127.0.0.1:1001".
Returns:
None
"""
self.distributed_vars.append(
VarDistributed(
origin_var,
slice_var,
is_slice,
block_id,
offset,
vtype,
endpoint,
)
)
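A sketch of registering one sliced parameter with the container above; the names, shapes, and endpoint are hypothetical, and `core.VarDesc.VarType` again assumes a Paddle installation:

origin = VarStruct("fc_0.w_0", (1024, 64), core.VarDesc.VarType.FP32,
                   core.VarDesc.VarType.LOD_TENSOR, 0, True)
slice0 = VarStruct("fc_0.w_0.block0", (512, 64), core.VarDesc.VarType.FP32,
                   core.VarDesc.VarType.LOD_TENSOR, 0, True)

dist_vars = VarsDistributed()
dist_vars.add_distributed_var(
    origin_var=origin,
    slice_var=slice0,
    is_slice=True,
    block_id=0,
    offset=0,
    vtype="Param",
    endpoint="127.0.0.1:6007",
)
print(dist_vars.distributed_vars[0])  # shows origin vs. slice properties and the endpoint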
......@@ -444,7 +444,7 @@ class DnnTrainer:
print(
"entering run {} - old".format(str(config["applied_pass_name"]))
)
from paddle.fluid.incubate.fleet.parameter_server.ir import (
from paddle.incubate.fleet.parameter_server.ir import (
public as public,
)
......@@ -458,7 +458,7 @@ class DnnTrainer:
_main = compiled_config.origin_main_program.clone()
_startup = compiled_config.origin_startup_program.clone()
from paddle.fluid.incubate.fleet.parameter_server.ir import (
from paddle.incubate.fleet.parameter_server.ir import (
trainer_pass as worker,
)
......
......@@ -15,7 +15,7 @@
import unittest
from paddle.fluid.framework import default_main_program
from paddle.fluid.incubate.fleet.parameter_server.ir.pserver_pass import (
from paddle.incubate.fleet.parameter_server.ir.pserver_pass import (
_get_optimizer_input_shape,
)
......
......@@ -14,7 +14,7 @@
import unittest
from paddle.fluid.incubate.fleet.parameter_server.ir.ps_dispatcher import (
from paddle.incubate.fleet.parameter_server.ir.ps_dispatcher import (
HashName,
PSDispatcher,
RoundRobin,
......
......@@ -14,7 +14,8 @@
import collections
from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
from paddle.framework import core
from paddle.incubate.fleet.parameter_server.ir.public import (
_get_lr_ops,
_get_optimize_ops,
_get_varname_parts,
......@@ -23,7 +24,6 @@ from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
get_sparse_tablenames,
is_distributed_sparse_op,
)
from paddle.framework import core
LEARNING_RATE_DECAY_COUNTER = "@LR_DECAY_COUNTER@"
OP_ROLE_VAR_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleVarAttrName()
......
......@@ -19,12 +19,10 @@ import warnings
from functools import reduce
import paddle
from paddle.fluid.incubate.fleet.parameter_server.ir import vars_metatools
from paddle.fluid.incubate.fleet.parameter_server.ir.ps_dispatcher import (
RoundRobin,
)
from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode
from paddle.framework import core
from paddle.incubate.fleet.parameter_server.ir import vars_metatools
from paddle.incubate.fleet.parameter_server.ir.ps_dispatcher import RoundRobin
OP_NAME_SCOPE = "op_namescope"
CLIP_OP_NAME_SCOPE = "gradient_clip"
......
......@@ -21,12 +21,12 @@ from functools import reduce
import paddle
import paddle.framework as framework
from paddle.distributed.transpiler.details.program_utils import delete_ops
from paddle.fluid.incubate.fleet.parameter_server.ir.public import (
from paddle.framework import core
from paddle.incubate.fleet.parameter_server.ir.public import (
_get_lr_ops,
_get_optimize_ops,
get_sparse_tablenames,
)
from paddle.framework import core
from paddle.incubate.fleet.parameter_server.mode import DistributedMode
OP_NAME_SCOPE = "op_namescope"
......
......@@ -407,7 +407,6 @@ packages=['paddle',
'paddle.fluid.incubate.fleet.base',
'paddle.fluid.incubate.fleet.collective',
'paddle.fluid.incubate.fleet.utils',
'paddle.fluid.incubate.fleet.parameter_server.ir',
'paddle.fluid.incubate.fleet.parameter_server',
'paddle.amp',
'paddle.cost_model',
......
......@@ -1301,7 +1301,6 @@ def get_setup_parameters():
'paddle.fluid.incubate.fleet.collective',
'paddle.fluid.incubate.fleet.utils',
'paddle.fluid.incubate.fleet.parameter_server',
'paddle.fluid.incubate.fleet.parameter_server.ir',
'paddle.amp',
'paddle.cost_model',
'paddle.hapi',
......