diff --git a/paddle/fluid/framework/attribute.cc b/paddle/fluid/framework/attribute.cc index 0dcecb62dba971b48c4f11c0ef47494be40eeea0..fabf2abfc803b8838edb48aa01ab8896799c97ac 100644 --- a/paddle/fluid/framework/attribute.cc +++ b/paddle/fluid/framework/attribute.cc @@ -64,6 +64,13 @@ Attribute GetAttrValue(const proto::OpDesc::Attr& attr_desc) { case proto::AttrType::LONG: { return attr_desc.l(); } + case proto::AttrType::LONGS: { + std::vector val(attr_desc.longs_size()); + for (int i = 0; i < attr_desc.longs_size(); ++i) { + val[i] = attr_desc.longs(i); + } + return val; + } default: PADDLE_THROW("Unsupport attr type %d", attr_desc.type()); } diff --git a/paddle/fluid/framework/attribute.h b/paddle/fluid/framework/attribute.h index 14ca3e96209ed17f12e87fda8506806514698977..d9c76881b7e98d0b7cd29024b98c8f7720398c66 100644 --- a/paddle/fluid/framework/attribute.h +++ b/paddle/fluid/framework/attribute.h @@ -26,6 +26,113 @@ limitations under the License. */ namespace paddle { namespace framework { + +template +struct ExtractAttribute { + explicit ExtractAttribute(const std::string& attr_name) + : attr_name_(attr_name) {} + + T* operator()(Attribute& attr) const { + T* attr_value = nullptr; + try { + attr_value = &boost::get(attr); + } catch (boost::bad_get& bad_get) { + PADDLE_THROW("Cannot get attribute %s by type %s, its type is %s", + attr_name_, paddle::platform::demangle(typeid(T).name()), + paddle::platform::demangle(attr.type().name())); + } + return attr_value; + } + + const std::string& attr_name_; +}; + +// special handle bool +// FIXME(yuyang18): Currently we cast bool into int in python binding. It is +// hard to change the logic there. In another way, we should correct handle +// if the user set `some_flag=1`. +// +// FIX ME anytime if there is a better solution. +template <> +struct ExtractAttribute { + explicit ExtractAttribute(const std::string& attr_name) + : attr_name_(attr_name) {} + + bool* operator()(Attribute& attr) const { + if (attr.type() == typeid(int)) { // NOLINT + int val = boost::get(attr); + attr = static_cast(val); + } else if (attr.type() == typeid(float)) { // NOLINT + float val = boost::get(attr); + attr = static_cast(val); + } + bool* attr_value = nullptr; + try { + attr_value = &boost::get(attr); + } catch (boost::bad_get& bad_get) { + PADDLE_THROW("Cannot get attribute %s by type bool, its type is %s", + attr_name_, paddle::platform::demangle(attr.type().name())); + } + return attr_value; + } + + const std::string& attr_name_; +}; + +template <> +struct ExtractAttribute { + explicit ExtractAttribute(const std::string& attr_name) + : attr_name_(attr_name) {} + + int64_t* operator()(Attribute& attr) const { + if (attr.type() == typeid(int)) { // NOLINT + int val = boost::get(attr); + attr = static_cast(val); + } else if (attr.type() == typeid(float)) { // NOLINT + int val = boost::get(attr); + attr = static_cast(val); + } + int64_t* attr_value = nullptr; + try { + attr_value = &boost::get(attr); + } catch (boost::bad_get& bad_get) { + PADDLE_THROW("Cannot get attribute %s by type int64_t, its type is %s", + attr_name_, paddle::platform::demangle(attr.type().name())); + } + return attr_value; + } + + const std::string& attr_name_; +}; + +template <> +struct ExtractAttribute> { + explicit ExtractAttribute(const std::string& attr_name) + : attr_name_(attr_name) {} + + std::vector* operator()(Attribute& attr) const { + if (attr.type() == typeid(std::vector)) { // NOLINT + std::vector val = boost::get>(attr); + std::vector vec(val.begin(), val.end()); + attr = vec; + } else if (attr.type() == typeid(std::vector)) { // NOLINT + std::vector val = boost::get>(attr); + std::vector vec(val.begin(), val.end()); + attr = vec; + } + std::vector* attr_value = nullptr; + try { + attr_value = &boost::get>(attr); + } catch (boost::bad_get& bad_get) { + PADDLE_THROW("Cannot get attribute %s by type int64_t, its type is %s", + attr_name_, paddle::platform::demangle(attr.type().name())); + } + return attr_value; + } + + const std::string& attr_name_; +}; + template inline proto::AttrType AttrTypeID() { Attribute tmp = T(); @@ -42,7 +149,11 @@ class AttrReader { inline const T& Get(const std::string& name) const { PADDLE_ENFORCE(attrs_.count(name) != 0, "%s should be in AttributeMap", name); - return boost::get(attrs_.at(name)); + + Attribute& attr = const_cast(attrs_.at(name)); + ExtractAttribute extract_attr(name); + T* attr_value = extract_attr(attr); + return *attr_value; } private: @@ -82,7 +193,7 @@ class DefaultValueSetter { public: explicit DefaultValueSetter(T default_value) : default_value_(default_value) {} - void operator()(T& value) const { value = default_value_; } + void operator()(T& value) const { value = default_value_; } // NOLINT private: T default_value_; @@ -117,84 +228,6 @@ class EnumInContainer { std::unordered_set container_; }; -template -struct ExtractAttribute { - explicit ExtractAttribute(const std::string& attr_name) - : attr_name_(attr_name) {} - - T* operator()(Attribute& attr) const { - T* attr_value = nullptr; - try { - attr_value = &boost::get(attr); - } catch (boost::bad_get& bad_get) { - PADDLE_THROW("Cannot get attribute %s by type %s, its type is %s", - attr_name_, paddle::platform::demangle(typeid(T).name()), - paddle::platform::demangle(attr.type().name())); - } - return attr_value; - } - - const std::string& attr_name_; -}; - -// special handle bool -// FIXME(yuyang18): Currently we cast bool into int in python binding. It is -// hard to change the logic there. In another way, we should correct handle -// if the user set `some_flag=1`. -// -// FIX ME anytime if there is a better solution. -template <> -struct ExtractAttribute { - explicit ExtractAttribute(const std::string& attr_name) - : attr_name_(attr_name) {} - - bool* operator()(Attribute& attr) const { - if (attr.type() == typeid(int)) { // NOLINT - int val = boost::get(attr); - attr = static_cast(val); - } else if (attr.type() == typeid(float)) { // NOLINT - float val = boost::get(attr); - attr = static_cast(val); - } - bool* attr_value = nullptr; - try { - attr_value = &boost::get(attr); - } catch (boost::bad_get& bad_get) { - PADDLE_THROW("Cannot get attribute %s by type bool, its type is %s", - attr_name_, paddle::platform::demangle(attr.type().name())); - } - return attr_value; - } - - const std::string& attr_name_; -}; - -template <> -struct ExtractAttribute { - explicit ExtractAttribute(const std::string& attr_name) - : attr_name_(attr_name) {} - - int64_t* operator()(Attribute& attr) const { - if (attr.type() == typeid(int)) { // NOLINT - int val = boost::get(attr); - attr = static_cast(val); - } else if (attr.type() == typeid(float)) { // NOLINT - int val = boost::get(attr); - attr = static_cast(val); - } - int64_t* attr_value = nullptr; - try { - attr_value = &boost::get(attr); - } catch (boost::bad_get& bad_get) { - PADDLE_THROW("Cannot get attribute %s by type int64_t, its type is %s", - attr_name_, paddle::platform::demangle(attr.type().name())); - } - return attr_value; - } - - const std::string& attr_name_; -}; - // check whether a certain attribute fit its limits // an attribute can have more than one limits template @@ -235,7 +268,7 @@ class TypedAttrChecker { return *this; } - void operator()(AttributeMap& attr_map) const { + void operator()(AttributeMap& attr_map) const { // NOLINT if (!attr_map.count(attr_name_)) { // user do not set this attr PADDLE_ENFORCE(!default_value_setter_.empty(), @@ -271,7 +304,7 @@ class OpAttrChecker { return *(checker.target>()); } - void Check(AttributeMap& attr_map) const { + void Check(AttributeMap& attr_map) const { // NOLINT for (const auto& checker : attr_checkers_) { checker(attr_map); } diff --git a/paddle/fluid/framework/details/broadcast_op_handle.cc b/paddle/fluid/framework/details/broadcast_op_handle.cc index 4fdab5cd94358d08eac7f8b041bf16d09042f0bd..155441f841729e613fc1d78e30b7ff4ced6c5845 100644 --- a/paddle/fluid/framework/details/broadcast_op_handle.cc +++ b/paddle/fluid/framework/details/broadcast_op_handle.cc @@ -52,6 +52,10 @@ void BroadcastOpHandle::RunImpl() { var_scopes.at(in_var_handle->scope_idx_)->FindVar(in_var_handle->name_); PADDLE_ENFORCE_NOT_NULL(in_var); Tensor &in_tensor = VariableVisitor::GetMutableTensor(in_var); + if (UNLIKELY(!in_tensor.IsInitialized())) { + VLOG(3) << "in var " << in_var_handle.name_ << "not inited, return!"; + return; + } InitOutputValue(*in_var_handle, out_var_handles); diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.cc b/paddle/fluid/framework/details/multi_devices_graph_pass.cc index ebd1d644bcce0554904ee0931827ef43a6d52b11..4f3889a71e865ce6f9799d976c6e6ec5c365429b 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.cc @@ -680,7 +680,8 @@ int MultiDevSSAGraphBuilder::CreateDistTrainOp(ir::Graph *result, } if (node->Op()->Type() == "split_byref" || - node->Op()->Type() == "split_selected_rows") { + node->Op()->Type() == "split_selected_rows" || + node->Op()->Type() == "split_ids") { // TODO(paddle-dev): getting the first var is not safe. op_dev_id = GetVarDeviceID(*result, input_var_names[0]); if (strategy_.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce) { diff --git a/paddle/fluid/framework/framework.proto b/paddle/fluid/framework/framework.proto index c99406799ba5f664c4b9f80e0567b293e4ffea51..efdabffb9b33ddf007c13008d0f3afb7a3961eda 100644 --- a/paddle/fluid/framework/framework.proto +++ b/paddle/fluid/framework/framework.proto @@ -35,6 +35,7 @@ enum AttrType { BLOCK = 8; LONG = 9; BLOCKS = 10; + LONGS = 11; } // OpDesc describes an instance of a C++ framework::OperatorBase @@ -55,6 +56,7 @@ message OpDesc { optional int32 block_idx = 12; optional int64 l = 13; repeated int32 blocks_idx = 14; + repeated int64 longs = 15; }; message Var { diff --git a/paddle/fluid/framework/op_desc.cc b/paddle/fluid/framework/op_desc.cc index c293cf92b4f3d530109c76850df184af9cad7399..8ece618f3f72552fedcffab3e03ebb30476b7cab 100644 --- a/paddle/fluid/framework/op_desc.cc +++ b/paddle/fluid/framework/op_desc.cc @@ -419,8 +419,15 @@ struct SetAttrDescVisitor : public boost::static_visitor { } VectorToRepeated(blocks_idx, attr_->mutable_blocks_idx()); } + void operator()(BlockDesc *desc) const { attr_->set_block_idx(desc->ID()); } + void operator()(int64_t v) const { attr_->set_l(v); } + + void operator()(const std::vector &v) const { + VectorToRepeated(v, attr_->mutable_longs()); + } + void operator()(boost::blank) const { PADDLE_THROW("Unexpected branch"); } }; diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 3368ae2ee4cf65b85abf1fcd89dee14f43522e1f..59db33a2edb8222bea5163dc0dfd9b12477cd5ec 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -185,6 +185,10 @@ void ParallelExecutor::BCastParamsToDevices( } auto &main_tensor = main_var->Get(); + if (!main_tensor.IsInitialized()) { + VLOG(3) << "one in var not inited, return!"; + continue; + } auto &dims = main_tensor.dims(); if (paddle::platform::is_gpu_place(main_tensor.place())) { #ifdef PADDLE_WITH_CUDA diff --git a/paddle/fluid/framework/type_defs.h b/paddle/fluid/framework/type_defs.h index e099e40f121ff13657e563eb608feecbca0551be..2de6233a9e0d320ec9a06d547db3575eb61925c0 100644 --- a/paddle/fluid/framework/type_defs.h +++ b/paddle/fluid/framework/type_defs.h @@ -36,7 +36,7 @@ using Attribute = boost::variant, std::vector, std::vector, bool, std::vector, BlockDesc*, int64_t, - std::vector>; + std::vector, std::vector>; using AttributeMap = std::unordered_map; diff --git a/paddle/fluid/operators/fake_init_op.cc b/paddle/fluid/operators/fake_init_op.cc new file mode 100644 index 0000000000000000000000000000000000000000..28ebdcb03ea83f3ec701106111a7cc5f0f7ed7dc --- /dev/null +++ b/paddle/fluid/operators/fake_init_op.cc @@ -0,0 +1,86 @@ +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/fluid/framework/data_type.h" +#include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/operators/math/math_function.h" + +namespace paddle { +namespace operators { + +class FakeInitInferShape : public framework::InferShapeBase { + public: + void operator()(framework::InferShapeContext *ctx) const override { + PADDLE_ENFORCE(ctx->HasOutput("Out"), + "Output(Out) of FakeInitOp should not be null."); + auto &shape = ctx->Attrs().Get>("shape"); + ctx->SetOutputDim("Out", framework::make_ddim(shape)); + } +}; + +class FakeInitOp : public framework::OperatorBase { + public: + using framework::OperatorBase::OperatorBase; + + private: + void RunImpl(const framework::Scope &scope, + const platform::Place &dev_place) const override { + framework::Tensor *tensor = nullptr; + + auto &out_var = *scope.FindVar(Output("Out")); + + if (out_var.IsType()) { + tensor = out_var.GetMutable(); + tensor->Resize(framework::make_ddim(Attr>("shape"))); + } else if (out_var.IsType()) { + tensor = out_var.GetMutable()->mutable_value(); + tensor->Resize(framework::make_ddim(Attr>("shape"))); + } else { + PADDLE_THROW( + "fake init op's output only" + "supports SelectedRows and LoDTensor"); + } + } +}; + +class FakeInitOpVarTypeInference : public framework::VarTypeInference { + public: + void operator()(const framework::OpDesc &op_desc, + framework::BlockDesc *block) const override {} +}; + +class FakeInitOpMaker : public framework::OpProtoAndCheckerMaker { + public: + void Make() override { + AddAttr>("shape", + "(vector) The shape of the output"); + AddOutput("Out", + "(Tensor) Tensor of specified shape will be filled " + "with the specified value"); + AddComment(R"DOC( +FakeInit Operator. + +Init an variable but not alloc memory for it, it is used for init the +table parameter at trainer side in distributed lookup table. + +)DOC"); + } +}; +} // namespace operators +} // namespace paddle + +namespace ops = paddle::operators; +REGISTER_OPERATOR(fake_init, ops::FakeInitOp, ops::FakeInitInferShape, + ops::FakeInitOpMaker, paddle::framework::EmptyGradOpMaker, + ops::FakeInitOpVarTypeInference); diff --git a/paddle/fluid/operators/fill_constant_op.cc b/paddle/fluid/operators/fill_constant_op.cc index e04a68717b351ddb0be5a7e70aa9297e5eb0125f..252f313440296bd9e5eebf26f67b08bbe7decce8 100644 --- a/paddle/fluid/operators/fill_constant_op.cc +++ b/paddle/fluid/operators/fill_constant_op.cc @@ -24,7 +24,7 @@ class FillConstantInferShape : public framework::InferShapeBase { void operator()(framework::InferShapeContext *ctx) const override { PADDLE_ENFORCE(ctx->HasOutput("Out"), "Output(Out) of FillConstantOp should not be null."); - auto &shape = ctx->Attrs().Get>("shape"); + auto &shape = ctx->Attrs().Get>("shape"); ctx->SetOutputDim("Out", framework::make_ddim(shape)); } }; @@ -47,10 +47,10 @@ class FillConstantOp : public framework::OperatorBase { if (out_var.IsType()) { tensor = out_var.GetMutable(); - tensor->Resize(framework::make_ddim(Attr>("shape"))); + tensor->Resize(framework::make_ddim(Attr>("shape"))); } else if (out_var.IsType()) { tensor = out_var.GetMutable()->mutable_value(); - tensor->Resize(framework::make_ddim(Attr>("shape"))); + tensor->Resize(framework::make_ddim(Attr>("shape"))); } else { PADDLE_THROW( "fill constant op's output only" @@ -83,7 +83,8 @@ class FillConstantOpMaker : public framework::OpProtoAndCheckerMaker { "(int, default 5 (FP32)) " "Output data type") .SetDefault(framework::proto::VarType::FP32); - AddAttr>("shape", "(vector) The shape of the output"); + AddAttr>("shape", + "(vector) The shape of the output"); AddAttr("value", "(float, default 0) The value to be filled") .SetDefault(0.0f); AddAttr("force_cpu", diff --git a/paddle/fluid/operators/gaussian_random_op.cc b/paddle/fluid/operators/gaussian_random_op.cc index 1488aab1926b5b4ba7bceed582700f5a11fc6c93..c70d5b8bc7569c38cbc003aca7c62dc503df11cf 100644 --- a/paddle/fluid/operators/gaussian_random_op.cc +++ b/paddle/fluid/operators/gaussian_random_op.cc @@ -52,7 +52,7 @@ class GaussianRandomOp : public framework::OperatorWithKernel { void InferShape(framework::InferShapeContext* ctx) const override { PADDLE_ENFORCE(ctx->HasOutput("Out"), "Output(Out) of GaussianRandomOp should not be null."); - auto shape = ctx->Attrs().Get>("shape"); + auto shape = ctx->Attrs().Get>("shape"); std::vector temp; temp.reserve(shape.size()); for (auto dim : shape) { @@ -88,9 +88,9 @@ class GaussianRandomOpMaker : public framework::OpProtoAndCheckerMaker { void Make() override { AddOutput("Out", "Output matrix of gaussian random op"); - AddAttr>("shape", - "(vector) " - "The dimension of random tensor."); + AddAttr>("shape", + "(vector) " + "The dimension of random tensor."); AddAttr("mean", "(float, default 0.0) " "mean of random tensor.") diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 26f09c46c2224a4a46d302dff4b2ec594f0be103..a038bad701ba8ede3065af9f352f1f21784a50b7 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -27,6 +27,10 @@ limitations under the License. */ #include "paddle/fluid/operators/distributed/request_handler_impl.h" #include "paddle/fluid/operators/listen_and_serv_op.h" +DEFINE_int32(rpc_send_thread_num, 5, "number of threads for rpc send"); +DEFINE_int32(rpc_get_thread_num, 5, "number of threads for rpc get"); +DEFINE_int32(rpc_prefetch_thread_num, 5, "number of threads for rpc prefetch"); + namespace paddle { namespace operators { @@ -332,11 +336,14 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, sync_mode, checkpoint_block_id)); rpc_service_->RegisterRPC(distributed::kRequestSend, - request_send_handler_.get()); + request_send_handler_.get(), + FLAGS_rpc_send_thread_num); rpc_service_->RegisterRPC(distributed::kRequestGet, - request_get_handler_.get()); + request_get_handler_.get(), + FLAGS_rpc_get_thread_num); rpc_service_->RegisterRPC(distributed::kRequestPrefetch, - request_prefetch_handler_.get()); + request_prefetch_handler_.get(), + FLAGS_rpc_prefetch_thread_num); rpc_service_->RegisterRPC(distributed::kRequestCheckpoint, request_checkpoint_handler_.get()); diff --git a/paddle/fluid/operators/lookup_table_op.cc b/paddle/fluid/operators/lookup_table_op.cc index d7f6cd5ab0acd2b677a3e5bd51bbcffe82eb1e50..3226a727b1f5f6de9e97ce2068381be7c9b69ff3 100644 --- a/paddle/fluid/operators/lookup_table_op.cc +++ b/paddle/fluid/operators/lookup_table_op.cc @@ -121,7 +121,7 @@ class LookupTableOpGrad : public framework::OperatorWithKernel { protected: framework::OpKernelType GetExpectedKernelType( const framework::ExecutionContext& ctx) const override { - auto data_type = framework::GetDataTypeOfVar(ctx.InputVar("W")); + auto data_type = framework::GetDataTypeOfVar(ctx.InputVar("Out")); return framework::OpKernelType(data_type, ctx.device_context()); } }; diff --git a/paddle/fluid/operators/math/selected_rows_functor.cc b/paddle/fluid/operators/math/selected_rows_functor.cc index 08f57dd45ad76946cbcafb98a3414003ed9d67a9..75946740375d74043960b68e94eb048b3bab4b79 100644 --- a/paddle/fluid/operators/math/selected_rows_functor.cc +++ b/paddle/fluid/operators/math/selected_rows_functor.cc @@ -12,9 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include #include -#include +#include #include "paddle/fluid/operators/math/blas.h" #include "paddle/fluid/operators/math/selected_rows_functor.h" @@ -230,8 +229,24 @@ template struct SelectedRowsAddToTensor; // add or mul. namespace scatter { -size_t FindPos(const std::vector& rows, int64_t value) { - return std::find(rows.begin(), rows.end(), value) - rows.begin(); +template +typename std::enable_if< + std::is_floating_point::value && + std::is_same::value>::type +elementwise_add_to(const DeviceContext& ctx, BlasT* blas, + size_t data_len, const T* in, T* out) { + blas->AXPY(data_len, 1., in, out); +} + +template +typename std::enable_if< + !std::is_floating_point::value && + std::is_same::value>::type +elementwise_add_to(const DeviceContext& ctx, BlasT* blas, + size_t data_len, const T* in, T* out) { + for (int64_t i = 0; i < data_len; i++) { + out[i] += in[i]; + } } template @@ -246,48 +261,84 @@ struct MergeAdd { void operator()(const platform::CPUDeviceContext& context, const framework::SelectedRows& input, framework::SelectedRows* output) { - framework::SelectedRows& out = *output; - std::vector input_rows(input.rows()); + std::vector inputs; + inputs.push_back(&input); + (*this)(context, inputs, output); + } - std::map> merge_row_map; - for (size_t i = 0; i < input_rows.size(); ++i) { - merge_row_map[input_rows[i]].push_back(i); + void operator()(const platform::CPUDeviceContext& context, + const std::vector& inputs, + framework::SelectedRows* output) { + if (inputs.size() == 0) { + VLOG(3) << "no input! return"; + return; } - - std::vector merge_rows(merge_row_map.size()); - size_t idx = 0; - int64_t input_width = input.value().dims()[1]; - out.set_height(input.height()); - - T* out_data = out.mutable_value()->mutable_data( + const framework::SelectedRows* has_value_input = nullptr; + for (auto* in : inputs) { + if (in->rows().size() > 0) { + has_value_input = in; + break; + } + } + if (has_value_input == nullptr) { + VLOG(3) << "no input has value! just return" << std::endl; + return; + } + auto input_width = has_value_input->value().dims()[1]; + auto input_height = has_value_input->height(); + framework::SelectedRows& out = *output; + std::set merged_row_set; + for (auto* input : inputs) { + if (input->rows().size() == 0) { + continue; + } + PADDLE_ENFORCE_EQ(input_width, input->value().dims()[1], + "all input should have same " + "dimension except for the first one"); + PADDLE_ENFORCE_EQ(input_height, input->height(), + "all input should have same height"); + merged_row_set.insert(input->rows().begin(), input->rows().end()); + } + std::vector merge_rows(merged_row_set.begin(), + merged_row_set.end()); + std::unordered_map rows_to_id; + for (size_t i = 0; i < merge_rows.size(); ++i) { + rows_to_id[merge_rows[i]] = i; + } + out.set_rows(merge_rows); + out.set_height(input_height); + out.mutable_value()->mutable_data( framework::make_ddim( {static_cast(merge_rows.size()), input_width}), context.GetPlace()); - const T* in_data = input.value().data(); - - for (auto& row_pair : merge_row_map) { - auto* out_ptr = out_data + idx * input_width; - auto& rows = row_pair.second; - merge_rows[idx] = row_pair.first; - ++idx; - // rows.size() is always larger than 0 - std::memcpy(out_ptr, in_data + rows[0] * input_width, - sizeof(T) * input_width); - - for (size_t i = 1; i < rows.size(); ++i) { - auto* in_ptr = in_data + rows[i] * input_width; - for (int64_t j = 0; j < input_width; ++j) { - out_ptr[j] += in_ptr[j]; - } + + math::SetConstant constant_functor; + constant_functor(context, out.mutable_value(), 0.0); + + auto* out_data = out.mutable_value()->data(); + + auto blas = math::GetBlas(context); + for (auto* input : inputs) { + if (input->rows().size() == 0) { + continue; + } + auto* input_data = input->value().data(); + auto& input_rows = input->rows(); + + for (size_t i = 0; i < input_rows.size(); i++) { + size_t out_i = rows_to_id[input_rows[i]]; + elementwise_add_to( + context, &blas, static_cast(input_width), + &input_data[i * input_width], &out_data[out_i * input_width]); } } - - out.set_rows(merge_rows); } }; template struct MergeAdd; template struct MergeAdd; +template struct MergeAdd; +template struct MergeAdd; template struct UpdateToTensor { diff --git a/paddle/fluid/operators/math/selected_rows_functor.cu b/paddle/fluid/operators/math/selected_rows_functor.cu index ba8eccf82042b679f69a32f9d053f05ac8fb9a99..10f39822b9c904ce236a1a2a3806d70693bd2e63 100644 --- a/paddle/fluid/operators/math/selected_rows_functor.cu +++ b/paddle/fluid/operators/math/selected_rows_functor.cu @@ -267,10 +267,15 @@ struct MergeAdd { void operator()(const platform::CUDADeviceContext& context, const framework::SelectedRows& input, framework::SelectedRows* output) { - framework::SelectedRows& out = *output; framework::Vector input_rows(input.rows()); + if (input_rows.size() == 0) { + return; + } + + framework::SelectedRows& out = *output; std::set row_set(input_rows.begin(), input_rows.end()); - std::vector merge_rows(row_set.begin(), row_set.end()); + std::vector merge_rows_cpu(row_set.begin(), row_set.end()); + framework::Vector merge_rows(merge_rows_cpu); auto input_width = input.value().dims()[1]; @@ -296,6 +301,73 @@ struct MergeAdd { out.mutable_rows()->CUDAMutableData(context.GetPlace()), out.rows().size(), input_width); } + + void operator()(const platform::CUDADeviceContext& context, + const std::vector& inputs, + framework::SelectedRows* output) { + if (inputs.size() == 0) { + VLOG(3) << "no input! return"; + return; + } + const framework::SelectedRows* has_value_input = nullptr; + for (auto* in : inputs) { + if (in->rows().size() > 0) { + has_value_input = in; + break; + } + } + if (has_value_input == nullptr) { + VLOG(3) << "no input has value! just return" << std::endl; + return; + } + auto input_width = has_value_input->value().dims()[1]; + auto input_height = has_value_input->height(); + framework::SelectedRows& out = *output; + std::set merged_row_set; + for (auto* input : inputs) { + if (input->rows().size() == 0) { + continue; + } + PADDLE_ENFORCE_EQ(input_width, input->value().dims()[1], + "all input should have same " + "dimension except for the first one"); + PADDLE_ENFORCE_EQ(input_height, input->height(), + "all input should have same height"); + merged_row_set.insert(input->rows().begin(), input->rows().end()); + } + std::vector merge_rows_cpu(merged_row_set.begin(), + merged_row_set.end()); + framework::Vector merge_rows(merge_rows_cpu); + + out.set_rows(merge_rows); + out.set_height(input_height); + out.mutable_value()->mutable_data( + framework::make_ddim( + {static_cast(merge_rows.size()), input_width}), + context.GetPlace()); + + math::SetConstant constant_functor; + constant_functor(context, out.mutable_value(), 0.0); + + auto* out_data = out.mutable_value()->data(); + + const int block_size = 256; + dim3 threads(block_size, 1); + + for (auto* input : inputs) { + if (input->rows().size() == 0) { + continue; + } + auto* input_data = input->value().data(); + auto& input_rows = input->rows(); + dim3 grid1(input_rows.size(), 1); + + MergeAddKernel<<>>( + input_data, input_rows.CUDAData(context.GetPlace()), out_data, + out.mutable_rows()->CUDAMutableData(context.GetPlace()), + out.rows().size(), input_width); + } + } }; template struct MergeAdd; diff --git a/paddle/fluid/operators/math/selected_rows_functor.h b/paddle/fluid/operators/math/selected_rows_functor.h index 900be86f91c6658a5265189a6745316c6471209e..521c53dd0d71707c13c4364c5ee59943a03d4a2d 100644 --- a/paddle/fluid/operators/math/selected_rows_functor.h +++ b/paddle/fluid/operators/math/selected_rows_functor.h @@ -83,104 +83,9 @@ struct MergeAdd { void operator()(const DeviceContext& context, const framework::SelectedRows& input, framework::SelectedRows* output); -}; - -template <> -struct MergeAdd { - framework::SelectedRows operator()(const platform::CPUDeviceContext& context, - const framework::SelectedRows& input) { - framework::SelectedRows out; - (*this)(context, input, &out); - return out; - } - - void operator()(const platform::CPUDeviceContext& context, - const framework::SelectedRows& input, - framework::SelectedRows* output) { - framework::SelectedRows& out = *output; - std::vector input_rows(input.rows()); - - std::map> merge_row_map; - for (size_t i = 0; i < input_rows.size(); ++i) { - merge_row_map[input_rows[i]].push_back(i); - } - - std::vector merge_rows(merge_row_map.size()); - size_t idx = 0; - int64_t input_width = input.value().dims()[1]; - out.set_height(input.height()); - - auto* out_data = out.mutable_value()->mutable_data( - framework::make_ddim( - {static_cast(merge_rows.size()), input_width}), - context.GetPlace()); - auto* in_data = input.value().data(); - - auto blas = GetBlas(context); - for (auto& row_pair : merge_row_map) { - auto* out_ptr = out_data + idx * input_width; - auto& rows = row_pair.second; - merge_rows[idx] = row_pair.first; - ++idx; - // rows.size() is always larger than 0 - blas.VCOPY(input_width, in_data + rows[0] * input_width, out_ptr); - - for (size_t i = 1; i < rows.size(); ++i) { - blas.AXPY(input_width, 1., in_data + rows[i] * input_width, out_ptr); - } - } - - out.set_rows(merge_rows); - } -}; - -template <> -struct MergeAdd { - framework::SelectedRows operator()(const platform::CPUDeviceContext& context, - const framework::SelectedRows& input) { - framework::SelectedRows out; - (*this)(context, input, &out); - return out; - } - - void operator()(const platform::CPUDeviceContext& context, - const framework::SelectedRows& input, - framework::SelectedRows* output) { - framework::SelectedRows& out = *output; - std::vector input_rows(input.rows()); - - std::map> merge_row_map; - for (size_t i = 0; i < input_rows.size(); ++i) { - merge_row_map[input_rows[i]].push_back(i); - } - - std::vector merge_rows(merge_row_map.size()); - size_t idx = 0; - int64_t input_width = input.value().dims()[1]; - out.set_height(input.height()); - - auto* out_data = out.mutable_value()->mutable_data( - framework::make_ddim( - {static_cast(merge_rows.size()), input_width}), - context.GetPlace()); - auto* in_data = input.value().data(); - - auto blas = GetBlas(context); - for (auto& row_pair : merge_row_map) { - auto* out_ptr = out_data + idx * input_width; - auto& rows = row_pair.second; - merge_rows[idx] = row_pair.first; - ++idx; - // rows.size() is always larger than 0 - blas.VCOPY(input_width, in_data + rows[0] * input_width, out_ptr); - - for (size_t i = 1; i < rows.size(); ++i) { - blas.AXPY(input_width, 1., in_data + rows[i] * input_width, out_ptr); - } - } - - out.set_rows(merge_rows); - } + void operator()(const DeviceContext& context, + const std::vector& inputs, + framework::SelectedRows* output); }; template diff --git a/paddle/fluid/operators/math/selected_rows_functor_test.cc b/paddle/fluid/operators/math/selected_rows_functor_test.cc index 835589356042b44c9fa5988aed726434fd66910a..f15b37a1e3f0ae9c7612c4f74470472393ff4ad6 100644 --- a/paddle/fluid/operators/math/selected_rows_functor_test.cc +++ b/paddle/fluid/operators/math/selected_rows_functor_test.cc @@ -302,6 +302,64 @@ TEST(selected_rows_functor, cpu_merge_add_int) { EXPECT_EQ(out_data[1 * row_numel], 2); EXPECT_EQ(out_data[2 * row_numel], 1); } + +TEST(selected_rows_functor, cpu_merge_add_multi) { + paddle::platform::CPUPlace cpu_place; + paddle::platform::CPUDeviceContext ctx(cpu_place); + paddle::operators::math::SetConstant + set_const; + + int64_t height = 10; + int64_t row_numel = 8; + + std::vector rows1{5, 2, 5, 3, 5}; + std::unique_ptr selected_rows1{ + new paddle::framework::SelectedRows(rows1, height)}; + auto* in1_value = selected_rows1->mutable_value(); + in1_value->mutable_data( + paddle::framework::make_ddim( + {static_cast(rows1.size()), row_numel}), + cpu_place); + set_const(ctx, in1_value, 1.0); + + std::vector rows2{2, 5, 3, 5, 3}; + std::unique_ptr selected_rows2{ + new paddle::framework::SelectedRows(rows2, height)}; + auto* in2_value = selected_rows2->mutable_value(); + in2_value->mutable_data( + paddle::framework::make_ddim( + {static_cast(rows2.size()), row_numel}), + cpu_place); + set_const(ctx, in2_value, 1.0); + + std::unique_ptr output{ + new paddle::framework::SelectedRows()}; + output->set_height(height); + paddle::operators::math::scatter::MergeAdd + merge_add_functor; + + std::vector inputs; + inputs.push_back(selected_rows1.get()); + inputs.push_back(selected_rows2.get()); + merge_add_functor(ctx, inputs, output.get()); + + EXPECT_EQ(output->height(), height); + EXPECT_EQ(output->value().dims(), + paddle::framework::make_ddim({3, row_numel})); + + std::vector ret_rows{2, 3, 5}; + EXPECT_EQ(output->rows(), ret_rows); + + auto* out_data = output->value().data(); + for (size_t i = 0; i < ret_rows.size(); ++i) { + for (size_t j = 0; j < row_numel; ++j) { + EXPECT_EQ(out_data[i * row_numel + j], ret_rows[i]); + } + } +} + TEST(selected_rows_functor, cpu_sum_to) { paddle::platform::CPUPlace cpu_place; paddle::platform::CPUDeviceContext ctx(cpu_place); @@ -318,6 +376,7 @@ TEST(selected_rows_functor, cpu_sum_to) { paddle::framework::make_ddim( {static_cast(rows1.size()), row_numel}), cpu_place); + functor(ctx, in1_value, 1.0); std::vector rows2{0, 5, 7, 9}; std::unique_ptr selected_rows2{ @@ -327,6 +386,7 @@ TEST(selected_rows_functor, cpu_sum_to) { paddle::framework::make_ddim( {static_cast(rows2.size()), row_numel}), cpu_place); + functor(ctx, in2_value, 2.0); std::unique_ptr output{ new paddle::framework::SelectedRows()}; diff --git a/paddle/fluid/operators/math/selected_rows_functor_test.cu b/paddle/fluid/operators/math/selected_rows_functor_test.cu index 5fc50aba25d8e69480a17f0f80877b0d03e17276..17af3e3999ca688c584f636f4c00386f886f9bbf 100644 --- a/paddle/fluid/operators/math/selected_rows_functor_test.cu +++ b/paddle/fluid/operators/math/selected_rows_functor_test.cu @@ -241,3 +241,67 @@ TEST(selected_rows_functor, gpu_add_to) { // row9: 2.0 + 3.0 EXPECT_EQ(tensor1_cpu_data[9 * row_numel + 6], 5.0); } + +TEST(selected_rows_functor, gpu_merge_add) { + paddle::platform::CUDAPlace gpu_place(0); + paddle::platform::CPUPlace cpu_place; + paddle::platform::CUDADeviceContext& ctx = + *reinterpret_cast( + paddle::platform::DeviceContextPool::Instance().Get(gpu_place)); + paddle::operators::math::SetConstant + set_const; + + int64_t height = 10; + int64_t row_numel = 8; + + std::vector rows1{5, 2, 5, 3, 5}; + std::unique_ptr selected_rows1{ + new paddle::framework::SelectedRows(rows1, height)}; + auto* in1_value = selected_rows1->mutable_value(); + in1_value->mutable_data( + paddle::framework::make_ddim( + {static_cast(rows1.size()), row_numel}), + gpu_place); + set_const(ctx, in1_value, 1.0); + + std::vector rows2{2, 5, 3, 5, 3}; + std::unique_ptr selected_rows2{ + new paddle::framework::SelectedRows(rows2, height)}; + auto* in2_value = selected_rows2->mutable_value(); + in2_value->mutable_data( + paddle::framework::make_ddim( + {static_cast(rows2.size()), row_numel}), + gpu_place); + set_const(ctx, in2_value, 1.0); + + std::unique_ptr output{ + new paddle::framework::SelectedRows()}; + output->set_height(height); + paddle::operators::math::scatter::MergeAdd< + paddle::platform::CUDADeviceContext, float> + merge_add_functor; + + std::vector inputs; + inputs.push_back(selected_rows1.get()); + inputs.push_back(selected_rows2.get()); + merge_add_functor(ctx, inputs, output.get()); + + paddle::framework::Tensor output_cpu; + paddle::framework::TensorCopy(output->value(), cpu_place, ctx, &output_cpu); + ctx.Wait(); + + EXPECT_EQ(output->height(), height); + EXPECT_EQ(output->value().dims(), + paddle::framework::make_ddim({3, row_numel})); + + std::vector ret_rows{2, 3, 5}; + EXPECT_EQ(output->rows(), ret_rows); + + auto* out_data = output_cpu.data(); + for (size_t i = 0; i < ret_rows.size(); ++i) { + for (size_t j = 0; j < row_numel; ++j) { + EXPECT_EQ(out_data[i * row_numel + j], ret_rows[i]); + } + } +} diff --git a/paddle/fluid/operators/merge_ids_op.cc b/paddle/fluid/operators/merge_ids_op.cc index c6ec4ab047d5e91625e646fd26108d2e477cdce5..6e0e13698097ade36449f2e8ff6ab981a1b24311 100644 --- a/paddle/fluid/operators/merge_ids_op.cc +++ b/paddle/fluid/operators/merge_ids_op.cc @@ -20,13 +20,16 @@ namespace operators { class MergeIdsOpMaker : public framework::OpProtoAndCheckerMaker { public: void Make() override { - AddInput("Ids", "(LoDTensor) the input ids with shape{batch_num, 1}"); - AddInput( - "X", - "(LoDTensors) multi input tensor with shape{batch_num, N}, N is the " - "size of embedding table") + AddInput("Ids", "(LoDTensor) the input ids with shape{batch_num, 1}") + .AsDuplicable(); + AddInput("Rows", "(LoDTensor) the input ids with shape{row_size, 1}, ") + .AsDuplicable(); + AddInput("X", + "(LoDTensors) multi input tensor with shape{Rows, N}, N is the " + "size of embedding table") + .AsDuplicable(); + AddOutput("Out", "(LoDTensor) The merged outputs of the input tensors.") .AsDuplicable(); - AddOutput("Out", "(LoDTensor) The merged outputs of the input tensors."); AddComment(R"DOC( Merge multi LoDTensor's into one according to Ids's shard num. @@ -79,15 +82,19 @@ class MergeIdsOp : public framework::OperatorWithKernel { using framework::OperatorWithKernel::OperatorWithKernel; void InferShape(framework::InferShapeContext *ctx) const override { - PADDLE_ENFORCE(ctx->HasInput("Ids"), "MergeIdsOp must has input Ids."); - PADDLE_ENFORCE(ctx->HasInputs("X"), "MergeIdsOp must has input X."); - PADDLE_ENFORCE(ctx->HasOutput("Out"), "MergeIdsOp must has output Out."); + PADDLE_ENFORCE(ctx->HasInputs("Ids"), + "MergeIdsOp must has multi input Ids."); + PADDLE_ENFORCE(ctx->HasInputs("Rows"), + "MergeIdsOp must has multi input Rows."); + PADDLE_ENFORCE(ctx->HasInputs("X"), "MergeIdsOp must has multi input X."); + PADDLE_ENFORCE(ctx->HasOutputs("Out"), + "MergeIdsOp must has multi output Out."); auto ids_var_type = ctx->GetInputsVarType("Ids").front(); - auto ids_dims = ctx->GetInputDim("Ids"); + auto ids_dims = ctx->GetInputsDim("Ids"); if (ids_var_type == framework::proto::VarType::LOD_TENSOR) { - PADDLE_ENFORCE_EQ(ids_dims.size(), 2); - PADDLE_ENFORCE_EQ(ids_dims[1], 1); + PADDLE_ENFORCE_EQ(ids_dims[0].size(), 2); + PADDLE_ENFORCE_EQ(ids_dims[0][1], 1); } auto x_var_type = ctx->GetInputsVarType("X"); for (auto &var_type : x_var_type) { diff --git a/paddle/fluid/operators/merge_ids_op.h b/paddle/fluid/operators/merge_ids_op.h index 83712a8519c6817151e1922c606c0fdd4682a2db..fef9e023d02f45e21ec409ad398ba7d9bdd36880 100644 --- a/paddle/fluid/operators/merge_ids_op.h +++ b/paddle/fluid/operators/merge_ids_op.h @@ -14,6 +14,8 @@ limitations under the License. */ #pragma once +#include +#include #include #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/tensor_util.h" @@ -30,59 +32,70 @@ class MergeIdsOpKernel : public framework::OpKernel { if (!platform::is_cpu_place(place)) { PADDLE_THROW("MergeIds do not support GPU kernel"); } - VLOG(3) << "run in MergeIdsOpKernel"; - const auto *ids_var = ctx.InputVar("Ids"); - PADDLE_ENFORCE(ids_var->IsType(), - "only support to merge Ids of LoDTensor"); + const auto ids = ctx.MultiInput("Ids"); + const auto row_ids = ctx.MultiInput("Rows"); + const auto x_tensors = ctx.MultiInput("X"); + auto outs = ctx.MultiOutput("Out"); - const auto &ids_tensor = ids_var->Get(); - const auto &ids_dims = ids_tensor.dims(); - const int64_t *ids = ids_tensor.data(); + PADDLE_ENFORCE_EQ(row_ids.size(), x_tensors.size(), + "the number of Rows and X should be the same"); + PADDLE_ENFORCE_EQ(ids.size(), outs.size(), + "the number of Ids and Out should be the same"); - auto x_tensors = ctx.MultiInput("X"); + int row_ids_size = 0; + int row_size = 0; + int embedding_size = 0; - auto *out = ctx.Output("Out"); + for (int i = 0; i < x_tensors.size(); ++i) { + const auto *x_tensor = x_tensors[i]; + const auto *row_id = row_ids[i]; - int batch_size = 0; - int embedding_size = 0; - for (auto &input : x_tensors) { - if (framework::product(input->dims()) != 0) { - if (embedding_size == 0) { - embedding_size = input->dims()[1]; - } - PADDLE_ENFORCE_EQ(embedding_size, input->dims()[1], - "embedding size of all input should be the same"); - batch_size += input->dims()[0]; + if (embedding_size == 0) { + embedding_size = x_tensor->dims()[1]; } + PADDLE_ENFORCE_EQ(embedding_size, x_tensor->dims()[1], + "embedding size of all input should be the same"); + row_size += x_tensor->dims()[0]; + row_ids_size += row_id->dims()[0]; } + PADDLE_ENFORCE_EQ( - batch_size, ids_dims[0], - "the batch size of ids and merged embedding value should be the same"); + row_size, row_ids_size, + "the merged X dim[0] and merged Rows dim[0] should be the same"); + + std::unordered_map> + selected_rows_idx_map; + for (int i = 0; i < x_tensors.size(); ++i) { + const auto *row_id = row_ids[i]; + + for (int j = 0; j < row_id->numel(); ++j) { + int64_t key = row_id->data()[j]; + std::tuple val = std::make_tuple(i, j); + selected_rows_idx_map.insert(std::make_pair(key, val)); + } + } + PADDLE_ENFORCE_EQ(row_ids_size, selected_rows_idx_map.size(), + "the rows and tensor map size should be the same"); + + for (int i = 0; i < outs.size(); ++i) { + auto *out_ids = ids[i]; + auto *out = outs[i]; - const size_t shard_num = x_tensors.size(); + out->set_lod(out_ids->lod()); - if (shard_num == 1) { - VLOG(3) << "only one shard, we can copy the data directly"; - TensorCopy(*x_tensors[0], place, out); - } else { - std::vector in_indexs(shard_num, 0); + int nums = static_cast(out_ids->dims()[0]); auto *out_data = out->mutable_data( - framework::make_ddim({batch_size, embedding_size}), place); - // copy data from ins[shard_num] to out. - for (int i = 0; i < ids_dims[0]; ++i) { - int64_t id = ids[i]; - size_t shard_id = static_cast(id) % shard_num; - int index = in_indexs[shard_id]; - memcpy(out_data + embedding_size * i, - x_tensors[shard_id]->data() + index * embedding_size, + framework::make_ddim({nums, embedding_size}), place); + for (int j = 0; j < nums; ++j) { + int id = out_ids->data()[j]; + auto row_tuple = selected_rows_idx_map[id]; + int64_t row_idx = std::get<1>(row_tuple); + const auto *x_tensor = x_tensors[std::get<0>(row_tuple)]; + + memcpy(out_data + embedding_size * j, + x_tensor->data() + row_idx * embedding_size, sizeof(T) * embedding_size); - in_indexs[shard_id] += 1; - } - - for (size_t i = 0; i < shard_num; ++i) { - PADDLE_ENFORCE_EQ(in_indexs[i], x_tensors[i]->dims()[0], - "after merge, all data in x_tensor should be used"); } } } diff --git a/paddle/fluid/operators/split_ids_op.cc b/paddle/fluid/operators/split_ids_op.cc index c867c46873ae7ddbdbda280351e4ab28235bcc08..243f81e296fb95a2c7e9f717950b8a958ad98852 100644 --- a/paddle/fluid/operators/split_ids_op.cc +++ b/paddle/fluid/operators/split_ids_op.cc @@ -20,20 +20,27 @@ namespace operators { class SplitIdsOpMaker : public framework::OpProtoAndCheckerMaker { public: void Make() override { - AddInput("Ids", "(LoDTensor) the input ids with shape{batch_num, 1}"); - AddOutput("Out", "(LoDTensor) The outputs of the input Ids.") + AddInput("Ids", "(LoDTensor) the input ids with shape{batch_num, 1}") + .AsDuplicable(); + + AddOutput("Out", "(LoDTensors) The outputs of the input Ids.") .AsDuplicable(); AddComment(R"DOC( Split a LoDTensor of Ids into multi LoDTensors, the number is pserver's number Example: Input: - X = [1,2,3,4,5,6] + X = [[1,2,3,4,5,6],[2,3]] Out(3 output): - out0 = [3, 6] - out1 = [1, 4] - out2 = [2, 5] + if compress is True: + out0 = [3, 3, 6] + out1 = [1, 4] + out2 = [2, 2, 5] + else: + out0 = [3, 6] + out1 = [1, 4] + out2 = [2, 5] )DOC"); } }; @@ -43,16 +50,24 @@ class SplitIdsOp : public framework::OperatorWithKernel { using framework::OperatorWithKernel::OperatorWithKernel; void InferShape(framework::InferShapeContext *ctx) const override { - PADDLE_ENFORCE(ctx->HasInput("Ids"), "SplitIdsOp must has input Ids."); + PADDLE_ENFORCE(ctx->HasInputs("Ids"), "SplitIdsOp must has input Ids."); PADDLE_ENFORCE(ctx->HasOutputs("Out"), "SplitIdsOp must has output Out."); auto ids_var_type = ctx->GetInputsVarType("Ids").front(); - auto ids_dims = ctx->GetInputDim("Ids"); + auto ids_dims = ctx->GetInputsDim("Ids"); if (ids_var_type == framework::proto::VarType::LOD_TENSOR) { - PADDLE_ENFORCE_EQ(ids_dims.size(), 2); - PADDLE_ENFORCE_EQ(ids_dims[1], 1); + PADDLE_ENFORCE_EQ(ids_dims[0].size(), 2); } } + + protected: + framework::OpKernelType GetExpectedKernelType( + const framework::ExecutionContext &ctx) const override { + return framework::OpKernelType( + framework::ToDataType( + ctx.MultiInput("Ids").front()->type()), + ctx.GetPlace()); + } }; class SplitIdsOpInferVarType : public framework::VarTypeInference { @@ -66,12 +81,28 @@ class SplitIdsOpInferVarType : public framework::VarTypeInference { } }; +class SplitIdsOpGradMaker : public framework::SingleGradOpDescMaker { + public: + using framework::SingleGradOpDescMaker::SingleGradOpDescMaker; + + protected: + std::unique_ptr Apply() const override { + auto grad = new framework::OpDesc(); + grad->SetType("concat"); + grad->SetInput("X", OutputGrad("Out")); + grad->SetOutput("Out", InputGrad("Ids")); + grad->SetAttr("axis", 0); + return std::unique_ptr(grad); + } +}; + } // namespace operators } // namespace paddle namespace ops = paddle::operators; REGISTER_OPERATOR(split_ids, ops::SplitIdsOp, ops::SplitIdsOpMaker, - ops::SplitIdsOpInferVarType); + ops::SplitIdsOpGradMaker, ops::SplitIdsOpInferVarType); + REGISTER_OP_CPU_KERNEL( split_ids, ops::SplitIdsOpKernel, ops::SplitIdsOpKernel); diff --git a/paddle/fluid/operators/split_ids_op.h b/paddle/fluid/operators/split_ids_op.h index c4af5a65fc5f81c1af7c1fdcca637ca37c940637..69ac6c5a6b9a8b318520eb9a3ff89a3a6be48339 100644 --- a/paddle/fluid/operators/split_ids_op.h +++ b/paddle/fluid/operators/split_ids_op.h @@ -14,6 +14,8 @@ limitations under the License. */ #pragma once +#include +#include #include #include #include "paddle/fluid/framework/op_registry.h" @@ -31,19 +33,39 @@ class SplitIdsOpKernel : public framework::OpKernel { PADDLE_THROW("SplitIds do not support GPU kernel"); } - const auto *ids_var = ctx.InputVar("Ids"); + const auto ids_vars = ctx.MultiInputVar("Ids"); + + PADDLE_ENFORCE_GT(ids_vars.size(), 0, "The number of Ids should > 0"); + auto *ids_var = ids_vars[0]; + if (ids_var->IsType()) { - const auto &ids_dims = ctx.Input("Ids")->dims(); - const T *ids = ctx.Input("Ids")->data(); + int batch_size = 0; + const auto ids_tensors = ctx.MultiInput("Ids"); + for (size_t i = 0; i < ids_tensors.size(); ++i) { + batch_size += ids_tensors[i]->dims()[0]; + } + VLOG(4) << "Get Total BatchSize is: " << batch_size; + + std::vector all_ids(batch_size); + int offset = 0; + for (size_t i = 0; i < ids_tensors.size(); ++i) { + const auto *ids = ids_tensors[i]; + std::memcpy(all_ids.data() + offset, ids->data(), + ids->numel() * sizeof(T)); + offset += ids->numel(); + } + + std::set st(all_ids.begin(), all_ids.end()); + all_ids.assign(st.begin(), st.end()); + auto outs = ctx.MultiOutput("Out"); const size_t shard_num = outs.size(); - std::vector> out_ids; out_ids.resize(outs.size()); // split id by their shard_num. - for (int i = 0; i < ids_dims[0]; ++i) { - T id = ids[i]; + for (int i = 0; i < all_ids.size(); ++i) { + T id = all_ids[i]; size_t shard_id = static_cast(id) % shard_num; out_ids[shard_id].push_back(id); } @@ -64,7 +86,7 @@ class SplitIdsOpKernel : public framework::OpKernel { PADDLE_ENFORCE_EQ(ids_dims[0], static_cast(ids_selected_rows->rows().size()), ""); - const T *ids = ids_selected_rows->value().data(); + const T *ids_data = ids_selected_rows->value().data(); const auto &ids_rows = ids_selected_rows->rows(); auto outs = ctx.MultiOutput("Out"); const size_t shard_num = outs.size(); @@ -87,7 +109,7 @@ class SplitIdsOpKernel : public framework::OpKernel { T *output = out->mutable_value()->mutable_data(ddim, place); for (int64_t i = 0; i < ddim[0]; ++i) { memcpy(output + i * row_width, - ids + id_to_index[out->rows()[i]] * row_width, + ids_data + id_to_index[out->rows()[i]] * row_width, row_width * sizeof(T)); } } diff --git a/paddle/fluid/operators/split_selected_rows_op.cc b/paddle/fluid/operators/split_selected_rows_op.cc index 76615a9405d7a8e3fa9dba8d01a956209e02ae8f..0e7b1463d1ba81aed53e0e3f3a90d2a1fbf0ffbc 100644 --- a/paddle/fluid/operators/split_selected_rows_op.cc +++ b/paddle/fluid/operators/split_selected_rows_op.cc @@ -22,9 +22,9 @@ class SplitSelectedRowsOpMaker : public framework::OpProtoAndCheckerMaker { void Make() override { AddInput("X", "The input SelectedRows."); AddOutput("Out", "The outputs of the input SelectedRows.").AsDuplicable(); - AddAttr>("height_sections", - "Height for each output SelectedRows.") - .SetDefault(std::vector({})); + AddAttr>("height_sections", + "Height for each output SelectedRows.") + .SetDefault(std::vector({})); AddComment(R"DOC( Split a SelectedRows with a specified rows section. diff --git a/paddle/fluid/operators/split_selected_rows_op.h b/paddle/fluid/operators/split_selected_rows_op.h index 0e9ce165b98845f4745ee70b028513ea31cc6657..af64607fafc6544047714e731846a2440be219b8 100644 --- a/paddle/fluid/operators/split_selected_rows_op.h +++ b/paddle/fluid/operators/split_selected_rows_op.h @@ -21,7 +21,7 @@ limitations under the License. */ namespace paddle { namespace operators { -static int FindOutIdx(int row, const std::vector& abs_sections) { +static int FindOutIdx(int row, const std::vector& abs_sections) { for (size_t i = 1; i < abs_sections.size(); ++i) { if (row < abs_sections[i]) { return i - 1; @@ -30,9 +30,9 @@ static int FindOutIdx(int row, const std::vector& abs_sections) { return abs_sections.size() - 1; } -static std::vector ToAbsoluteSection( - const std::vector& height_sections) { - std::vector abs_sections; +static std::vector ToAbsoluteSection( + const std::vector& height_sections) { + std::vector abs_sections; abs_sections.resize(height_sections.size()); abs_sections[0] = 0; for (size_t i = 1; i < height_sections.size(); ++i) { @@ -47,7 +47,7 @@ class SplitSelectedRowsOpKernel : public framework::OpKernel { void Compute(const framework::ExecutionContext& ctx) const override { auto* x = ctx.Input("X"); auto outs = ctx.MultiOutput("Out"); - auto height_sections = ctx.Attr>("height_sections"); + auto height_sections = ctx.Attr>("height_sections"); auto abs_sections = ToAbsoluteSection(height_sections); diff --git a/paddle/fluid/operators/sum_op.h b/paddle/fluid/operators/sum_op.h index 11987c61aebaad00f8a71f1b909c83c44ddc8b0e..f6e12dfc76c6ce73f10e707387f6a9cedacde3c8 100644 --- a/paddle/fluid/operators/sum_op.h +++ b/paddle/fluid/operators/sum_op.h @@ -83,79 +83,54 @@ class SumKernel : public framework::OpKernel { } } } else if (out_var->IsType()) { - std::unique_ptr in0; - if (in_place) { - // If is in_place, we store the input[0] to in0 - auto &in_sel0 = in_vars[0]->Get(); - auto &rows = in_sel0.rows(); -#ifdef PADDLE_WITH_CUDA - std::vector rows_in_cpu; - rows_in_cpu.reserve(rows.size()); - for (auto item : rows) { - rows_in_cpu.push_back(item); - } - in0.reset(new framework::SelectedRows(rows_in_cpu, in_sel0.height())); -#else - in0.reset(new framework::SelectedRows(rows, in_sel0.height())); -#endif - in0->mutable_value()->ShareDataWith(in_sel0.value()); + if (in_place && in_vars.size() < 2) { + return; } - auto get_selected_row = [&](size_t i) -> const SelectedRows & { - if (i == 0 && in0) { - return *in0.get(); - } else { - return in_vars[i]->Get(); + std::vector inputs; + SelectedRows temp_in0; + + if (in_place) { + auto &in0 = in_vars[0]->Get(); + temp_in0.set_height(in0.height()); + temp_in0.set_rows(in0.rows()); + framework::TensorCopy(in0.value(), in0.place(), + context.device_context(), + temp_in0.mutable_value()); + inputs.push_back(&temp_in0); + for (size_t i = 1; i < in_vars.size(); ++i) { + auto &in = in_vars[i]->Get(); + if (in.rows().size() > 0) { + inputs.push_back(&in); + } + } + } else { + for (auto &in_var : in_vars) { + auto &in = in_var->Get(); + if (in.rows().size() > 0) { + inputs.push_back(&in_var->Get()); + } } - }; + } auto *out = context.Output("Out"); out->mutable_rows()->clear(); - auto *out_value = out->mutable_value(); - - // Runtime InferShape - size_t first_dim = 0; - for (size_t i = 0; i < in_num; i++) { - auto &sel_row = get_selected_row(i); - first_dim += sel_row.rows().size(); - } - std::vector in_dim; - for (size_t i = 0; i < in_num; i++) { - auto &sel_row = get_selected_row(i); - if (sel_row.rows().size() > 0) { - in_dim = framework::vectorize(sel_row.value().dims()); + bool has_data = false; + for (auto &in : inputs) { + if (in->rows().size() > 0) { + has_data = true; break; } } - if (in_dim.empty()) { - VLOG(3) << "WARNING: all the inputs are empty"; - in_dim = - framework::vectorize(get_selected_row(in_num - 1).value().dims()); + if (has_data) { + math::scatter::MergeAdd merge_add; + merge_add(context.template device_context(), inputs, + out); } else { - in_dim[0] = static_cast(first_dim); - } - - out_value->Resize(framework::make_ddim(in_dim)); - out_value->mutable_data(context.GetPlace()); - // if all the input sparse vars are empty, no need to - // merge these vars. - if (first_dim == 0UL) { - return; - } - - math::SelectedRowsAddTo functor; - - int64_t offset = 0; - for (size_t i = 0; i < in_num; i++) { - auto &sel_row = get_selected_row(i); - if (sel_row.rows().size() == 0) { - continue; - } - PADDLE_ENFORCE_EQ(out->height(), sel_row.height()); - functor(context.template device_context(), sel_row, - offset, out); - offset += sel_row.value().numel(); + // no data, just set a empty out tensor. + out->mutable_value()->mutable_data(framework::make_ddim({0}), + context.GetPlace()); } } else if (out_var->IsType()) { auto &out_array = *out_var->GetMutable(); diff --git a/paddle/fluid/operators/uniform_random_op.cc b/paddle/fluid/operators/uniform_random_op.cc index aa907595cb7cf165974caa69fe8eb0370471732d..e3132ae76f624f3338d749e4fcebbd0ecd7ffe79 100644 --- a/paddle/fluid/operators/uniform_random_op.cc +++ b/paddle/fluid/operators/uniform_random_op.cc @@ -29,7 +29,7 @@ class CPUUniformRandomKernel : public framework::OpKernel { if (out_var->IsType()) { tensor = out_var->GetMutable(); } else if (out_var->IsType()) { - auto shape = ctx.Attr>("shape"); + auto shape = ctx.Attr>("shape"); auto *selected_rows = out_var->GetMutable(); tensor = selected_rows->mutable_value(); tensor->Resize(framework::make_ddim(shape)); @@ -67,7 +67,7 @@ class UniformRandomOp : public framework::OperatorWithKernel { PADDLE_ENFORCE( ctx->Attrs().Get("min") < ctx->Attrs().Get("max"), "uniform_random's min must less then max"); - auto &shape = ctx->Attrs().Get>("shape"); + auto &shape = ctx->Attrs().Get>("shape"); std::vector temp; temp.reserve(shape.size()); for (auto dim : shape) { @@ -94,7 +94,7 @@ This operator initializes a tensor with random values sampled from a uniform distribution. The random result is in set [min, max]. )DOC"); - AddAttr>("shape", "The shape of the output tensor"); + AddAttr>("shape", "The shape of the output tensor"); AddAttr("min", "Minimum value of uniform random. [default -1.0].") .SetDefault(-1.0f); AddAttr("max", "Maximun value of uniform random. [default 1.0].") diff --git a/paddle/fluid/operators/uniform_random_op.cu b/paddle/fluid/operators/uniform_random_op.cu index bbb692b0ddfc18e8a62c0d2a6bac88f9932f6704..2bb0ecc139f7096d1b61150e0a2d4fb095338749 100644 --- a/paddle/fluid/operators/uniform_random_op.cu +++ b/paddle/fluid/operators/uniform_random_op.cu @@ -48,7 +48,7 @@ class GPUUniformRandomKernel : public framework::OpKernel { if (out_var->IsType()) { tensor = out_var->GetMutable(); } else if (out_var->IsType()) { - auto shape = context.Attr>("shape"); + auto shape = context.Attr>("shape"); tensor = out_var->GetMutable()->mutable_value(); tensor->Resize(framework::make_ddim(shape)); } else { diff --git a/paddle/fluid/pybind/protobuf.cc b/paddle/fluid/pybind/protobuf.cc index 3b22718a8c6f994dbc2dc3e7aaa19a7163f716ba..d3b0d4a22954c1d67dc9551b997dcffa0625cbeb 100644 --- a/paddle/fluid/pybind/protobuf.cc +++ b/paddle/fluid/pybind/protobuf.cc @@ -57,6 +57,18 @@ struct variant_caster> { auto caster = make_caster(); if (!load_success_ && caster.load(src, convert)) { load_success_ = true; + + if (std::is_same>::value) { + auto caster_ints = make_caster>(); + if (caster_ints.load(src, convert)) { + VLOG(4) << "This value are floats and int64_ts satisfy " + "simultaneously, will set it's type to " + "std::vector"; + value = cast_op>(caster_ints); + return true; + } + } + value = cast_op(caster); return true; } @@ -259,6 +271,8 @@ void BindOpDesc(pybind11::module *m) { pybind11::enum_(*m, "AttrType", "") .value("INT", pd::proto::AttrType::INT) .value("INTS", pd::proto::AttrType::INTS) + .value("LONG", pd::proto::AttrType::LONG) + .value("LONGS", pd::proto::AttrType::LONGS) .value("FLOAT", pd::proto::AttrType::FLOAT) .value("FLOATS", pd::proto::AttrType::FLOATS) .value("STRING", pd::proto::AttrType::STRING) diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index bcd4e4f6073eff1ea0449da8096030743158dd0f..737c8be8147a7efaf9b89827f063430146d3c078 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -121,6 +121,9 @@ def __bootstrap__(): read_env_flags.append('rpc_server_profile_period') read_env_flags.append('rpc_server_profile_path') read_env_flags.append('enable_rpc_profiler') + read_env_flags.append('rpc_send_thread_num') + read_env_flags.append('rpc_get_thread_num') + read_env_flags.append('rpc_prefetch_thread_num') if core.is_compiled_with_cuda(): read_env_flags += [ diff --git a/python/paddle/fluid/op.py b/python/paddle/fluid/op.py index 667db10d3ebdd24ddd9efbe2310ebb331e268ee2..4e1d1450dea85fe4eb3e68713250836e4beac992 100644 --- a/python/paddle/fluid/op.py +++ b/python/paddle/fluid/op.py @@ -120,6 +120,8 @@ class OpDescCreationMethod(object): new_attr.strings.extend(user_defined_attr) elif attr.type == framework_pb2.BOOLEANS: new_attr.bools.extend(user_defined_attr) + elif attr.type == framework_pb2.LONGS: + new_attr.longs.extend(user_defined_attr) elif attr.type == framework_pb2.INT_PAIRS: for p in user_defined_attr: pair = new_attr.int_pairs.add() diff --git a/python/paddle/fluid/tests/unittests/test_dist_ctr.py b/python/paddle/fluid/tests/unittests/test_dist_ctr.py index 390393e04f8a1ff7b994da66cf1fa104ccb61793..b2d979729bc9b2546375cb657f78abe0d8c2dcc7 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_ctr.py +++ b/python/paddle/fluid/tests/unittests/test_dist_ctr.py @@ -18,6 +18,7 @@ import unittest from test_dist_base import TestDistBase +# FIXME(tangwei): sum op can not handle when inputs is empty. class TestDistCTR2x2(TestDistBase): def _setup_config(self): self._sync_mode = True diff --git a/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py b/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py index fcf793da07eb8985fb19c9e36ff9b7d5a8f51212..102a4dab05fe1adc6a503920714f50415b29dc19 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py +++ b/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py @@ -42,7 +42,6 @@ class TestDistSimnetBow2x2DenseAsync(TestDistBase): self._sync_mode = False self._enforce_place = "CPU" - #FIXME(typhoonzero): fix async tests later def no_test_simnet_bow(self): need_envs = { "IS_DISTRIBUTED": '0', @@ -93,7 +92,6 @@ class TestDistSimnetBow2x2SparseAsync(TestDistBase): # FIXME(tangwei): Learningrate variable is not created on pserver. -""" class TestDistSimnetBow2x2LookupTableSync(TestDistBase): def _setup_config(self): self._sync_mode = True @@ -146,7 +144,7 @@ class TestDistSimnetBow2x2LookupTableNotContainLRSync(TestDistBase): delta=1e-5, check_error_log=False, need_envs=need_envs) -""" + if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py index 54a1c68a37f6929890aab697b48d621e6effb7d8..c4511a98b0667ecccaa8f63b3064c4fc4e86cc78 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py +++ b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py @@ -480,7 +480,7 @@ class TestDistLookupTable(TestDistLookupTableBase): def transpiler_test_impl(self): pserver1, startup1 = self.get_pserver(self.pserver1_ep) - self.assertEqual(len(pserver1.blocks), 6) + self.assertEqual(len(pserver1.blocks), 5) # 0 listen_and_serv # 1 optimize for fc_w or fc_b adam self.assertEqual([op.type for op in pserver1.blocks[1].ops], @@ -491,26 +491,32 @@ class TestDistLookupTable(TestDistLookupTableBase): # 3 prefetch -> lookup_sparse_table for data0 self.assertEqual([op.type for op in pserver1.blocks[3].ops], ["lookup_sparse_table"]) - # 4 prefetch -> lookup_sparse_table for data1 - self.assertEqual([op.type for op in pserver1.blocks[4].ops], - ["lookup_sparse_table"]) - # 5 save table - self.assertEqual([op.type for op in pserver1.blocks[5].ops], ["save"]) + # 4 save table + self.assertEqual([op.type for op in pserver1.blocks[4].ops], ["save"]) - trainer, _ = self.get_trainer() + trainer, trainer_startup = self.get_trainer() self.assertEqual(len(trainer.blocks), 1) ops = [ - 'split_ids', 'prefetch', 'merge_ids', 'sequence_pool', 'split_ids', - 'prefetch', 'merge_ids', 'sequence_pool', 'concat', 'mul', - 'elementwise_add', 'cross_entropy', 'mean', 'fill_constant', - 'mean_grad', 'cross_entropy_grad', 'elementwise_add_grad', 'send', - 'mul_grad', 'send', 'concat_grad', 'sequence_pool_grad', - 'lookup_table_grad', 'sequence_pool_grad', 'lookup_table_grad', - 'sum', 'split_ids', 'send', 'send_barrier', 'recv', 'recv', - 'fetch_barrier' + 'split_ids', 'prefetch', 'merge_ids', 'sequence_pool', + 'sequence_pool', 'concat', 'mul', 'elementwise_add', + 'cross_entropy', 'mean', 'fill_constant', 'mean_grad', + 'cross_entropy_grad', 'elementwise_add_grad', 'send', 'mul_grad', + 'send', 'concat_grad', 'sequence_pool_grad', 'lookup_table_grad', + 'sequence_pool_grad', 'lookup_table_grad', 'sum', 'split_ids', + 'send', 'send_barrier', 'recv', 'recv', 'fetch_barrier' ] self.assertEqual([op.type for op in trainer.blocks[0].ops], ops) + startup_ops = [ + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'fill_constant', 'fill_constant', + 'fill_constant', 'fill_constant', 'uniform_random', 'recv', 'recv', + 'fetch_barrier', 'fake_init' + ] + self.assertEqual([op.type for op in trainer_startup.blocks[0].ops], + startup_ops) + class TestAsyncLocalLookupTable(TestDistLookupTableBase): def net_conf(self): @@ -553,7 +559,7 @@ class TestAsyncDistLookupTable(TestDistLookupTableBase): pserver1, startup1 = self.get_pserver(self.pserver1_ep, config, False) - self.assertEqual(len(pserver1.blocks), 6) + self.assertEqual(len(pserver1.blocks), 5) # 0 listen_and_serv # 1 optimize for fc_w or fc_b adam self.assertEqual([op.type for op in pserver1.blocks[1].ops], @@ -563,22 +569,19 @@ class TestAsyncDistLookupTable(TestDistLookupTableBase): # 3 prefetch -> lookup_sparse_table for data0 self.assertEqual([op.type for op in pserver1.blocks[3].ops], ["lookup_sparse_table"]) - # 4 prefetch -> lookup_sparse_table for data1 - self.assertEqual([op.type for op in pserver1.blocks[4].ops], - ["lookup_sparse_table"]) - # 5 save table - self.assertEqual([op.type for op in pserver1.blocks[5].ops], ["save"]) + # 4 save table + self.assertEqual([op.type for op in pserver1.blocks[4].ops], ["save"]) trainer, _ = self.get_trainer(config) self.assertEqual(len(trainer.blocks), 1) ops = [ - 'split_ids', 'prefetch', 'merge_ids', 'sequence_pool', 'split_ids', - 'prefetch', 'merge_ids', 'sequence_pool', 'concat', 'mul', - 'elementwise_add', 'cross_entropy', 'mean', 'fill_constant', - 'mean_grad', 'cross_entropy_grad', 'elementwise_add_grad', 'send', - 'mul_grad', 'send', 'concat_grad', 'sequence_pool_grad', - 'lookup_table_grad', 'sequence_pool_grad', 'lookup_table_grad', - 'sum', 'split_ids', 'send', 'recv', 'recv' + 'split_ids', 'prefetch', 'merge_ids', 'sequence_pool', + 'sequence_pool', 'concat', 'mul', 'elementwise_add', + 'cross_entropy', 'mean', 'fill_constant', 'mean_grad', + 'cross_entropy_grad', 'elementwise_add_grad', 'send', 'mul_grad', + 'send', 'concat_grad', 'sequence_pool_grad', 'lookup_table_grad', + 'sequence_pool_grad', 'lookup_table_grad', 'sum', 'split_ids', + 'send', 'recv', 'recv' ] self.assertEqual([op.type for op in trainer.blocks[0].ops], ops) diff --git a/python/paddle/fluid/tests/unittests/test_fake_init_op.py b/python/paddle/fluid/tests/unittests/test_fake_init_op.py new file mode 100644 index 0000000000000000000000000000000000000000..a62b7aed66b59940b4ba654d98479e3e35c7b78b --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fake_init_op.py @@ -0,0 +1,52 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import unittest + +import paddle.fluid.core as core +from paddle.fluid.op import Operator + + +class TestFakeInitOpSelectedRows(unittest.TestCase): + def check_with_place(self, place, is_selected_rows): + scope = core.Scope() + + out_var_name = 'Out' + if is_selected_rows: + out_tensor = scope.var(out_var_name).get_selected_rows().get_tensor( + ) + else: + out_tensor = scope.var(out_var_name).get_tensor() + + var_shape = [4, 784] + + # create and run fake_init_op + fake_init_op = Operator("fake_init", Out=out_var_name, shape=var_shape) + fake_init_op.run(scope, place) + + self.assertEqual(var_shape, out_tensor._get_dims()) + + def test_fake_init_selected_rows(self): + places = [core.CPUPlace()] + if core.is_compiled_with_cuda(): + places.append(core.CUDAPlace(0)) + for place in places: + for is_selected_rows in [True, False]: + self.check_with_place(place, is_selected_rows) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_merge_ids_op.py b/python/paddle/fluid/tests/unittests/test_merge_ids_op.py index 26ce7024117162e8bad403a9d8b8518c27578c83..b109e4ea62669c735128f4824eb9d02ad43900e0 100644 --- a/python/paddle/fluid/tests/unittests/test_merge_ids_op.py +++ b/python/paddle/fluid/tests/unittests/test_merge_ids_op.py @@ -22,15 +22,28 @@ from op_test import OpTest class TestMergeIdsOp(OpTest): def setUp(self): self.op_type = "merge_ids" - ids = np.array([[0], [2], [2], [3], [5], [5], [6]]).astype('int64') - x0 = np.array([[0.1, 0.2], [0.2, 0.3], [0.3, 0.4]]).astype('float32') - x1 = np.array([]).astype('float32') - x2 = np.array([[0.4, 0.5], [0.4, 0.5], [0.5, 0.6], - [0.5, 0.6]]).astype('float32') - out = np.array([[0.1, 0.2], [0.4, 0.5], [0.4, 0.5], [0.2, 0.3], - [0.5, 0.6], [0.5, 0.6], [0.3, 0.4]]).astype('float32') - self.inputs = {'Ids': ids, "X": [('x0', x0), ('x1', x1), ('x2', x2)]} - self.outputs = {'Out': out} + ids1 = np.array([[0], [2], [5], [6]]).astype('int64') + ids2 = np.array([[0], [2], [2], [3]]).astype('int64') + + rows1 = np.array([[0], [2]]).astype('int64') + rows2 = np.array([[3], [5]]).astype('int64') + rows3 = np.array([[6]]).astype('int64') + + x0 = np.array([[0.1, 0.2], [0.2, 0.3]]).astype('float32') + x1 = np.array([[0.3, 0.4], [0.4, 0.5]]).astype('float32') + x2 = np.array([[0.5, 0.6]]).astype('float32') + + out1 = np.array( + [[0.1, 0.2], [0.2, 0.3], [0.4, 0.5], [0.5, 0.6]]).astype('float32') + out2 = np.array( + [[0.1, 0.2], [0.2, 0.3], [0.2, 0.3], [0.3, 0.4]]).astype('float32') + + self.inputs = { + 'Ids': [('ids1', ids1), ('ids2', ids2)], + "Rows": [('rows1', rows1), ('rows2', rows2), ('rows3', rows3)], + "X": [('x0', x0), ('x1', x1), ('x2', x2)] + } + self.outputs = {'Out': [('out1', out1), ('out2', out2)]} def test_check_output(self): self.check_output() diff --git a/python/paddle/fluid/tests/unittests/test_split_ids_op.py b/python/paddle/fluid/tests/unittests/test_split_ids_op.py index 4c3d0258980fd8595704a65219deb520b96e222e..d674dad2293921c06135b4ee528538d266cb2904 100644 --- a/python/paddle/fluid/tests/unittests/test_split_ids_op.py +++ b/python/paddle/fluid/tests/unittests/test_split_ids_op.py @@ -25,18 +25,21 @@ from paddle.fluid.op import Operator class TestSplitIdsOp(OpTest): def setUp(self): self.op_type = "split_ids" - ids = np.array([[0], [2], [2], [3], [5], [5], [6]]).astype('int64') + ids1 = np.array([[0], [2], [2], [3], [5], [5], [6]]).astype('int64') + ids2 = np.array([[6], [2], [3], [3], [5], [2], [6]]).astype('int64') + ids3 = np.array([[2], [2], [2], [3], [5], [5], [6]]).astype('int64') + out0 = np.array([[0], [3], [6]]).astype('int64') out1 = np.array([[]]).astype('int64') - out2 = np.array([[2], [2], [5], [5]]).astype('int64') - self.inputs = {'Ids': ids} + out2 = np.array([[2], [5]]).astype('int64') + self.inputs = {'Ids': [('ids1', ids1), ('ids2', ids2), ('ids3', ids3)]} self.outputs = {'Out': [('out0', out0), ('out1', out1), ('out2', out2)]} def test_check_output(self): self.check_output() -class TestSpliteIds(unittest.TestCase): +class TestSplitSelectedRows(unittest.TestCase): def get_places(self): places = [core.CPUPlace()] return places diff --git a/python/paddle/fluid/tests/unittests/test_split_selected_rows_op.py b/python/paddle/fluid/tests/unittests/test_split_selected_rows_op.py index 41a5ee59ea523b1f6c5015974a12c526e883fa35..50204b8a77c187aa695da83860960566448d290f 100644 --- a/python/paddle/fluid/tests/unittests/test_split_selected_rows_op.py +++ b/python/paddle/fluid/tests/unittests/test_split_selected_rows_op.py @@ -99,7 +99,6 @@ class TestSpliteSelectedRows(unittest.TestCase): out0_grad.set_height(height) out0_grad_tensor = out0_grad.get_tensor() np_array = np.ones((len(rows0), row_numel)).astype("float32") - np_array[0, 0] = 2.0 out0_grad_tensor.set(np_array, place) out1_grad = scope.var("out1@GRAD").get_selected_rows() @@ -108,7 +107,6 @@ class TestSpliteSelectedRows(unittest.TestCase): out1_grad.set_height(height) out1_grad_tensor = out1_grad.get_tensor() np_array = np.ones((len(rows1), row_numel)).astype("float32") - np_array[0, 1] = 4.0 out1_grad_tensor.set(np_array, place) x_grad = scope.var("X@GRAD").get_selected_rows() @@ -121,11 +119,13 @@ class TestSpliteSelectedRows(unittest.TestCase): grad_op.run(scope, place) - self.assertEqual(x_grad.rows(), rows0 + rows1) + merged_rows = set(rows0 + rows1) + self.assertEqual(set(x_grad.rows()), set(rows0 + rows1)) self.assertEqual(x_grad.height(), height) + print(np.array(x_grad.get_tensor())) self.assertAlmostEqual(2.0, np.array(x_grad.get_tensor())[0, 0]) - self.assertAlmostEqual(4.0, np.array(x_grad.get_tensor())[2, 1]) + self.assertAlmostEqual(1.0, np.array(x_grad.get_tensor())[2, 1]) if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/test_sum_op.py b/python/paddle/fluid/tests/unittests/test_sum_op.py index 74797bb65678404b7b35d06eecc7f9a12b2a346e..e20418ff1c8d21f3a3e4ba15ff2aa9d54f37f4b2 100644 --- a/python/paddle/fluid/tests/unittests/test_sum_op.py +++ b/python/paddle/fluid/tests/unittests/test_sum_op.py @@ -45,16 +45,30 @@ class TestSumOp(OpTest): class TestSelectedRowsSumOp(OpTest): - def check_with_place(self, place): - scope = core.Scope() - self.check_input_and_optput(scope, place, True, True, True) - self.check_input_and_optput(scope, place, False, True, True) - self.check_input_and_optput(scope, place, False, False, True) - self.check_input_and_optput(scope, place, False, False, False) + def check_with_place(self, place, inplace): + self.height = 10 + self.row_numel = 12 + self.rows = [0, 1, 2, 3, 4, 5, 6] + + self.check_input_and_optput(core.Scope(), place, inplace, True, True, + True) + self.check_input_and_optput(core.Scope(), place, inplace, False, True, + True) + self.check_input_and_optput(core.Scope(), place, inplace, False, False, + True) + self.check_input_and_optput(core.Scope(), place, inplace, False, False, + False) + + def _get_array(self, row_num, row_numel): + array = np.ones((row_num, row_numel)).astype("float32") + for i in range(row_num): + array[i] *= i + return array def check_input_and_optput(self, scope, place, + inplace, w1_has_data=False, w2_has_data=False, w3_has_data=False): @@ -64,35 +78,43 @@ class TestSelectedRowsSumOp(OpTest): self.create_selected_rows(scope, place, "W3", w3_has_data) # create Out Variable - out = scope.var('Out').get_selected_rows() + if inplace: + out_var_name = "W1" + else: + out_var_name = "Out" + out = scope.var(out_var_name).get_selected_rows() # create and run sum operator - sum_op = Operator("sum", X=["W1", "W2", "W3"], Out='Out') + sum_op = Operator("sum", X=["W1", "W2", "W3"], Out=out_var_name) sum_op.run(scope, place) has_data_w_num = 0 - for w in [w1_has_data, w2_has_data, w3_has_data]: - if not w: + for has_data in [w1_has_data, w2_has_data, w3_has_data]: + if has_data: has_data_w_num += 1 - self.assertEqual(7 * has_data_w_num, len(out.rows())) + if has_data_w_num > 0: + self.assertEqual(len(out.rows()), 7) + self.assertTrue( + np.array_equal( + np.array(out.get_tensor()), + self._get_array(len(self.rows), self.row_numel) * + has_data_w_num)) + else: + self.assertEqual(len(out.rows()), 0) - def create_selected_rows(self, scope, place, var_name, isEmpty): + def create_selected_rows(self, scope, place, var_name, has_data): # create and initialize W Variable - if not isEmpty: - rows = [0, 1, 2, 3, 4, 5, 6] - row_numel = 12 + if has_data: + rows = self.rows else: rows = [] - row_numel = 12 var = scope.var(var_name) w_selected_rows = var.get_selected_rows() - w_selected_rows.set_height(len(rows)) + w_selected_rows.set_height(self.height) w_selected_rows.set_rows(rows) - w_array = np.ones((len(rows), row_numel)).astype("float32") - for i in range(len(rows)): - w_array[i] *= i + w_array = self._get_array(len(rows), self.row_numel) w_tensor = w_selected_rows.get_tensor() w_tensor.set(w_array, place) @@ -100,9 +122,11 @@ class TestSelectedRowsSumOp(OpTest): def test_w_is_selected_rows(self): places = [core.CPUPlace()] - # currently only support CPU + if core.is_compiled_with_cuda(): + places.append(core.CUDAPlace(0)) for place in places: - self.check_with_place(place) + for inplace in [True, False]: + self.check_with_place(place, inplace) if __name__ == "__main__": diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index 28d7df8e45d4f3209fb6205b76f8d1be8e73392b..70a305219706af3c5759bca2d83820f0cbcb4ada 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -475,6 +475,26 @@ class DistributeTranspiler(object): delete_ops(self.origin_program.global_block(), self.optimize_ops) delete_ops(self.origin_program.global_block(), lr_ops) + # delete table init op + if self.has_distributed_lookup_table: + table_var = self.startup_program.global_block().vars[ + self.table_name] + table_param_init_op = [] + for op in self.startup_program.global_block().ops: + if self.table_name in op.output_arg_names: + table_param_init_op.append(op) + init_op_num = len(table_param_init_op) + if init_op_num != 1: + raise ValueError("table init op num should be 1, now is " + str( + init_op_num)) + table_init_op = table_param_init_op[0] + self.startup_program.global_block().append_op( + type="fake_init", + inputs={}, + outputs={"Out": table_var}, + attrs={"shape": table_init_op.attr('shape')}) + delete_ops(self.startup_program.global_block(), table_param_init_op) + self.origin_program.__str__() if wait_port: @@ -713,7 +733,7 @@ in a single call.") for _, op in enumerate(self.optimize_ops): # optimizer is connected to itself if op.attr(OP_ROLE_VAR_ATTR_NAME)[0] == optimize_target_param_name and \ - op not in global_ops: + op not in global_ops: log("append opt op: ", op.type, op.input_arg_names, merged_var) __append_optimize_op__(op, per_opt_block, @@ -1034,15 +1054,11 @@ to transpile() call.") def _replace_lookup_table_op_with_prefetch(self, program, pserver_endpoints): # 1. replace lookup_table_op with split_ids_op -> prefetch_op -> sum_op - # self.all_prefetch_input_vars = - # [[var0_prefetch_in_pserver0, var0_prefetch_in_pserver1] - # [var1_prefetch_in_pserver0, var1_prefetch_in_pserver1]] + self.all_in_ids_vars = [] self.all_prefetch_input_vars = [] - - # self.all_prefetch_input_vars = - # [[var0_prefetch_in_pserver0, var0_prefetch_in_pserver1] - # [var1_prefetch_in_pserver0, var1_prefetch_in_pserver1]] self.all_prefetch_output_vars = [] + self.all_out_emb_vars = [] + lookup_table_op_index = -1 continue_search_lookup_table_op = True while continue_search_lookup_table_op: @@ -1052,72 +1068,68 @@ to transpile() call.") if op.type == LOOKUP_TABLE_TYPE: continue_search_lookup_table_op = True - lookup_table_op_index = list(all_ops).index(op) + lookup_table_op_index = lookup_table_op_index if lookup_table_op_index != -1 else list( + all_ops).index(op) ids_name = op.input("Ids") out_name = op.output("Out") ids_var = program.global_block().vars[ids_name[0]] - prefetch_input_vars = self._create_splited_vars( - source_var=ids_var, - block=program.global_block(), - tag="_prefetch_in_") - self.all_prefetch_input_vars.append(prefetch_input_vars) + self.all_in_ids_vars.append(ids_var) out_var = program.global_block().vars[out_name[0]] - prefetch_output_vars = self._create_splited_vars( - source_var=out_var, - block=program.global_block(), - tag="_prefetch_out_") - self.all_prefetch_output_vars.append(prefetch_output_vars) - - # insert split_ids_op - program.global_block()._insert_op( - index=lookup_table_op_index, - type="split_ids", - inputs={ - 'Ids': [ - program.global_block().vars[varname] - for varname in ids_name - ] - }, - outputs={"Out": prefetch_input_vars}) - - # insert prefetch_op - program.global_block()._insert_op( - index=lookup_table_op_index + 1, - type="prefetch", - inputs={'X': prefetch_input_vars}, - outputs={"Out": prefetch_output_vars}, - attrs={ - "epmap": pserver_endpoints, - # FIXME(qiao) temporarily disable this config because prefetch - # is not act as other rpc op, it's more like a forward op - # RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE - }) - - # insert concat_op - program.global_block()._insert_op( - index=lookup_table_op_index + 2, - type="merge_ids", - inputs={ - 'Ids': [ - program.global_block().vars[varname] - for varname in ids_name - ], - 'X': prefetch_output_vars - }, - outputs={ - "Out": [ - program.global_block().vars[varname] - for varname in out_name - ] - }) + self.all_out_emb_vars.append(out_var) # delete lookup_table_op delete_ops(program.global_block(), [op]) # break for loop break + for index in range(len(self.pserver_endpoints)): + in_var = program.global_block().create_var( + name=str("prefetch_compress_in_tmp_" + str(index)), + type=self.all_in_ids_vars[0].type, + shape=self.all_in_ids_vars[0].shape, + dtype=self.all_in_ids_vars[0].dtype) + self.all_prefetch_input_vars.append(in_var) + + out_var = program.global_block().create_var( + name=str("prefetch_compress_out_tmp_" + str(index)), + type=self.all_out_emb_vars[0].type, + shape=self.all_out_emb_vars[0].shape, + dtype=self.all_out_emb_vars[0].dtype) + self.all_prefetch_output_vars.append(out_var) + + # insert split_ids_op + program.global_block()._insert_op( + index=lookup_table_op_index, + type="split_ids", + inputs={'Ids': self.all_in_ids_vars}, + outputs={"Out": self.all_prefetch_input_vars}) + + # insert prefetch_op + program.global_block()._insert_op( + index=lookup_table_op_index + 1, + type="prefetch", + inputs={'X': self.all_prefetch_input_vars}, + outputs={"Out": self.all_prefetch_output_vars}, + attrs={ + "epmap": pserver_endpoints, + # FIXME(qiao) temporarily disable this config because prefetch + # is not act as other rpc op, it's more like a forward op + # RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE + }) + + # insert concat_op + program.global_block()._insert_op( + index=lookup_table_op_index + 2, + type="merge_ids", + inputs={ + 'Ids': self.all_in_ids_vars, + 'Rows': self.all_prefetch_input_vars, + 'X': self.all_prefetch_output_vars + }, + outputs={"Out": self.all_out_emb_vars}) + def _split_table_grad_and_add_send_vars(self, program, pserver_endpoints): # 2. add split_ids_op and send_op to send gradient to pservers @@ -1134,7 +1146,8 @@ to transpile() call.") inputs={ 'Ids': [program.global_block().vars[table_grad_name]] }, - outputs={"Out": self.trainer_side_table_grad_list}) + outputs={"Out": self.trainer_side_table_grad_list}, + attrs={RPC_OP_ROLE_ATTR_NAME: DIST_OP_ROLE_ATTR_VALUE}) program.global_block()._insert_op( index=op_index + 2, type="send", @@ -1160,32 +1173,31 @@ to transpile() call.") # STEP: create prefetch block table_var = pserver_program.global_block().vars[self.table_name] prefetch_var_name_to_block_id = [] - for index in range(len(self.all_prefetch_input_vars)): - prefetch_block = pserver_program._create_block(optimize_block.idx) - trainer_ids = self.all_prefetch_input_vars[index][pserver_index] - pserver_ids = pserver_program.global_block().create_var( - name=trainer_ids.name, - type=trainer_ids.type, - shape=trainer_ids.shape, - dtype=trainer_ids.dtype) - trainer_out = self.all_prefetch_output_vars[index][pserver_index] - pserver_out = pserver_program.global_block().create_var( - name=trainer_out.name, - type=trainer_out.type, - shape=trainer_out.shape, - dtype=trainer_out.dtype) - prefetch_block.append_op( - type="lookup_sparse_table", - inputs={'Ids': pserver_ids, - "W": table_var}, - outputs={"Out": pserver_out}, - attrs={ - "is_sparse": True, # has no effect on lookup_table op - "is_distributed": True, - "padding_idx": -1 - }) - prefetch_var_name_to_block_id.append(trainer_ids.name + ":" + str( - prefetch_block.idx)) + prefetch_block = pserver_program._create_block(optimize_block.idx) + trainer_ids = self.all_prefetch_input_vars[pserver_index] + pserver_ids = pserver_program.global_block().create_var( + name=trainer_ids.name, + type=trainer_ids.type, + shape=trainer_ids.shape, + dtype=trainer_ids.dtype) + trainer_out = self.all_prefetch_output_vars[pserver_index] + pserver_out = pserver_program.global_block().create_var( + name=trainer_out.name, + type=trainer_out.type, + shape=trainer_out.shape, + dtype=trainer_out.dtype) + prefetch_block.append_op( + type="lookup_sparse_table", + inputs={'Ids': pserver_ids, + "W": table_var}, + outputs={"Out": pserver_out}, + attrs={ + "is_sparse": True, # has no effect on lookup_table op + "is_distributed": True, + "padding_idx": -1 + }) + prefetch_var_name_to_block_id.append(trainer_ids.name + ":" + str( + prefetch_block.idx)) return prefetch_var_name_to_block_id def _create_table_optimize_block(self, pserver_index, pserver_program, @@ -1364,16 +1376,6 @@ to transpile() call.") program.global_block()._sync_with_cpp() return var_mapping - def _create_splited_vars(self, source_var, block, tag): - return [ - block.create_var( - name=str(source_var.name + tag + str(index)), - type=source_var.type, - shape=source_var.shape, - dtype=source_var.dtype) - for index in range(len(self.pserver_endpoints)) - ] - def _clone_var(self, block, var, persistable=True): return block.create_var( name=var.name,