Unverified commit c490f1b3, authored by 武毅, committed by GitHub

Merge pull request #8049 from typhoonzero/no_counter_on_pserver

Enhancement: the transpiler renames grad vars to add the trainer id, so RPC calls can be retried.
......@@ -42,6 +42,25 @@ bool BlockDesc::HasVar(const std::string &name) const {
return vars_.find(name) != vars_.end();
}
VarDesc *BlockDesc::RenameVar(const std::string &old_name,
const std::string &new_name) {
if (!this->HasVar(old_name)) {
return nullptr;
}
need_update_ = true;
auto *var = this->Var(old_name);
VarDesc *new_var = new VarDesc(*(var->Proto()));
new_var->SetName(new_name);
vars_[new_name].reset(new_var);
// rename inputs and outputs
for (const auto &op : ops_) {
auto *it = op.get();
it->Rename(old_name, new_name);
}
vars_.erase(old_name);
return new_var;
}
VarDesc *BlockDesc::FindVarRecursive(const std::string &name) const {
if (name == kEmptyVarName) return nullptr;
......
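The new `BlockDesc::RenameVar` is exposed to Python through the `rename_var` pybind binding added later in this diff. A minimal sketch of how it might be driven from Python, assuming a Paddle build containing this patch (the `paddle.v2.fluid` module path and the variable names here are illustrative, not taken from this PR):

```python
import paddle.v2.fluid as fluid  # module path at the time of this PR

prog = fluid.Program()
block = prog.global_block()
block.create_var(name="w@GRAD", shape=[4, 4], dtype="float32")

# Rename at the C++ BlockDesc level; op inputs/outputs referencing the
# old name are rewritten by BlockDesc::RenameVar as shown above.
block.desc.rename_var("w@GRAD", "w@GRAD.trainer_0")
print(block.desc.has_var("w@GRAD.trainer_0"))  # True
```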
......@@ -55,6 +55,8 @@ class BlockDesc {
bool HasVar(const std::string &var_name) const;
VarDesc *RenameVar(const std::string &old_name, const std::string &new_name);
VarDesc *FindVarRecursive(const std::string &name_bytes) const;
VarDesc &FindRecursiveOrCreateVar(const std::string &name_bytes);
......
......@@ -75,13 +75,6 @@ class ListenAndServOp : public framework::OperatorBase {
server_thread_->join();
}
std::string GetGradVarNameForTrainer(const std::string &varname) const {
if (grads_counter_.find(varname) == grads_counter_.end()) {
grads_counter_[varname] = 0;
}
return string::Sprintf("%s.trainer_%d", varname, grads_counter_[varname]++);
}
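With the per-pserver `grads_counter_` gone, the mapping from an incoming gradient to its server-side variable no longer depends on arrival order: the trainer itself appends a deterministic `.trainer_%d` suffix before sending, so a retried RPC writes to the same variable instead of bumping a counter. A hedged sketch of the two naming schemes (illustrative helpers, not Paddle API):

```python
# Old scheme (the method removed above): the pserver assigned the suffix
# from an arrival counter, so a retried RPC could land in a new variable.
def name_by_counter(varname, grads_counter):
    grads_counter.setdefault(varname, 0)
    name = "%s.trainer_%d" % (varname, grads_counter[varname])
    grads_counter[varname] += 1
    return name

# New scheme: the trainer bakes its own id into the name before sending,
# so the target variable is stable across retries.
def name_by_trainer_id(varname, trainer_id):
    return "%s.trainer_%d" % (varname, trainer_id)

print(name_by_trainer_id("w@GRAD", 2))  # w@GRAD.trainer_2
```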
void RunImpl(const framework::Scope &scope,
const platform::Place &dev_place) const override {
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
......@@ -91,8 +84,7 @@ class ListenAndServOp : public framework::OperatorBase {
// FIXME(Yancey1989): initialize rpc server with lazy mode.
rpc_service_->SetScope(&recv_scope);
rpc_service_->SetDevCtx(&dev_ctx);
auto param_list = Attr<std::vector<std::string>>("ParamList");
auto grad_list = Attr<std::vector<std::string>>("GradList");
auto ins = Inputs("X");
auto fan_in = Attr<int>("Fanin");
auto *block = Attr<framework::BlockDesc *>(kOptimizeBlock);
......@@ -109,40 +101,24 @@ class ListenAndServOp : public framework::OperatorBase {
// the gradients arrive; just add suffix 0~n and merge the gradient.
rpc_service_->SetCond(0);
size_t recv_var_cnt = 0;
size_t update_param_cnt = 0;
int batch_barrier = 0;
while (batch_barrier != fan_in) {
const detail::MessageWithName &v = rpc_service_->Get();
auto grad_var_name = v.first;
if (grad_var_name == LISTEN_TERMINATE_MESSAGE) {
auto recv_var_name = v.first;
if (recv_var_name == LISTEN_TERMINATE_MESSAGE) {
LOG(INFO) << "received terminate message and exit";
exit_flag = true;
break;
} else if (grad_var_name == BATCH_BARRIER_MESSAGE) {
} else if (recv_var_name == BATCH_BARRIER_MESSAGE) {
VLOG(3) << "recv batch barrier message";
batch_barrier++;
continue;
} else {
// receive a variable
VLOG(3) << "received grad: " << recv_var_name;
recv_var_cnt++;
auto it =
std::find(grad_list.begin(), grad_list.end(), grad_var_name);
std::string param_var_name;
if (it != grad_list.end()) {
param_var_name = param_list[it - grad_list.begin()];
update_param_cnt++;
VLOG(3) << "received grad: " << grad_var_name
<< " updating param: " << param_var_name;
} else {
VLOG(3) << "received variable: " << grad_var_name
<< " no need to update param";
}
if (fan_in > 1 && !param_var_name.empty()) {
grad_var_name = this->GetGradVarNameForTrainer(grad_var_name);
}
auto *var = recv_scope.FindVar(grad_var_name);
auto *var = recv_scope.FindVar(recv_var_name);
if (var == nullptr) {
LOG(ERROR) << "Can not find server side var: " << grad_var_name;
LOG(ERROR) << "Can not find server side var: " << recv_var_name;
PADDLE_THROW("Can not find server side var");
}
detail::DeserializeFromMessage(v.second, dev_ctx, var);
......@@ -151,29 +127,26 @@ class ListenAndServOp : public framework::OperatorBase {
}
}
}
VLOG(3) << "recv " << recv_var_cnt << " parmeters for one barrier.";
if (exit_flag) {
rpc_service_->ShutDown();
}
VLOG(3) << "run optimize graph...";
try {
executor.Run(*program, &recv_scope, block->ID(), /*global_block*/
false /*create_local_scope*/, false /*create_vars*/);
} catch (std::exception &e) {
LOG(ERROR) << "run sub program error " << e.what();
}
// Reset the received sparse variables: the sum operator would not
// sum input sparse variables whose rows are empty at the next
// mini-batch.
// TOOD(Yancey1989): move the reset action into an operator, we couldn't
// TODO(Yancey1989): move the reset action into an operator, we couldn't
// have any hidden logic in the operator.
for (auto &var : sparse_vars) {
var->GetMutable<framework::SelectedRows>()->mutable_rows()->clear();
}
rpc_service_->SetCond(1);
rpc_service_->WaitClientGet(update_param_cnt);
grads_counter_.clear();
// FIXME(typhoonzero): use another condition to sync wait clients get.
rpc_service_->WaitClientGet(ins.size());
sparse_vars.clear();
} // while(true)
}
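The loop above implements a simple batch-barrier protocol: the server keeps receiving until each of the `fan_in` trainers has sent a batch-barrier message, runs the optimize block once, then releases the waiting clients. A condensed Python sketch of the same control flow, under the assumption of a blocking `recv()` queue (all names below are illustrative stand-ins, not Paddle API):

```python
LISTEN_TERMINATE_MESSAGE = "TERMINATE"   # stands in for the real constant
BATCH_BARRIER_MESSAGE = "BATCH_BARRIER"  # stands in for the real constant

def serve_one_round(recv, fan_in, scope, run_optimize_block):
    """One pserver round: gather grads until all barriers, then optimize."""
    batch_barrier = 0
    while batch_barrier != fan_in:
        name, value = recv()  # blocking receive of (var name, tensor)
        if name == LISTEN_TERMINATE_MESSAGE:
            return False      # shut down the server loop
        if name == BATCH_BARRIER_MESSAGE:
            batch_barrier += 1  # one trainer finished this mini-batch
            continue
        scope[name] = value   # deserialize into the pre-created variable
    run_optimize_block()      # e.g. sum per-trainer grads, run optimizer
    return True
```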
......@@ -181,13 +154,13 @@ class ListenAndServOp : public framework::OperatorBase {
protected:
std::shared_ptr<detail::AsyncGRPCServer> rpc_service_;
std::shared_ptr<std::thread> server_thread_;
mutable std::unordered_map<std::string, int> grads_counter_;
};
class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker {
public:
ListenAndServOpMaker(OpProto *proto, OpAttrChecker *op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
AddInput("X", "(Tensor) Variables that server recv.").AsDuplicable();
AddComment(R"DOC(
ListenAndServ operator
......@@ -201,16 +174,7 @@ from send_op and send back variables to recv_op.
.AddCustomChecker([](const std::string &ip) { return !ip.empty(); });
AddAttr<framework::BlockDesc *>(kOptimizeBlock,
"BlockID to run on server side.");
AddAttr<std::vector<std::string>>(
"ParamList", "type list of string",
"grad->param name mapping to find which parameters to optimize.")
.SetDefault({});
AddAttr<std::vector<std::string>>(
"GradList", "type list of string",
"grad->param name mapping to find which parameters to optimize.")
.SetDefault({});
AddAttr<int>("Fanin", "type int",
"Number of trainers in the current cluster job")
AddAttr<int>("Fanin", "How many clients send to this server.")
.SetDefault(1);
}
};
......
......@@ -170,6 +170,14 @@ void BindBlockDesc(py::module &m) {
[](BlockDesc &self, py::bytes byte_name) {
std::string name = byte_name;
return self.HasVar(name);
},
py::return_value_policy::reference)
.def("rename_var",
[](BlockDesc &self, const py::bytes &byte_name,
const py::bytes &byte_name_new) {
std::string name = byte_name;
std::string new_name = byte_name_new;
self.RenameVar(name, new_name);
})
.def("has_var_recursive",
[](BlockDesc &self, py::bytes byte_name) {
......@@ -198,7 +206,7 @@ void BindVarDsec(py::module &m) {
py::class_<VarDesc> var_desc(m, "VarDesc", "");
var_desc
.def("name",
[](const VarDesc &self) {
[](VarDesc &self) {
py::bytes name = self.Name();
return name;
},
......
......@@ -74,6 +74,8 @@ def download(url, module_name, md5sum, save_name=None):
retry = 0
retry_limit = 3
while not (os.path.exists(filename) and md5file(filename) == md5sum):
if os.path.exists(filename):
print "file md5", md5file(filename), md5sum
if retry < retry_limit:
retry += 1
else:
......
......@@ -14,7 +14,7 @@
from __future__ import print_function
import framework
from framework import Program, default_main_program, Parameter, Variable
from framework import Program, default_main_program, default_startup_program, Parameter, Variable
import optimizer
from layer_helper import LayerHelper
from distributed_spliter import *
......@@ -133,6 +133,7 @@ class DistributeTranspiler:
def transpile(self,
optimize_ops,
params_grads,
trainer_id,
program=None,
pservers="127.0.0.1:6174",
trainers=1,
......@@ -146,13 +147,37 @@ class DistributeTranspiler:
Use different methods to split trainable variables to different
parameter servers.
Steps to transpile trainer:
1. split variables into multiple blocks, aligned by product(dim[1:]) (width).
2. rename the split grad variables to add the trainer_id suffix ".trainer_%d".
3. modify the trainer program: add a split_op for each grad variable.
4. append a send_op to send the split variables to the servers and fetch
params (split blocks or original params) back from the servers.
5. append a concat_op to merge the split blocks and update the local weights.
Steps to transpile pserver:
1. create a new program for the parameter server.
2. create param and grad variables that are assigned to the current server instance.
3. create a sub-block in the server-side program.
4. append the ops that should run on the current server instance.
5. add the listen_and_serv op.
:param optimize_ops: op list of optimization, should be the
return value of Optimizer.minimize
:type optimize_ops: list
:param program: program to optimize, default is default_main_program
:param params_grads: list of tuple(weight, gradient)
:type params_grads: list
:param trainer_id: a unique id for each trainer in the job.
:type trainer_id: int
:param program: program to transpile, default is default_main_program
:type program: Program
:param pservers: parameter server endpoints like "m1:6174,m2:6174"
:type pservers: string
:return: return a list of programs
:param trainers: total number of workers/trainers in the job
:type trainers: int
:param split_method: A function to determine how to split variables
to different servers equally.
:type split_method: function
"""
assert (callable(split_method))
if program is None:
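For reference, a hedged sketch of the new call signature; the extra `trainer_id` positional argument is the visible API change. `optimize_ops` and `params_grads` are assumed to come from an earlier `optimizer.minimize(...)` call, and the endpoint values are illustrative:

```python
import paddle.v2.fluid as fluid  # module path at the time of this PR

t = fluid.DistributeTranspiler()
t.transpile(
    optimize_ops,            # from optimizer.minimize(...)
    params_grads,            # list of (param, grad) tuples
    trainer_id=0,            # unique per trainer; baked into grad var names
    pservers="127.0.0.1:6174",
    trainers=2)
```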
......@@ -160,25 +185,19 @@ class DistributeTranspiler:
self.program = program
self.trainers = trainers
self.optimize_ops = optimize_ops
# steps to transpile:
# 1. split variable to multiple blocks, aligned by product(dim[1:]) (width).
# 2. modify trainer program add split_op to each Grad.
# 3. append send_op to trainer.
# 4. append concat_op to trainer to update local weights.
# 5. create new program for parameter server.
# 6. create parameter server program by split_method generated endpoint->VarBlock
# TODO(typhoonzero): currently trainer_id is fetched from cluster system
# like Kubernetes, we should port this to use etcd later when developing
# fluid distributed training with fault-tolerance.
self.trainer_id = trainer_id
pserver_endpoints = pservers.split(",")
# step1
param_list = [pg[0] for pg in params_grads]
grad_list = [pg[1] for pg in params_grads]
# TODO: add split selected rows support
grad_blocks = split_dense_variable(grad_list, len(pserver_endpoints))
param_blocks = split_dense_variable(param_list, len(pserver_endpoints))
# step2
grad_var_mapping = self._append_split_op(program, grad_blocks)
# step3
send_inputs = []
send_outputs = []
......@@ -211,7 +230,7 @@ class DistributeTranspiler:
shape=[0])
# create send_op
send_op = program.global_block().append_op(
program.global_block().append_op(
type="send",
inputs={"X": send_inputs},
outputs={"Out": send_outputs,
......@@ -223,14 +242,158 @@ class DistributeTranspiler:
if len(splited_var) <= 1:
continue
orig_param = program.global_block().vars[varname]
concat = program.global_block().append_op(
program.global_block().append_op(
type="concat",
inputs={"X": splited_var},
outputs={"Out": [orig_param]},
attrs={"axis": 0})
def _create_vars_from_blocklist(self, program, block_list):
# Create respective variables using the block_list
def get_trainer_program(self):
# remove optimize ops and add a send op to main_program
self.program.global_block().delete_ops(self.optimize_ops)
return self.program
def get_pserver_program(self, endpoint):
"""
Get pserver side program using the endpoint.
NOTE: we assume that blocks of the same variable are not distributed
on the same pserver; we only change param/grad varnames for
trainers to fetch.
"""
# step1
pserver_program = Program()
# step2
recv_inputs = []
for v in self.param_grad_ep_mapping[endpoint]["params"]:
self._clone_var(pserver_program.global_block(), v)
for v in self.param_grad_ep_mapping[endpoint]["grads"]:
# create vars for each trainer in the global scope, so
# we don't need to create them when a grad arrives.
# change the client-side var name back to the original name by
# removing the ".trainer_%d" suffix
suff_idx = v.name.find(".trainer_")
if suff_idx >= 0:
orig_var_name = v.name[:suff_idx]
pserver_program.global_block().create_var(
name=orig_var_name,
persistable=True,
dtype=v.dtype,
shape=v.shape)
print("create origin var: ", orig_var_name)
for trainer_id in xrange(self.trainers):
var = pserver_program.global_block().create_var(
name="%s.trainer_%d" % (orig_var_name, trainer_id),
persistable=False,
dtype=v.dtype,
shape=v.shape)
recv_inputs.append(var)
print("create per trainer var: ", var.name)
# step3
optimize_block = pserver_program.create_block(0)
# step 4
# Create a union-find data structure from the optimize ops:
# if two ops are connected, they are added into one set.
ufind = self._create_ufind(self.optimize_ops)
# step 4.2
# Iterate through the ops and append the optimize ops that are
# located on the current pserver
opt_op_on_pserver = []
for _, op in enumerate(self.optimize_ops):
if self._is_opt_op(op) and self._is_opt_op_on_pserver(endpoint, op):
opt_op_on_pserver.append(op)
# step 4.3
# Iterate through the ops: if an op is in the same set as any
# optimize op located on the current pserver, append it to the
# sub-program.
for _, op in enumerate(self.optimize_ops):
for _, opt_op in enumerate(opt_op_on_pserver):
if ufind.is_connected(op, opt_op):
if self._is_opt_op(op):
self._append_pserver_ops(optimize_block, op, endpoint)
else:
self._append_pserver_non_opt_ops(optimize_block, op)
break
# step5 append the listen_and_serv op
pserver_program.global_block().append_op(
type="listen_and_serv",
inputs={'X': recv_inputs},
outputs={},
attrs={
"OptimizeBlock": optimize_block,
"endpoint": endpoint,
"Fanin": self.trainers
})
pserver_program.sync_with_cpp()
return pserver_program
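The grads loop above pre-creates, for each gradient assigned to this endpoint, one persistable variable under the original name plus one non-persistable `%s.trainer_%d` receive buffer per trainer, so an incoming RPC always finds its target variable. A small sketch of the resulting names (illustrative helper, not Paddle API):

```python
def pserver_grad_vars(recv_grad_name, trainers):
    """Names the pserver pre-creates for one received grad (sketch)."""
    orig = recv_grad_name.split(".trainer_")[0]  # e.g. "w@GRAD"
    per_trainer = ["%s.trainer_%d" % (orig, i) for i in range(trainers)]
    return orig, per_trainer

print(pserver_grad_vars("w@GRAD.trainer_0", 2))
# ('w@GRAD', ['w@GRAD.trainer_0', 'w@GRAD.trainer_1'])
```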
def get_startup_program(self, endpoint, pserver_program):
"""
Get startup program for current parameter server.
Modify operator input variables if there are variables that
were split into several blocks.
"""
s_prog = Program()
orig_s_prog = framework.default_startup_program()
params = self.param_grad_ep_mapping[endpoint]["params"]
def _get_splited_name_and_shape(varname):
for idx, splited_param in enumerate(params):
pname = splited_param.name
if same_or_split_var(pname, varname) and varname != pname:
return pname, splited_param.shape
return "", []
# 1. create vars in pserver program to startup program
pserver_vars = pserver_program.global_block().vars
created_var_map = dict()
for _, var in pserver_vars.iteritems():
tmpvar = s_prog.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
created_var_map[var.name] = tmpvar
# 2. rename op outputs
for op in orig_s_prog.global_block().ops:
new_inputs = dict()
new_outputs = dict()
# do not append startup op if var is not on this pserver
op_on_pserver = False
for key in op.output_names:
newname, _ = _get_splited_name_and_shape(op.output(key)[0])
if newname:
op_on_pserver = True
new_outputs[key] = created_var_map[newname]
elif op.output(key)[0] in pserver_vars:
op_on_pserver = True
new_outputs[key] = pserver_vars[op.output(key)[0]]
# most startup program ops have no inputs
new_inputs = self._get_input_map_from_op(pserver_vars, op)
if op_on_pserver:
if op.type in [
"gaussian_random", "fill_constant", "uniform_random"
]:
op.attrs["shape"] = new_outputs["Out"].shape
s_prog.global_block().append_op(
type=op.type,
inputs=new_inputs,
outputs=new_outputs,
attrs=op.attrs)
return s_prog
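One subtlety above: for initializer ops such as `gaussian_random`, `fill_constant`, and `uniform_random`, the `shape` attribute must be rewritten to the split block's shape, otherwise the pserver would initialize the full-size parameter. A toy illustration of that rewrite, with plain dicts standing in for op descs (shapes are made up):

```python
# Sketch: rewriting an initializer op's shape attr for a split block.
op = {"type": "uniform_random", "attrs": {"shape": [1000, 784]}}
split_block_shape = [500, 784]  # this pserver holds half of the rows

if op["type"] in ["gaussian_random", "fill_constant", "uniform_random"]:
    op["attrs"]["shape"] = split_block_shape  # init only the local block
```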
# ====================== private transpiler functions =====================
def _create_vars_from_blocklist(self,
program,
block_list,
add_trainer_suffix=False):
"""
NOTE: only gradients need distinct names per trainer; use
add_trainer_suffix to rename the grad vars.
"""
block_map = dict()
var_mapping = dict()
for block_str in block_list:
......@@ -239,11 +402,20 @@ class DistributeTranspiler:
block_map[varname] = []
block_map[varname].append((long(offset), long(size)))
for varname, splited in block_map.iteritems():
orig_var = program.global_block().vars[varname]
var_mapping[varname] = []
orig_var = program.global_block().var(varname)
if len(splited) == 1:
var_mapping[varname] = [orig_var]
if add_trainer_suffix:
new_var_name = "%s.trainer_%d" % \
(orig_var.name, self.trainer_id)
program.global_block().rename_var(varname, new_var_name)
var_mapping[varname] = \
[program.global_block().var(new_var_name)]
else:
var_mapping[varname] = \
[program.global_block().var(orig_var.name)]
continue
var_mapping[varname] = []
orig_shape = orig_var.shape
orig_dim1_flatten = 1
if len(orig_shape) >= 2:
......@@ -255,13 +427,21 @@ class DistributeTranspiler:
splited_shape = [rows]
if len(orig_shape) >= 2:
splited_shape.extend(orig_shape[1:])
new_var_name = ""
if add_trainer_suffix:
new_var_name = "%s.block%d.trainer_%d" % \
(varname, i, self.trainer_id)
else:
new_var_name = "%s.block%d" % \
(varname, i)
var = program.global_block().create_var(
name="%s.block%d" % (varname, i),
name=new_var_name,
persistable=False,
dtype=orig_var.dtype,
type=orig_var.type,
shape=splited_shape)  # flattened split var
var_mapping[varname].append(var)
program.global_block().sync_with_cpp()
return var_mapping
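A quick illustration of the naming this produces: a gradient split into two blocks for trainer 1 becomes `w@GRAD.block0.trainer_1` and `w@GRAD.block1.trainer_1`, while an unsplit gradient just gets the trainer suffix. A self-contained sketch mirroring the string formats above (not Paddle API):

```python
def split_var_names(varname, n_blocks, trainer_id, add_trainer_suffix):
    """Names produced by _create_vars_from_blocklist (sketch)."""
    if n_blocks == 1:
        return ["%s.trainer_%d" % (varname, trainer_id)
                if add_trainer_suffix else varname]
    if add_trainer_suffix:
        return ["%s.block%d.trainer_%d" % (varname, i, trainer_id)
                for i in range(n_blocks)]
    return ["%s.block%d" % (varname, i) for i in range(n_blocks)]

print(split_var_names("w@GRAD", 2, 1, True))
# ['w@GRAD.block0.trainer_1', 'w@GRAD.block1.trainer_1']
```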
def _clone_var(self, block, var):
......@@ -272,13 +452,12 @@ class DistributeTranspiler:
dtype=var.dtype,
type=var.type,
lod_level=var.lod_level,
# HACK: make all params on the pserver persistable so the child
# program in recv can get them
persistable=True)
def _append_split_op(self, program, gradblocks):
# Split variables that need to be split and append respective ops
var_mapping = self._create_vars_from_blocklist(program, gradblocks)
var_mapping = self._create_vars_from_blocklist(
program, gradblocks, add_trainer_suffix=True)
for varname, splited_vars in var_mapping.iteritems():
# variables that don't need to be split have only one entry in splited_vars
if len(splited_vars) <= 1:
......@@ -308,24 +487,6 @@ class DistributeTranspiler:
"[LOD_TENSOR, SELECTED_ROWS]")
return var_mapping
def get_trainer_program(self):
# remove optimize ops and add a send op to main_program
self.program.global_block().delete_ops(self.optimize_ops)
return self.program
def _create_var_for_trainers(self, block, var, trainers):
# For each trainer, create the necessary variables
var_list = []
for i in xrange(trainers):
var_each = block.create_var(
name="%s.trainer_%d" % (var.name, i),
persistable=var.persistable,
dtype=var.dtype,
type=var.type,
shape=var.shape)
var_list.append(var_each)
return var_list
def _get_optimizer_input_shape(self, op_type, varkey, orig_shape,
param_shape):
"""
......@@ -353,6 +514,13 @@ class DistributeTranspiler:
pass
return orig_shape
def _orig_varname(self, varname):
suff_idx = varname.find(".trainer_")
orig_var_name = ""
if suff_idx >= 0:
orig_var_name = varname[:suff_idx]
return orig_var_name
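`_orig_varname` is the inverse of the trainer-suffix renaming: it strips everything from `.trainer_` onward, returning an empty string when no suffix is present (which callers treat as "not a per-trainer name"). For example:

```python
def orig_varname(varname):
    # mirrors DistributeTranspiler._orig_varname above
    suff_idx = varname.find(".trainer_")
    return varname[:suff_idx] if suff_idx >= 0 else ""

assert orig_varname("w@GRAD.trainer_3") == "w@GRAD"
assert orig_varname("w@GRAD.block0.trainer_3") == "w@GRAD.block0"
assert orig_varname("w@GRAD") == ""  # no ".trainer_" suffix present
```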
def _append_pserver_ops(self, optimize_block, opt_op, endpoint):
program = optimize_block.program
pserver_block = program.global_block()
......@@ -363,18 +531,23 @@ class DistributeTranspiler:
if key == "Grad":
grad_block = None
for g in self.param_grad_ep_mapping[endpoint]["grads"]:
if same_or_split_var(g.name, opt_op.input(key)[0]):
if same_or_split_var(
self._orig_varname(g.name), opt_op.input(key)[0]):
grad_block = g
break
if not grad_block:
# do not append this op if current endpoint
# is not dealing with this grad block
return
merged_var = pserver_block.vars[grad_block.name]
# append merging ops if trainers > 1
merged_var = \
pserver_block.vars[self._orig_varname(grad_block.name)]
if self.trainers > 1:
vars2merge = self._create_var_for_trainers(
pserver_block, grad_block, self.trainers)
vars2merge = []
for i in xrange(self.trainers):
per_trainer_name = "%s.trainer_%d" % \
(self._orig_varname(grad_block.name), i)
vars2merge.append(pserver_block.vars[per_trainer_name])
optimize_block.append_op(
type="sum",
inputs={"X": vars2merge},
......@@ -517,77 +690,6 @@ class DistributeTranspiler:
return False
return False
def get_pserver_program(self, endpoint):
"""
Get pserver side program using the endpoint
NOTE: assume blocks of the same variable is not distributed
on the same pserver, only change param/grad varnames for
trainers to fetch. For each pserver endpoint, server side
program must be a sub-set of the original optimization program.
"""
# step5
pserver_program = Program()
for v in self.param_grad_ep_mapping[endpoint]["params"]:
self._clone_var(pserver_program.global_block(), v)
for v in self.param_grad_ep_mapping[endpoint]["grads"]:
# create vars for each trainer in global scope, so
# we don't need to create them when grad arrives.
pserver_program.global_block().create_var(
name=v.name, persistable=True, dtype=v.dtype, shape=v.shape)
for trainer_id in xrange(self.trainers):
pserver_program.global_block().create_var(
name="%s.trainer_%d" % (v.name, trainer_id),
persistable=True,
dtype=v.dtype,
shape=v.shape)
# step6
optimize_block = pserver_program.create_block(0)
# step 6.1
# Create a union-find data struct by optimize ops,
# If two ops are connected, we could add these two ops
# into one set.
ufind = self._create_ufind(self.optimize_ops)
# step 6.2
# Iterate through the ops and append optimize op which
# located on current pserver
opt_op_on_pserver = []
for _, op in enumerate(self.optimize_ops):
if self._is_opt_op(op) and self._is_opt_op_on_pserver(endpoint, op):
opt_op_on_pserver.append(op)
# step 6.3
# Iterate through the ops, and if an op and the optimize ops
# which located on current pserver are in one set, then
# append it into the sub program.
for _, op in enumerate(self.optimize_ops):
for _, opt_op in enumerate(opt_op_on_pserver):
if ufind.is_connected(op, opt_op):
if self._is_opt_op(op):
self._append_pserver_ops(optimize_block, op, endpoint)
else:
self._append_pserver_non_opt_ops(optimize_block, op)
break
# Append the listen_and_serv op
pserver_program.global_block().append_op(
type="listen_and_serv",
inputs={},
outputs={},
attrs={
"OptimizeBlock": optimize_block,
"endpoint": endpoint,
"ParamList": [
p.name
for p in self.param_grad_ep_mapping[endpoint]["params"]
],
"GradList": [
p.name
for p in self.param_grad_ep_mapping[endpoint]["grads"]
],
"Fanin": self.trainers
})
pserver_program.sync_with_cpp()
return pserver_program
def _get_input_map_from_op(self, varmap, op):
iomap = dict()
for key in op.input_names:
......@@ -611,61 +713,3 @@ class DistributeTranspiler:
else:
iomap[key] = vars
return iomap
def get_startup_program(self, endpoint, pserver_program):
"""
Get startup program for current parameter server.
Modify operator input variables if there are variables that
were split to several blocks.
"""
s_prog = Program()
orig_s_prog = framework.default_startup_program()
params = self.param_grad_ep_mapping[endpoint]["params"]
def _get_splited_name_and_shape(varname):
for idx, splited_param in enumerate(params):
pname = splited_param.name
if same_or_split_var(pname, varname) and varname != pname:
return pname, splited_param.shape
return "", []
# 1. create vars in pserver program to startup program
pserver_vars = pserver_program.global_block().vars
created_var_map = dict()
for _, var in pserver_vars.iteritems():
tmpvar = s_prog.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
created_var_map[var.name] = tmpvar
# 2. rename op outputs
for op in orig_s_prog.global_block().ops:
new_inputs = dict()
new_outputs = dict()
# do not append startup op if var is not on this pserver
op_on_pserver = False
for key in op.output_names:
newname, _ = _get_splited_name_and_shape(op.output(key)[0])
if newname:
op_on_pserver = True
new_outputs[key] = created_var_map[newname]
elif op.output(key)[0] in pserver_vars:
op_on_pserver = True
new_outputs[key] = pserver_vars[op.output(key)[0]]
# most startup program ops have no inputs
new_inputs = self._get_input_map_from_op(pserver_vars, op)
if op_on_pserver:
if op.type in [
"gaussian_random", "fill_constant", "uniform_random"
]:
op.attrs["shape"] = new_outputs["Out"].shape
s_prog.global_block().append_op(
type=op.type,
inputs=new_inputs,
outputs=new_outputs,
attrs=op.attrs)
return s_prog
......@@ -286,6 +286,10 @@ class Variable(object):
def name(self):
return self.desc.name()
@name.setter
def name(self, new_name):
self.desc.set_name(new_name)
@property
def shape(self):
# convert to tuple, make it as same as numpy API.
......@@ -531,6 +535,12 @@ class Operator(object):
"""
return self.desc.input(name)
def rename_input(self, old_name, new_name):
self.desc.rename_input(old_name, new_name)
def rename_output(self, old_name, new_name):
self.desc.rename_output(old_name, new_name)
@property
def input_names(self):
"""
......@@ -540,6 +550,14 @@ class Operator(object):
"""
return self.desc.input_names()
@property
def input_arg_names(self):
return self.desc.input_arg_names()
@property
def output_arg_names(self):
return self.desc.output_arg_names()
def output(self, name):
"""
Get output arguments by the output parameter name
......@@ -717,6 +735,60 @@ class Block(object):
def has_var(self, name):
return name in self.vars
def rename_var(self, name, new_name):
"""
Rename variable in vars and ops' inputs and outputs
"""
if not self.has_var(name):
raise ValueError("var %s is not in current" % name)
v = self.var(name)
stop_gradient = None
trainable = None
optimize_attr = None
regularizer = None
gradient_clip_attr = None
error_clip = None
if type(v) == Parameter:
stop_gradient = v.stop_gradient
trainable = v.trainable
optimize_attr = v.optimize_attr
regularizer = v.regularizer
gradient_clip_attr = v.gradient_clip_attr
error_clip = v.error_clip
elif type(v) == Variable:
error_clip = v.error_clip
stop_gradient = v.stop_gradient
else:
raise ValueError("unsupported var type: %s", type(v))
self.desc.rename_var(name, new_name)
d = self.desc.find_var(new_name)
var = None
if type(v) == Parameter:
var = Parameter(
self,
d.shape(),
d.dtype(),
name=new_name,
stop_gradient=stop_gradient,
trainable=trainable,
optimize_attr=optimize_attr,
regularizer=regularizer,
gradient_clip_attr=gradient_clip_attr,
error_clip=error_clip)
elif type(v) == Variable:
var = Variable(
self,
name=new_name,
error_clip=error_clip,
stop_gradient=stop_gradient)
# rename the python side, sync_with_cpp will only add
# new vars/ops to python side.
self.vars[new_name] = var
del self.vars[name]
self.sync_with_cpp()
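A hedged usage sketch of the new Python-side rename (variable names illustrative; the Parameter branch above re-attaches attributes such as `trainable` and `regularizer` to the rebuilt wrapper):

```python
import paddle.v2.fluid as fluid  # module path at the time of this PR

prog = fluid.Program()
block = prog.global_block()
block.create_var(name="fc_0.w@GRAD", shape=[128, 64], dtype="float32")

# Renames the var in the C++ BlockDesc and rebuilds the Python wrapper,
# keeping the Python and C++ sides consistent via sync_with_cpp().
block.rename_var("fc_0.w@GRAD", "fc_0.w@GRAD.trainer_0")
assert block.has_var("fc_0.w@GRAD.trainer_0")
```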
def create_parameter(self, *args, **kwargs):
global_block = self.program.global_block()
param = Parameter(global_block, *args, **kwargs)
......
......@@ -58,14 +58,19 @@ trainers = int(os.getenv("TRAINERS")) # total trainer count
current_endpoint = os.getenv("SERVER_ENDPOINT") # current pserver endpoint
training_role = os.getenv("TRAINING_ROLE",
"TRAINER") # get the training role: trainer/pserver
if not current_endpoint:
print("need env SERVER_ENDPOINT")
exit(1)
t = fluid.DistributeTranspiler()
t.transpile(
optimize_ops, params_grads, pservers=pserver_endpoints, trainers=trainers)
optimize_ops,
params_grads,
0,
pservers=pserver_endpoints,
trainers=trainers)
if training_role == "PSERVER":
if not current_endpoint:
print("need env SERVER_ENDPOINT")
exit(1)
pserver_prog = t.get_pserver_program(current_endpoint)
pserver_startup = t.get_startup_program(current_endpoint, pserver_prog)
exe.run(pserver_startup)
......