From 4356f186b4a3015ea1a2877e60f1d8a05fe5312d Mon Sep 17 00:00:00 2001
From: Qiao Longfei
Date: Wed, 6 Feb 2019 11:08:12 +0800
Subject: [PATCH] complete parameter_send

---
 .../operators/distributed/parameter_send.cc   | 42 ++++++-----------
 .../operators/distributed_ops/send_op.cc      |  2 +-
 .../fluid/tests/unittests/test_dist_base.py   |  5 ++
 .../fluid/transpiler/distribute_transpiler.py | 47 +++++++++++++------
 4 files changed, 54 insertions(+), 42 deletions(-)

diff --git a/paddle/fluid/operators/distributed/parameter_send.cc b/paddle/fluid/operators/distributed/parameter_send.cc
index efe094fd1f..47ca42c790 100644
--- a/paddle/fluid/operators/distributed/parameter_send.cc
+++ b/paddle/fluid/operators/distributed/parameter_send.cc
@@ -56,25 +56,13 @@ void ParameterSend<T>::operator()(const std::string &var_name,
   auto *send_var = scope.FindVar(var_name);
   size_t out_num = send_varnames.size();
   if (send_var->IsType<framework::LoDTensor>()) {
-    auto &send_tensor = send_var->Get<framework::LoDTensor>();
-    auto &send_tensor_dims = send_tensor.dims();
-    std::vector<framework::DDim> outs_dims;
-    outs_dims.reserve(out_num);
-
-    // infer output shape
-    int num = ctx.Attr<int>("num");
-    if (num > 0) {
-      int64_t in_axis_dim = send_tensor_dims[0];
-      PADDLE_ENFORCE_EQ(in_axis_dim % num, 0,
-                        "tensor split does not result"
-                        " in an equal division");
-      size_t out_axis_dim = in_axis_dim / num;
-      for (size_t i = 0; i < out_num; ++i) {
-        auto dim = send_tensor_dims;
-        dim[0] = out_axis_dim;
-        outs_dims.push_back(dim);
-      }
-    } else if (height_sections.size() > 0) {
+    if (out_num > 1) {
+      auto &send_tensor = send_var->Get<framework::LoDTensor>();
+      auto &send_tensor_dims = send_tensor.dims();
+      std::vector<framework::DDim> outs_dims;
+      outs_dims.reserve(out_num);
+
+      // infer output shape
       PADDLE_ENFORCE_EQ(height_sections.size(), out_num,
                         "tensor split sections size"
                         "should be equal to output size.");
@@ -83,15 +71,15 @@ void ParameterSend<T>::operator()(const std::string &var_name,
         dim[0] = height_sections[i];
         outs_dims.push_back(dim);
       }
-    }
-
-    // create output var in local scope
-    size_t row_offset = 0;
-    for (auto i = 0; i < out_num; ++i) {
-      auto *out =
-          local_scope->Var(send_varnames[i])->GetMutable<framework::LoDTensor>();
-      *out = send_tensor.Slice(row_offset, row_offset + outs_dims[i][0]);
-      row_offset += outs_dims[i][0];
+
+      // create output var in local scope
+      size_t row_offset = 0;
+      for (auto i = 0; i < out_num; ++i) {
+        auto *out =
+            local_scope->Var(send_varnames[i])->GetMutable<framework::LoDTensor>();
+        *out = send_tensor.Slice(row_offset, row_offset + outs_dims[i][0]);
+        row_offset += outs_dims[i][0];
+      }
     }
   } else if (send_var->IsType<framework::SelectedRows>()) {
     auto &send_slr = send_var->Get<framework::SelectedRows>();
diff --git a/paddle/fluid/operators/distributed_ops/send_op.cc b/paddle/fluid/operators/distributed_ops/send_op.cc
index e7ccaa83de..0f0ad6b8f9 100644
--- a/paddle/fluid/operators/distributed_ops/send_op.cc
+++ b/paddle/fluid/operators/distributed_ops/send_op.cc
@@ -42,7 +42,7 @@ class SendOp : public framework::OperatorBase {
     int sync_send = Attr<int>("sync_mode");
 
     auto send_varnames = Attr<std::vector<std::string>>("send_varnames");
-    auto height_sections = Attr<std::vector<int64_t>>("height_sections");
+    auto height_sections = Attr<std::vector<int64_t>>("sections");
 
     if (send_varnames.size() > 0) {
       PADDLE_ENFORCE_EQ(ins.size(), 1, "");
diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py
index 0968ace62b..758c510dc7 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_base.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_base.py
@@ -48,6 +48,7 @@ class TestDistRunnerBase(object):
         # NOTE: import fluid until runtime, or else forking processes will cause error.
         config = fluid.DistributeTranspilerConfig()
         config.enable_dc_asgd = dc_asgd
+        config.runtime_split_send_recv = True
         t = fluid.DistributeTranspiler(config=config)
         t.transpile(
             trainer_id=trainer_id,
@@ -87,6 +88,9 @@ class TestDistRunnerBase(object):
                                     args.endpoints, args.trainers,
                                     args.sync_mode, args.dc_asgd)
             trainer_prog = t.get_trainer_program()
+            with open("/tmp/trainer." + str(args.trainer_id) + ".proto",
+                      "w") as f:
+                f.write(str(trainer_prog))
         elif args.update_method == "nccl2":
             # transpile for nccl2
             config = fluid.DistributeTranspilerConfig()
@@ -115,6 +119,7 @@ class TestDistRunnerBase(object):
         strategy.allow_op_delay = False

         build_stra = fluid.BuildStrategy()
+        build_stra.debug_graphviz_path = "/tmp/graph-" + str(args.trainer_id)

         if args.use_reduce:
             build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py
index a3293afbbd..1b1b416593 100644
--- a/python/paddle/fluid/transpiler/distribute_transpiler.py
+++ b/python/paddle/fluid/transpiler/distribute_transpiler.py
@@ -156,6 +156,8 @@ class DistributeTranspilerConfig(object):
     mode = "pserver"
     print_log = False
     wait_port = True
+    # split the send recv var in runtime
+    runtime_split_send_recv = False


 class DistributeTranspiler(object):
@@ -398,8 +400,10 @@ class DistributeTranspiler(object):
                 orig_var = program.global_block().vars[splited_grad_varname]
                 index = find_op_by_output_arg(
                     program.global_block(), splited_grad_varname, reverse=True)
-                self._insert_split_op(program, orig_var, index, splited_vars)
-                index += 1
+                if not self.config.runtime_split_send_recv:
+                    self._insert_split_op(program, orig_var, index,
+                                          splited_vars)
+                    index += 1
             else:
                 AssertionError("Can not insert the send op by original "
                                "variable name :", splited_grad_varname)
@@ -408,6 +412,17 @@ class DistributeTranspiler(object):
                 name=framework.generate_control_dev_var_name())
             self.grad_name_to_send_dummy_out[grad_varname] = dummy_output

+            if self.config.runtime_split_send_recv:
+                send_input_vars = [
+                    program.global_block().vars[splited_grad_varname]
+                ]
+                sections = self._get_splited_var_sections(splited_vars)
+                send_varnames = [var.name for var in splited_vars]
+            else:
+                send_input_vars = splited_vars
+                sections = []
+                send_varnames = []
+
             # get send op_role_var, if not splited, the grad should have .trainer suffix
             # if splited, grad should be the original grad var name (split_by_ref and send
             # will be on the same place). ParallelExecutor
@@ -415,10 +430,12 @@ class DistributeTranspiler(object):
             program.global_block()._insert_op(
                 index=index + 1,
                 type="send",
-                inputs={"X": splited_vars},
+                inputs={"X": send_input_vars},
                 outputs={"Out": dummy_output},
                 attrs={
                     "epmap": eplist,
+                    "sections": sections,
+                    "send_varnames": send_varnames,
                     RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE,
                     OP_ROLE_VAR_ATTR_NAME: [
                         self.grad_name_to_param_name[grad_varname],
@@ -1372,9 +1389,8 @@ class DistributeTranspiler(object):
         # create table param and grad var in pserver program
         # create table optimize block in pserver program
         table_opt_op = [
-            op for op in self.optimize_ops
-            if 'Param' in op.input_names and op.input("Param")[0] ==
-            self.table_name
+            op for op in self.optimize_ops if 'Param' in op.input_names and
+            op.input("Param")[0] == self.table_name
         ][0]

         origin_param_var = self.origin_program.global_block().vars[
@@ -1548,11 +1564,17 @@ class DistributeTranspiler(object):
                 lod_level=var.lod_level,
                 persistable=persistable)

+    @staticmethod
+    def _get_splited_var_sections(splited_vars):
+        height_sections = []
+        for v in splited_vars:
+            height_sections.append(v.shape[0])
+        return height_sections
+
     def _insert_split_op(self, program, orig_var, index, splited_vars):
+        height_sections = self._get_splited_var_sections(splited_vars)
+
         if orig_var.type == core.VarDesc.VarType.SELECTED_ROWS:
-            height_sections = []
-            for v in splited_vars:
-                height_sections.append(v.shape[0])
             sparse_param_name = self.grad_name_to_param_name[orig_var.name]
             if self._is_input_of_remote_sparse_update_op(sparse_param_name):
                 self.sparse_param_to_height_sections[
@@ -1567,16 +1589,13 @@ class DistributeTranspiler(object):
                     RPC_OP_ROLE_ATTR_NAME: DIST_OP_ROLE_ATTR_VALUE
                 })
         elif orig_var.type == core.VarDesc.VarType.LOD_TENSOR:
-            sections = []
-            for v in splited_vars:
-                sections.append(v.shape[0])
             program.global_block()._insert_op(
                 index=index + 1,
                 type="split_byref",
                 inputs={"X": orig_var},
                 outputs={"Out": splited_vars},
                 attrs={
-                    "sections": sections,
+                    "sections": height_sections,
                     RPC_OP_ROLE_ATTR_NAME: DIST_OP_ROLE_ATTR_VALUE
                 })
         else:
@@ -2048,7 +2067,7 @@ class DistributeTranspiler(object):
         Get optimizer operators, parameters and gradients from origin_program
         Returns:
             opt_ops (list): optimize operators.
-            params_grads (dict): paramter->gradient.
+            params_grads (dict): parameter->gradient.
         """
         block = self.origin_program.global_block()
         opt_ops = []
--
GitLab
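
For context on how the new flag is meant to be used: the updated test above simply switches it on before transpiling. A minimal sketch of the same flow follows; the trainer id, trainer count, and pserver endpoints are placeholders, not values taken from this patch, and a main program with an optimizer is assumed to exist already.

    import paddle.fluid as fluid

    config = fluid.DistributeTranspilerConfig()
    # With runtime_split_send_recv on, the transpiler skips split_byref and
    # attaches "sections"/"send_varnames" to the send op instead; the gradient
    # is then split at runtime on the C++ side (ParameterSend).
    config.runtime_split_send_recv = True

    t = fluid.DistributeTranspiler(config=config)
    t.transpile(
        trainer_id=0,                              # placeholder
        program=fluid.default_main_program(),
        pservers="127.0.0.1:6170,127.0.0.1:6171",  # placeholder endpoints
        trainers=2)                                # placeholder
    trainer_prog = t.get_trainer_program()

The test additionally dumps the transpiled trainer program to /tmp/trainer.<id>.proto and sets build_stra.debug_graphviz_path, which makes the generated program and the compiled graph easy to inspect while debugging this path.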
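The two new send-op attributes are derived directly from the per-pserver split variables, exactly as _get_splited_var_sections does above. A small illustration with a stand-in variable class (FakeVar is hypothetical and only mimics the .name/.shape fields the transpiler reads):

    class FakeVar(object):
        def __init__(self, name, shape):
            self.name = name
            self.shape = shape

    # Two slices of a 10x4 gradient, one per pserver.
    splited_vars = [FakeVar("w@GRAD.block0", (6, 4)),
                    FakeVar("w@GRAD.block1", (4, 4))]

    # Mirrors _get_splited_var_sections: the height (dim 0) of every slice.
    sections = [v.shape[0] for v in splited_vars]    # [6, 4]
    send_varnames = [v.name for v in splited_vars]   # ["w@GRAD.block0", "w@GRAD.block1"]

In graph-split mode the send op consumes the outputs of split_byref and both attributes stay empty; in runtime-split mode it consumes the whole gradient plus these two attributes.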
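On the C++ side, ParameterSend now slices the tensor row-wise by those section heights when more than one send_varname is given, using a running row offset. The loop can be paraphrased in plain Python (illustration only, with a NumPy array standing in for the LoDTensor; the C++ code checks height_sections.size() against out_num, while the sum check here is just for the example):

    import numpy as np

    def split_rows(tensor, height_sections):
        # Slice `tensor` along axis 0 into one piece per section height.
        assert sum(height_sections) == tensor.shape[0]
        outs = []
        row_offset = 0
        for height in height_sections:
            outs.append(tensor[row_offset:row_offset + height])
            row_offset += height
        return outs

    grad = np.arange(40, dtype="float32").reshape(10, 4)
    slices = split_rows(grad, [6, 4])
    assert [s.shape[0] for s in slices] == [6, 4]

Each slice is then placed into the local scope under the corresponding send_varname and sent to its pserver.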