From caf9a09d7bee946969999130477fb5de2983007b Mon Sep 17 00:00:00 2001 From: Yancey Date: Sun, 11 Feb 2018 15:44:27 +0800 Subject: [PATCH] Merge selected rows with dynamic variable count (#8023) * dynamic send/recv selected rows * update by comment * fix by comment --- paddle/fluid/operators/listen_and_serv_op.cc | 16 +++++++++++++ paddle/fluid/operators/send_op.cc | 24 +++++++++++++++++-- .../fluid/operators/split_selected_rows_op.cc | 23 +----------------- .../fluid/operators/split_selected_rows_op.h | 1 + paddle/fluid/operators/sum_op.h | 4 +++- .../paddle/v2/fluid/distribute_transpiler.py | 4 ++++ 6 files changed, 47 insertions(+), 25 deletions(-) diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 3730ae161fd..426dd0dc0e9 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -101,6 +101,9 @@ class ListenAndServOp : public framework::OperatorBase { // TODO(typhoonzero): change this to a while_op for every cluster-batch. bool exit_flag = false; + // Record received sparse variables, so that + // we could reset those after execute optimize program + std::vector sparse_vars; while (!exit_flag) { // Get from multiple trainers, we don't care about the order in which // the gradients arrives, just add suffix 0~n and merge the gradient. @@ -143,6 +146,9 @@ class ListenAndServOp : public framework::OperatorBase { PADDLE_THROW("Can not find server side var"); } detail::DeserializeFromMessage(v.second, dev_ctx, var); + if (var->IsType()) { + sparse_vars.push_back(var); + } } } VLOG(3) << "recv " << recv_var_cnt << " parmeters for one barrier."; @@ -156,9 +162,19 @@ class ListenAndServOp : public framework::OperatorBase { } catch (std::exception &e) { LOG(ERROR) << "run sub program error " << e.what(); } + + // Reset the received sparse variables, the sum operator would not + // sum the input sparse variables which rows is empty at the next + // mini-batch. + // TOOD(Yancey1989): move the reset action into an operator, we couldn't + // have any hide logic in the operator. + for (auto &var : sparse_vars) { + var->GetMutable()->mutable_rows()->clear(); + } rpc_service_->SetCond(1); rpc_service_->WaitClientGet(update_param_cnt); grads_counter_.clear(); + sparse_vars.clear(); } // while(true) } diff --git a/paddle/fluid/operators/send_op.cc b/paddle/fluid/operators/send_op.cc index a8390aa6596..b241f738cbf 100644 --- a/paddle/fluid/operators/send_op.cc +++ b/paddle/fluid/operators/send_op.cc @@ -24,6 +24,22 @@ limitations under the License. */ namespace paddle { namespace operators { +static bool IsVariableInitialized(const framework::Scope& scope, + const std::string& varname) { + auto* var = scope.FindVar(varname); + PADDLE_ENFORCE_NOT_NULL(var, "Can not find variable '%s' in the send side.", + varname); + if (var->IsType()) { + return var->Get().IsInitialized(); + } else if (var->IsType()) { + return var->Get().value().IsInitialized(); + } else { + PADDLE_THROW( + "Variable type in send side should be in " + "[LodTensor, SelectedRows]"); + } + return false; +} class SendOp : public framework::OperatorBase { public: @@ -51,8 +67,12 @@ class SendOp : public framework::OperatorBase { detail::RPCClient* rpc_client = client_var->GetMutable(); for (size_t i = 0; i < ins.size(); i++) { - VLOG(3) << "sending " << ins[i] << " to " << epmap[i]; - rpc_client->AsyncSendVariable(epmap[i], ctx, scope, ins[i]); + if (IsVariableInitialized(scope, ins[i])) { + VLOG(3) << "sending " << ins[i] << " to " << epmap[i]; + rpc_client->AsyncSendVariable(epmap[i], ctx, scope, ins[i]); + } else { + VLOG(3) << "don't send no-initialied variable: " << ins[i]; + } } PADDLE_ENFORCE(rpc_client->Wait()); diff --git a/paddle/fluid/operators/split_selected_rows_op.cc b/paddle/fluid/operators/split_selected_rows_op.cc index 113ce2ce109..c30280f6545 100644 --- a/paddle/fluid/operators/split_selected_rows_op.cc +++ b/paddle/fluid/operators/split_selected_rows_op.cc @@ -22,7 +22,7 @@ class SplitSelectedRowsOpMaker : public framework::OpProtoAndCheckerMaker { SplitSelectedRowsOpMaker(OpProto *proto, OpAttrChecker *op_checker) : OpProtoAndCheckerMaker(proto, op_checker) { AddInput("X", "The input SelectedRows."); - AddOutput("Out", "The outputs of input SelectedRows.").AsDuplicable(); + AddOutput("Out", "The outputs of the input SelectedRows.").AsDuplicable(); AddAttr>("height_sections", "Height for each output SelectedRows.") .SetDefault(std::vector({})); @@ -56,27 +56,6 @@ class SplitSelectedRowsOp : public framework::OperatorWithKernel { PADDLE_ENFORCE(ctx->HasInput("X"), "SplitSelectedRowsOp must has input X."); PADDLE_ENFORCE(ctx->HasOutputs("Out"), "SplitSelectedRowsOp must has output Out."); - - std::vector height_sections = - ctx->Attrs().Get>("height_sections"); - int64_t n = ctx->Outputs("Out").size(); - - std::vector outs_dims; - outs_dims.reserve(n); - - // make output dims - for (int64_t i = 0; i < n; ++i) { - auto dims = ctx->GetInputDim("X"); - if (height_sections.size()) { - PADDLE_ENFORCE_EQ( - height_sections.size(), static_cast(n), - "The size of height section should be the same with height" - " section size."); - dims[0] = height_sections[i]; - } - outs_dims.push_back(dims); - } - ctx->SetOutputsDim("Out", outs_dims); } }; diff --git a/paddle/fluid/operators/split_selected_rows_op.h b/paddle/fluid/operators/split_selected_rows_op.h index 527264bd675..af44b09b700 100644 --- a/paddle/fluid/operators/split_selected_rows_op.h +++ b/paddle/fluid/operators/split_selected_rows_op.h @@ -55,6 +55,7 @@ class SplitSelectedRowsOpKernel : public framework::OpKernel { for (size_t i = 0; i < outs_rows_idx.size(); ++i) { auto rows_idx = outs_rows_idx[i]; + outs[i]->set_height(height_sections[i]); if (rows_idx.size() > 0) { auto dims = x->GetCompleteDims(); dims[0] = rows_idx.size(); diff --git a/paddle/fluid/operators/sum_op.h b/paddle/fluid/operators/sum_op.h index 5e1222c6ef7..08218b6836e 100644 --- a/paddle/fluid/operators/sum_op.h +++ b/paddle/fluid/operators/sum_op.h @@ -116,7 +116,9 @@ class SumKernel : public framework::OpKernel { int64_t offset = 0; for (int i = 0; i < N; i++) { auto &sel_row = get_selected_row(i); - + if (!sel_row.value().IsInitialized() || sel_row.rows().size() == 0) { + continue; + } PADDLE_ENFORCE_EQ(out->height(), sel_row.height()); functor(context.template device_context(), sel_row, offset, out); diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index edef2b1b17f..e4675e24b17 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -191,6 +191,7 @@ class DistributeTranspiler: for b in param_blocks: varname, block_id, _ = b.split(":") send_outputs.append(param_var_mapping[varname][int(block_id)]) + # let send_op know which endpoint to send which var to, eplist has the same # order as send_inputs. eplist = split_method(send_inputs, pserver_endpoints) @@ -274,6 +275,7 @@ class DistributeTranspiler: name="%s.block%d" % (varname, i), psersistable=False, dtype=orig_var.dtype, + type=orig_var.type, shape=splited_shape) # flattend splited var var_mapping[varname].append(var) return var_mapping @@ -335,6 +337,7 @@ class DistributeTranspiler: name="%s.trainer_%d" % (var.name, i), psersistable=var.persistable, dtype=var.dtype, + type=var.type, shape=var.shape) var_list.append(var_each) return var_list @@ -561,6 +564,7 @@ class DistributeTranspiler: persistable=True, dtype=v.dtype, shape=v.shape) + # step6 optimize_block = pserver_program.create_block(0) # step 6.1 -- GitLab