Commit 4356f186 authored by Qiao Longfei

complete parameter_send

Parent 741b7cfd
......@@ -56,25 +56,13 @@ void ParameterSend<T>::operator()(const std::string &var_name,
auto *send_var = scope.FindVar(var_name);
size_t out_num = send_varnames.size();
if (send_var->IsType<framework::LoDTensor>()) {
- auto &send_tensor = send_var->Get<framework::LoDTensor>();
- auto &send_tensor_dims = send_tensor.dims();
- std::vector<framework::DDim> outs_dims;
- outs_dims.reserve(out_num);
- // infer output shape
- int num = ctx.Attr<int>("num");
- if (num > 0) {
- int64_t in_axis_dim = send_tensor_dims[0];
- PADDLE_ENFORCE_EQ(in_axis_dim % num, 0,
- "tensor split does not result"
- " in an equal division");
- size_t out_axis_dim = in_axis_dim / num;
- for (size_t i = 0; i < out_num; ++i) {
- auto dim = send_tensor_dims;
- dim[0] = out_axis_dim;
- outs_dims.push_back(dim);
- }
- } else if (height_sections.size() > 0) {
+ if (out_num > 1) {
+ auto &send_tensor = send_var->Get<framework::LoDTensor>();
+ auto &send_tensor_dims = send_tensor.dims();
+ std::vector<framework::DDim> outs_dims;
+ outs_dims.reserve(out_num);
+ // infer output shape
PADDLE_ENFORCE_EQ(height_sections.size(), out_num,
"tensor split sections size"
"should be equal to output size.");
......@@ -83,15 +71,15 @@ void ParameterSend<T>::operator()(const std::string &var_name,
dim[0] = height_sections[i];
outs_dims.push_back(dim);
}
- }
- // create output var in local scope
- size_t row_offset = 0;
- for (auto i = 0; i < out_num; ++i) {
- auto *out =
- local_scope->Var(send_varnames[i])->GetMutable<framework::Tensor>();
- *out = send_tensor.Slice(row_offset, row_offset + outs_dims[i][0]);
- row_offset += outs_dims[i][0];
+ // create output var in local scope
+ size_t row_offset = 0;
+ for (auto i = 0; i < out_num; ++i) {
+ auto *out =
+ local_scope->Var(send_varnames[i])->GetMutable<framework::Tensor>();
+ *out = send_tensor.Slice(row_offset, row_offset + outs_dims[i][0]);
+ row_offset += outs_dims[i][0];
+ }
}
} else if (send_var->IsType<framework::SelectedRows>()) {
auto &send_slr = send_var->Get<framework::SelectedRows>();
......
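After this change the LoDTensor branch only splits when there is more than one destination: it checks that `height_sections` matches the number of outputs, builds one output shape per section, and then slices the source tensor row by row into the variables named in `send_varnames`. Below is a minimal NumPy sketch of that row-wise slicing; the function and variable names are illustrative, not the C++ API.

```python
import numpy as np

# Illustration only: mimics the row-wise split that parameter_send performs
# when a LoDTensor is sent to multiple pservers. Names are hypothetical.
def split_by_height_sections(tensor, height_sections):
    """Slice `tensor` along axis 0 into len(height_sections) pieces."""
    assert sum(height_sections) == tensor.shape[0], \
        "tensor split sections should cover the whole height"
    outs = []
    row_offset = 0
    for height in height_sections:
        outs.append(tensor[row_offset:row_offset + height])
        row_offset += height
    return outs

# A 10x4 parameter sent to 3 pservers with sections [4, 3, 3].
param = np.arange(40, dtype=np.float32).reshape(10, 4)
pieces = split_by_height_sections(param, [4, 3, 3])
print([p.shape for p in pieces])  # [(4, 4), (3, 4), (3, 4)]
```

Like `framework::Tensor::Slice` in the code above, the NumPy slices are views over the original buffer rather than copies.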
......@@ -42,7 +42,7 @@ class SendOp : public framework::OperatorBase {
int sync_send = Attr<int>("sync_mode");
auto send_varnames = Attr<std::vector<std::string>>("send_varnames");
- auto height_sections = Attr<std::vector<int64_t>>("height_sections");
+ auto height_sections = Attr<std::vector<int64_t>>("sections");
if (send_varnames.size() > 0) {
PADDLE_ENFORCE_EQ(ins.size(), 1, "");
......
......@@ -48,6 +48,7 @@ class TestDistRunnerBase(object):
# NOTE: import fluid until runtime, or else forking processes will cause error.
config = fluid.DistributeTranspilerConfig()
config.enable_dc_asgd = dc_asgd
+ config.runtime_split_send_recv = True
t = fluid.DistributeTranspiler(config=config)
t.transpile(
trainer_id=trainer_id,
......@@ -87,6 +88,9 @@ class TestDistRunnerBase(object):
args.endpoints, args.trainers,
args.sync_mode, args.dc_asgd)
trainer_prog = t.get_trainer_program()
+ with open("/tmp/trainer." + str(args.trainer_id) + ".proto",
+ "w") as f:
+ f.write(str(trainer_prog))
elif args.update_method == "nccl2":
# transpile for nccl2
config = fluid.DistributeTranspilerConfig()
......@@ -115,6 +119,7 @@ class TestDistRunnerBase(object):
strategy.allow_op_delay = False
build_stra = fluid.BuildStrategy()
+ build_stra.debug_graphviz_path = "/tmp/graph-" + str(args.trainer_id)
if args.use_reduce:
build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
......
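These test changes turn on the new transpiler mode (`config.runtime_split_send_recv = True`) and dump the transpiled trainer program to /tmp for inspection. A condensed sketch of that setup, mirroring `TestDistRunnerBase`; the tiny network and the endpoints are made up for illustration.

```python
import paddle.fluid as fluid

# Hypothetical minimal program so the transpiler has parameters and gradients to split.
x = fluid.layers.data(name='x', shape=[13], dtype='float32')
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
y_pred = fluid.layers.fc(input=x, size=1)
loss = fluid.layers.mean(fluid.layers.square_error_cost(input=y_pred, label=y))
fluid.optimizer.SGD(learning_rate=0.001).minimize(loss)

config = fluid.DistributeTranspilerConfig()
config.runtime_split_send_recv = True  # flag introduced by this commit

t = fluid.DistributeTranspiler(config=config)
t.transpile(
    trainer_id=0,
    pservers="127.0.0.1:6174,127.0.0.1:6175",  # made-up endpoints
    trainers=2)
trainer_prog = t.get_trainer_program()
```

With the flag on, the trainer program carries one `send` op per gradient with `sections`/`send_varnames` attributes and no preceding `split_byref` op.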
......@@ -156,6 +156,8 @@ class DistributeTranspilerConfig(object):
mode = "pserver"
print_log = False
wait_port = True
+ # split the send recv var in runtime
+ runtime_split_send_recv = False
class DistributeTranspiler(object):
......@@ -398,8 +400,10 @@ class DistributeTranspiler(object):
orig_var = program.global_block().vars[splited_grad_varname]
index = find_op_by_output_arg(
program.global_block(), splited_grad_varname, reverse=True)
- self._insert_split_op(program, orig_var, index, splited_vars)
- index += 1
+ if not self.config.runtime_split_send_recv:
+ self._insert_split_op(program, orig_var, index,
+ splited_vars)
+ index += 1
else:
AssertionError("Can not insert the send op by original "
"variable name :", splited_grad_varname)
......@@ -408,6 +412,17 @@ class DistributeTranspiler(object):
name=framework.generate_control_dev_var_name())
self.grad_name_to_send_dummy_out[grad_varname] = dummy_output
+ if self.config.runtime_split_send_recv:
+ send_input_vars = [
+ program.global_block().vars[splited_grad_varname]
+ ]
+ sections = self._get_splited_var_sections(splited_vars)
+ send_varnames = [var.name for var in splited_vars]
+ else:
+ send_input_vars = splited_vars
+ sections = []
+ send_varnames = []
# get send op_role_var, if not splited, the grad should have .trainer suffix
# if splited, grad should be the original grad var name (split_by_ref and send
# will be on the same place). ParallelExecutor
......@@ -415,10 +430,12 @@ class DistributeTranspiler(object):
program.global_block()._insert_op(
index=index + 1,
type="send",
- inputs={"X": splited_vars},
+ inputs={"X": send_input_vars},
outputs={"Out": dummy_output},
attrs={
"epmap": eplist,
+ "sections": sections,
+ "send_varnames": send_varnames,
RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE,
OP_ROLE_VAR_ATTR_NAME: [
self.grad_name_to_param_name[grad_varname],
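When `runtime_split_send_recv` is set, the transpiler feeds the whole gradient into a single `send` op and records the split in the op's `sections` and `send_varnames` attributes, which `ParameterSend` consumes on the C++ side; otherwise the previous behaviour (explicit `split_byref`, empty attributes) is preserved. A hedged sketch of the two cases, with made-up variable names and a 7x10 gradient split 4/3 across two pservers:

```python
runtime_split_send_recv = True  # toggled via DistributeTranspilerConfig

if runtime_split_send_recv:
    # the un-split gradient is the send op input; splitting happens at runtime
    send_input_vars = ["w@GRAD"]
    sections = [4, 3]                                   # v.shape[0] per piece
    send_varnames = ["w@GRAD.block0", "w@GRAD.block1"]  # created in the local scope
else:
    # split_byref already produced the pieces, so the runtime attrs stay empty
    send_input_vars = ["w@GRAD.block0", "w@GRAD.block1"]
    sections = []
    send_varnames = []

print(send_input_vars, sections, send_varnames)
```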
......@@ -1372,9 +1389,8 @@ class DistributeTranspiler(object):
# create table param and grad var in pserver program
# create table optimize block in pserver program
table_opt_op = [
- op for op in self.optimize_ops
- if 'Param' in op.input_names and op.input("Param")[0] ==
- self.table_name
+ op for op in self.optimize_ops if 'Param' in op.input_names and
+ op.input("Param")[0] == self.table_name
][0]
origin_param_var = self.origin_program.global_block().vars[
......@@ -1548,11 +1564,17 @@ class DistributeTranspiler(object):
lod_level=var.lod_level,
persistable=persistable)
+ @staticmethod
+ def _get_splited_var_sections(splited_vars):
+ height_sections = []
+ for v in splited_vars:
+ height_sections.append(v.shape[0])
+ return height_sections
def _insert_split_op(self, program, orig_var, index, splited_vars):
+ height_sections = self._get_splited_var_sections(splited_vars)
if orig_var.type == core.VarDesc.VarType.SELECTED_ROWS:
- height_sections = []
- for v in splited_vars:
- height_sections.append(v.shape[0])
sparse_param_name = self.grad_name_to_param_name[orig_var.name]
if self._is_input_of_remote_sparse_update_op(sparse_param_name):
self.sparse_param_to_height_sections[
......@@ -1567,16 +1589,13 @@ class DistributeTranspiler(object):
RPC_OP_ROLE_ATTR_NAME: DIST_OP_ROLE_ATTR_VALUE
})
elif orig_var.type == core.VarDesc.VarType.LOD_TENSOR:
- sections = []
- for v in splited_vars:
- sections.append(v.shape[0])
program.global_block()._insert_op(
index=index + 1,
type="split_byref",
inputs={"X": orig_var},
outputs={"Out": splited_vars},
attrs={
- "sections": sections,
+ "sections": height_sections,
RPC_OP_ROLE_ATTR_NAME: DIST_OP_ROLE_ATTR_VALUE
})
else:
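The per-variable section computation that was previously duplicated inside `_insert_split_op` now lives in the shared `_get_splited_var_sections` helper, and the same values feed both the `split_byref` op and the `send` op. Its behaviour is simply the first dimension of each split variable; a tiny standalone sketch operating on plain shape tuples (hypothetical function name):

```python
def get_splited_var_sections(splited_var_shapes):
    # the height of each split piece is its first dimension
    return [shape[0] for shape in splited_var_shapes]

# e.g. a (10, 8) parameter split across 3 pservers into 4-, 3- and 3-row pieces
print(get_splited_var_sections([(4, 8), (3, 8), (3, 8)]))  # [4, 3, 3]
```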
......@@ -2048,7 +2067,7 @@ class DistributeTranspiler(object):
Get optimizer operators, parameters and gradients from origin_program
Returns:
opt_ops (list): optimize operators.
- params_grads (dict): paramter->gradient.
+ params_grads (dict): parameter->gradient.
"""
block = self.origin_program.global_block()
opt_ops = []
......