diff --git a/paddle/fluid/framework/mixed_vector.h b/paddle/fluid/framework/mixed_vector.h index b834d4633bf9e3433d5ea4b844f6afe407bfc3ac..837b5fa7f61279fe8d53a91c7a3ebf8703d279c9 100644 --- a/paddle/fluid/framework/mixed_vector.h +++ b/paddle/fluid/framework/mixed_vector.h @@ -110,6 +110,10 @@ class Vector { T* end() { return size() == 0 ? &EmptyDummy() : &this->operator[](size()); } + const T* cbegin() const { return begin(); } + + const T* cend() const { return end(); } + T& front() { return *begin(); } T& back() { @@ -249,9 +253,9 @@ class Vector { bool operator==(const Vector& other) const { if (size() != other.size()) return false; - auto it1 = begin(); - auto it2 = other.begin(); - for (; it1 < end(); ++it1, ++it2) { + auto it1 = cbegin(); + auto it2 = other.cbegin(); + for (; it1 < cend(); ++it1, ++it2) { if (*it1 != *it2) { return false; } diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 3730ae161fdf729272e911500fe7f38f99d05ad2..426dd0dc0e95b7952de1b603a943fc725889685e 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -101,6 +101,9 @@ class ListenAndServOp : public framework::OperatorBase { // TODO(typhoonzero): change this to a while_op for every cluster-batch. bool exit_flag = false; + // Record received sparse variables, so that + // we could reset those after execute optimize program + std::vector sparse_vars; while (!exit_flag) { // Get from multiple trainers, we don't care about the order in which // the gradients arrives, just add suffix 0~n and merge the gradient. @@ -143,6 +146,9 @@ class ListenAndServOp : public framework::OperatorBase { PADDLE_THROW("Can not find server side var"); } detail::DeserializeFromMessage(v.second, dev_ctx, var); + if (var->IsType()) { + sparse_vars.push_back(var); + } } } VLOG(3) << "recv " << recv_var_cnt << " parmeters for one barrier."; @@ -156,9 +162,19 @@ class ListenAndServOp : public framework::OperatorBase { } catch (std::exception &e) { LOG(ERROR) << "run sub program error " << e.what(); } + + // Reset the received sparse variables, the sum operator would not + // sum the input sparse variables which rows is empty at the next + // mini-batch. + // TOOD(Yancey1989): move the reset action into an operator, we couldn't + // have any hide logic in the operator. + for (auto &var : sparse_vars) { + var->GetMutable()->mutable_rows()->clear(); + } rpc_service_->SetCond(1); rpc_service_->WaitClientGet(update_param_cnt); grads_counter_.clear(); + sparse_vars.clear(); } // while(true) } diff --git a/paddle/fluid/operators/send_op.cc b/paddle/fluid/operators/send_op.cc index a8390aa6596c69f85e3ef736dda9dd99c3fd6dba..b241f738cbf60c0698b869da771807ec7e2c33af 100644 --- a/paddle/fluid/operators/send_op.cc +++ b/paddle/fluid/operators/send_op.cc @@ -24,6 +24,22 @@ limitations under the License. */ namespace paddle { namespace operators { +static bool IsVariableInitialized(const framework::Scope& scope, + const std::string& varname) { + auto* var = scope.FindVar(varname); + PADDLE_ENFORCE_NOT_NULL(var, "Can not find variable '%s' in the send side.", + varname); + if (var->IsType()) { + return var->Get().IsInitialized(); + } else if (var->IsType()) { + return var->Get().value().IsInitialized(); + } else { + PADDLE_THROW( + "Variable type in send side should be in " + "[LodTensor, SelectedRows]"); + } + return false; +} class SendOp : public framework::OperatorBase { public: @@ -51,8 +67,12 @@ class SendOp : public framework::OperatorBase { detail::RPCClient* rpc_client = client_var->GetMutable(); for (size_t i = 0; i < ins.size(); i++) { - VLOG(3) << "sending " << ins[i] << " to " << epmap[i]; - rpc_client->AsyncSendVariable(epmap[i], ctx, scope, ins[i]); + if (IsVariableInitialized(scope, ins[i])) { + VLOG(3) << "sending " << ins[i] << " to " << epmap[i]; + rpc_client->AsyncSendVariable(epmap[i], ctx, scope, ins[i]); + } else { + VLOG(3) << "don't send no-initialied variable: " << ins[i]; + } } PADDLE_ENFORCE(rpc_client->Wait()); diff --git a/paddle/fluid/operators/split_selected_rows_op.cc b/paddle/fluid/operators/split_selected_rows_op.cc index 113ce2ce109778a355130aaf686261c1f71c0980..c30280f6545ee30afb46395ac0eb5efbba462c56 100644 --- a/paddle/fluid/operators/split_selected_rows_op.cc +++ b/paddle/fluid/operators/split_selected_rows_op.cc @@ -22,7 +22,7 @@ class SplitSelectedRowsOpMaker : public framework::OpProtoAndCheckerMaker { SplitSelectedRowsOpMaker(OpProto *proto, OpAttrChecker *op_checker) : OpProtoAndCheckerMaker(proto, op_checker) { AddInput("X", "The input SelectedRows."); - AddOutput("Out", "The outputs of input SelectedRows.").AsDuplicable(); + AddOutput("Out", "The outputs of the input SelectedRows.").AsDuplicable(); AddAttr>("height_sections", "Height for each output SelectedRows.") .SetDefault(std::vector({})); @@ -56,27 +56,6 @@ class SplitSelectedRowsOp : public framework::OperatorWithKernel { PADDLE_ENFORCE(ctx->HasInput("X"), "SplitSelectedRowsOp must has input X."); PADDLE_ENFORCE(ctx->HasOutputs("Out"), "SplitSelectedRowsOp must has output Out."); - - std::vector height_sections = - ctx->Attrs().Get>("height_sections"); - int64_t n = ctx->Outputs("Out").size(); - - std::vector outs_dims; - outs_dims.reserve(n); - - // make output dims - for (int64_t i = 0; i < n; ++i) { - auto dims = ctx->GetInputDim("X"); - if (height_sections.size()) { - PADDLE_ENFORCE_EQ( - height_sections.size(), static_cast(n), - "The size of height section should be the same with height" - " section size."); - dims[0] = height_sections[i]; - } - outs_dims.push_back(dims); - } - ctx->SetOutputsDim("Out", outs_dims); } }; diff --git a/paddle/fluid/operators/split_selected_rows_op.h b/paddle/fluid/operators/split_selected_rows_op.h index 527264bd675520a98b442380e2d1ec259964e92e..af44b09b70095cb891bd6cabceb59daf118ecd22 100644 --- a/paddle/fluid/operators/split_selected_rows_op.h +++ b/paddle/fluid/operators/split_selected_rows_op.h @@ -55,6 +55,7 @@ class SplitSelectedRowsOpKernel : public framework::OpKernel { for (size_t i = 0; i < outs_rows_idx.size(); ++i) { auto rows_idx = outs_rows_idx[i]; + outs[i]->set_height(height_sections[i]); if (rows_idx.size() > 0) { auto dims = x->GetCompleteDims(); dims[0] = rows_idx.size(); diff --git a/paddle/fluid/operators/sum_op.h b/paddle/fluid/operators/sum_op.h index 5e1222c6ef723a6321392a5af7fdb558c24df32b..08218b6836e2f10ba220523d6241fb0ee4c0b3f2 100644 --- a/paddle/fluid/operators/sum_op.h +++ b/paddle/fluid/operators/sum_op.h @@ -116,7 +116,9 @@ class SumKernel : public framework::OpKernel { int64_t offset = 0; for (int i = 0; i < N; i++) { auto &sel_row = get_selected_row(i); - + if (!sel_row.value().IsInitialized() || sel_row.rows().size() == 0) { + continue; + } PADDLE_ENFORCE_EQ(out->height(), sel_row.height()); functor(context.template device_context(), sel_row, offset, out); diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index edef2b1b17f51f065f9b2a6c5aeddb23355734e4..e4675e24b178b2f1745c2b38270ac381ebfe6550 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -191,6 +191,7 @@ class DistributeTranspiler: for b in param_blocks: varname, block_id, _ = b.split(":") send_outputs.append(param_var_mapping[varname][int(block_id)]) + # let send_op know which endpoint to send which var to, eplist has the same # order as send_inputs. eplist = split_method(send_inputs, pserver_endpoints) @@ -274,6 +275,7 @@ class DistributeTranspiler: name="%s.block%d" % (varname, i), psersistable=False, dtype=orig_var.dtype, + type=orig_var.type, shape=splited_shape) # flattend splited var var_mapping[varname].append(var) return var_mapping @@ -335,6 +337,7 @@ class DistributeTranspiler: name="%s.trainer_%d" % (var.name, i), psersistable=var.persistable, dtype=var.dtype, + type=var.type, shape=var.shape) var_list.append(var_each) return var_list @@ -561,6 +564,7 @@ class DistributeTranspiler: persistable=True, dtype=v.dtype, shape=v.shape) + # step6 optimize_block = pserver_program.create_block(0) # step 6.1