diff --git a/doc/design/images/multiple_reader.png b/doc/fluid/design/concepts/images/multiple_reader.png similarity index 100% rename from doc/design/images/multiple_reader.png rename to doc/fluid/design/concepts/images/multiple_reader.png diff --git a/doc/design/images/readers.png b/doc/fluid/design/concepts/images/readers.png similarity index 100% rename from doc/design/images/readers.png rename to doc/fluid/design/concepts/images/readers.png diff --git a/doc/fluid/design/dist_train/distributed_lookup_table_design.md b/doc/fluid/design/dist_train/distributed_lookup_table_design.md new file mode 100644 index 0000000000000000000000000000000000000000..e543adf0f97cc6b47415b807d7a1ed1effec9b22 --- /dev/null +++ b/doc/fluid/design/dist_train/distributed_lookup_table_design.md @@ -0,0 +1,128 @@ +## Design Doc: Distributed Lookup Table Operator + +A lookup table operator in PaddlePaddle where the table could be out +of the memory of a computer. + +## Background + +A lookup table operator is well-used in deep learning for learning the +representation, or the +[*embedding*](http://www.cs.toronto.edu/~fritz/absps/ieee-lre.pdf), of +symbols. + +### The Forward Algorithm + +The forward algorithm of the lookup table is a multiplication of the +input vector x and the lookup table matrix W: + +$$y = x * W$$ + +When x is a sparse vector of symbols, the above multiplication +simplifies into looking up rows in W that correspond to symbols in x, +denoted by W(x). Please be aware that W could be huge and out of the +memory, so we'd need a distributed storage service, which supports the +lookup of rows. + +The following figure illustrates the multiplication of x with two +non-zero elements, or say, two symbols, and a lookup table W: + +![lookup table](./src/lookup_table.png) + +### The Backward Algorithm + +The backward algorithm computes W'(x) using W(x). W'(x) has the same +scale of size as W(x) and is much smaller than W. + +To optimize W given W', we can do simple SGD update: + +$$W = f(W') = \lambda * W'$$ + +or some more sophisticated algorithms that rely on both W' and W: + +$$W = f(W, W')$$ + +The following figure illustrates the backward pass of the lookup +operator: ![lookup table training](./src/lookup_table_training.png) + +## Distributed Storage Service + +The forward algorithm requires a distributed storage service for W. +The backward algorithm prefers that the storage system can apply the +optimization algorithm on W. The following two sections describe two +solutions -- the former doesn't require that the storage service can +do optimization, the latter does. + +### Storage Service Doesn't Optimize + +In this design, we use highly-optimized distributed storage, e.g., +memcached, as the storage service, and we run the optimization +algorithm on parameter servers of PaddlePaddle. The following figure +illustrates the training process. + + + + + +Each trainer runs the forward and backward passes using their local +data: + +1. In the forward pass, when a trainer runs the forward algorithm of a + lookup operator, it retrieves W(x) from the storage service. +1. The trainer computes W'(x) in the backward pass using W(x). + +During the global update process: + +1. Each trainer uploads its W'(x) to parameter servers. +1. The parameter server runs the optimization algorithm, e.g., the + Adam optimization algorithm, which requires that + 1. The parameter server retrieves W(x) from memcached, and + 1. The parameter server pushes $\Delta W(x)=f(W(x), lambda \sum_j + W'(x))$ to memcached, where $f$ denotes the optimization + algorithm. + +### Storage Service Does Optimize + +This design is very similar to the above one, except that the +optimization algorithm $f$ runs on the storage service. + +- Pro: parameter servers do not retrieve W(x) from the storage + service, thus saves half network communication. +- Con: the storage service needs to be able to run the optimization + algorithm. + +## Conclusion + +Let us do the "storage service does not optimize" solution first, as a +baseline at least, because it is easier to use a well-optimized +distributed storage service like memcached. We can do the "storage +service does optimize" solution later or at the same time, which, if +implemented carefully, should have better performance than the former. diff --git a/doc/fluid/design/dist_train/src/lookup_table.png b/doc/fluid/design/dist_train/src/lookup_table.png new file mode 100644 index 0000000000000000000000000000000000000000..72dfe3547f731d0d090338afb206b0549dff472e Binary files /dev/null and b/doc/fluid/design/dist_train/src/lookup_table.png differ diff --git a/doc/fluid/design/dist_train/src/lookup_table_training.png b/doc/fluid/design/dist_train/src/lookup_table_training.png new file mode 100644 index 0000000000000000000000000000000000000000..cc7cc4aeb3b885850fe2f70f19fb84d5873bed1e Binary files /dev/null and b/doc/fluid/design/dist_train/src/lookup_table_training.png differ diff --git a/doc/fluid/design/motivation/fluid.md b/doc/fluid/design/motivation/fluid.md index f78fa8c1914124f33b9730f918c8887ced4f8d9d..110b7d78bf12ac8328fb3a913e4386e75d63c995 100644 --- a/doc/fluid/design/motivation/fluid.md +++ b/doc/fluid/design/motivation/fluid.md @@ -103,7 +103,7 @@ In computability theory, a system of data-manipulation rules, such as a programm There are two ways to execute a Fluid program. When a program is executed, it creates a protobuf message [`ProgramDesc`](https://github.com/PaddlePaddle/Paddle/blob/a91efdde6910ce92a78e3aa7157412c4c88d9ee8/paddle/framework/framework.proto#L145) that describes the process and is conceptually like an [abstract syntax tree](https://en.wikipedia.org/wiki/Abstract_syntax_tree). -There is a C++ class [`Executor`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/executor.h), which runs a `ProgramDesc`, similar to how an interpreter runs a Python program. +There is a C++ class [`Executor`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/fluid/framework/executor.h), which runs a `ProgramDesc`, similar to how an interpreter runs a Python program. Fluid is moving towards the direction of a compiler, which is explain in [fluid_compiler.md](fluid_compiler.md). diff --git a/paddle/fluid/framework/channel_test.cc b/paddle/fluid/framework/channel_test.cc index edfb41c72489113d9803c2957baed1ce44f8296d..73be5cdbe2a1f5994ecee4c415e83962f50532fe 100644 --- a/paddle/fluid/framework/channel_test.cc +++ b/paddle/fluid/framework/channel_test.cc @@ -871,3 +871,67 @@ TEST(ChannelHolder, ChannelHolderDestroyUnblocksSendersTest) { ch->Reset(0); ChannelHolderDestroyUnblockSenders(ch, false); } + +// This tests that closing a channelholder many times. +void ChannelHolderManyTimesClose(ChannelHolder *ch) { + const int num_threads = 15; + std::thread t[num_threads]; + bool thread_ended[num_threads]; + + // Launches threads that try to send data to channel. + for (size_t i = 0; i < num_threads / 3; i++) { + thread_ended[i] = false; + t[i] = std::thread( + [&](bool *ended) { + int data = 10; + ch->Send(&data); + *ended = true; + }, + &thread_ended[i]); + } + + // Launches threads that try to receive data to channel. + for (size_t i = num_threads / 3; i < 2 * num_threads / 3; i++) { + thread_ended[i] = false; + t[i] = std::thread( + [&](bool *p) { + int data; + if (ch->Receive(&data)) { + EXPECT_EQ(data, 10); + } + *p = true; + }, + &thread_ended[i]); + } + + // Launches threads that try to close the channel. + for (size_t i = 2 * num_threads / 3; i < num_threads; i++) { + thread_ended[i] = false; + t[i] = std::thread( + [&](bool *p) { + if (!ch->IsClosed()) { + ch->close(); + } + *p = true; + }, + &thread_ended[i]); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait + + // Verify that all threads are unblocked + for (size_t i = 0; i < num_threads; i++) { + EXPECT_EQ(thread_ended[i], true); + } + EXPECT_TRUE(ch->IsClosed()); + // delete the channel + delete ch; + for (size_t i = 0; i < num_threads; i++) t[i].join(); +} + +TEST(ChannelHolder, ChannelHolderManyTimesCloseTest) { + // Check for Buffered Channel + ChannelHolder *ch = new ChannelHolder(); + ch->Reset(10); + ChannelHolderManyTimesClose(ch); +} diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc index bcbd717aa47dd98252c5beaaf33d77b466496578..7155d5ef2febc20aaa684c04a7a59f781857c9e5 100644 --- a/paddle/fluid/framework/executor.cc +++ b/paddle/fluid/framework/executor.cc @@ -113,10 +113,11 @@ void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id, // and feed_holder_name. Raise exception when any mismatch is found. // Return true if the block has feed operators and holder of matching info. static bool has_feed_operators( - BlockDesc* block, std::map& feed_targets, + const BlockDesc& block, + std::map& feed_targets, const std::string& feed_holder_name) { size_t feed_count = 0; - for (auto* op : block->AllOps()) { + for (auto* op : block.AllOps()) { if (op->Type() == kFeedOpType) { feed_count++; PADDLE_ENFORCE_EQ(op->Input("X")[0], feed_holder_name, @@ -135,7 +136,7 @@ static bool has_feed_operators( "The number of feed operators should match 'feed_targets'"); // When feed operator are present, so should be feed_holder - auto var = block->FindVar(feed_holder_name); + auto var = block.FindVar(feed_holder_name); PADDLE_ENFORCE_NOT_NULL(var, "Block should already have a '%s' variable", feed_holder_name); PADDLE_ENFORCE_EQ(var->GetType(), proto::VarType::FEED_MINIBATCH, @@ -153,10 +154,10 @@ static bool has_feed_operators( // and fetch_holder_name. Raise exception when any mismatch is found. // Return true if the block has fetch operators and holder of matching info. static bool has_fetch_operators( - BlockDesc* block, std::map& fetch_targets, + const BlockDesc& block, std::map& fetch_targets, const std::string& fetch_holder_name) { size_t fetch_count = 0; - for (auto* op : block->AllOps()) { + for (auto* op : block.AllOps()) { if (op->Type() == kFetchOpType) { fetch_count++; PADDLE_ENFORCE_EQ(op->Output("Out")[0], fetch_holder_name, @@ -175,7 +176,7 @@ static bool has_fetch_operators( "The number of fetch operators should match 'fetch_targets'"); // When fetch operator are present, so should be fetch_holder - auto var = block->FindVar(fetch_holder_name); + auto var = block.FindVar(fetch_holder_name); PADDLE_ENFORCE_NOT_NULL(var, "Block should already have a '%s' variable", fetch_holder_name); PADDLE_ENFORCE_EQ(var->GetType(), proto::VarType::FETCH_LIST, @@ -192,10 +193,19 @@ void Executor::Run(const ProgramDesc& program, Scope* scope, const std::string& feed_holder_name, const std::string& fetch_holder_name) { platform::RecordBlock b(kProgramId); - auto* copy_program = new ProgramDesc(program); + bool has_feed_ops = + has_feed_operators(program.Block(0), feed_targets, feed_holder_name); + bool has_fetch_ops = + has_fetch_operators(program.Block(0), fetch_targets, fetch_holder_name); + + ProgramDesc* copy_program = const_cast(&program); + if (!has_feed_ops || !has_fetch_ops) { + copy_program = std::unique_ptr(new ProgramDesc(program)).get(); + } + auto* global_block = copy_program->MutableBlock(0); - if (!has_feed_operators(global_block, feed_targets, feed_holder_name)) { + if (!has_feed_ops) { // create feed_holder variable auto* feed_holder = global_block->Var(feed_holder_name); feed_holder->SetType(proto::VarType::FEED_MINIBATCH); @@ -228,7 +238,7 @@ void Executor::Run(const ProgramDesc& program, Scope* scope, } } - if (!has_fetch_operators(global_block, fetch_targets, fetch_holder_name)) { + if (!has_fetch_ops) { // create fetch_holder variable auto* fetch_holder = global_block->Var(fetch_holder_name); fetch_holder->SetType(proto::VarType::FETCH_LIST); @@ -262,8 +272,6 @@ void Executor::Run(const ProgramDesc& program, Scope* scope, GetFetchVariable(*scope, fetch_holder_name, idx); } } - - delete copy_program; } ExecutorPrepareContext* Executor::Prepare(const ProgramDesc& program, @@ -313,9 +321,8 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, } // if (create_vars) for (auto& op : ctx->ops_) { - VLOG(4) << place_ << " " << op->DebugStringEx(local_scope); - op->Run(*local_scope, place_); VLOG(3) << place_ << " " << op->DebugStringEx(local_scope); + op->Run(*local_scope, place_); if (FLAGS_benchmark) { VLOG(2) << "Memory used after operator " + op->Type() + " running: " diff --git a/paddle/fluid/framework/init.cc b/paddle/fluid/framework/init.cc index 2e0a224ff5df749fd8c809dc88a85a1643542abf..3c0d93642ac41e8d90f9a248e81cea7a4fe12293 100644 --- a/paddle/fluid/framework/init.cc +++ b/paddle/fluid/framework/init.cc @@ -26,6 +26,7 @@ namespace paddle { namespace framework { std::once_flag gflags_init_flag; +std::once_flag p2p_init_flag; void InitGflags(std::vector &argv) { std::call_once(gflags_init_flag, [&]() { @@ -42,6 +43,27 @@ void InitGflags(std::vector &argv) { }); } +void InitP2P(int count) { +#ifdef PADDLE_WITH_CUDA + std::call_once(p2p_init_flag, [&]() { + for (int i = 0; i < count; ++i) { + for (int j = 0; j < count; ++j) { + if (i == j) continue; + int can_acess = -1; + PADDLE_ENFORCE(cudaDeviceCanAccessPeer(&can_acess, i, j), + "Failed to test P2P access."); + if (can_acess != 1) { + LOG(WARNING) << "Cannot enable P2P access from " << i << " to " << j; + } else { + cudaSetDevice(i); + cudaDeviceEnablePeerAccess(j, 0); + } + } + } + }); +#endif +} + void InitDevices() { /*Init all avaiable devices by default */ @@ -63,7 +85,7 @@ void InitDevices() { for (int i = 0; i < count; ++i) { places.emplace_back(platform::CUDAPlace(i)); } - + InitP2P(count); platform::DeviceContextPool::Init(places); } diff --git a/paddle/fluid/inference/CMakeLists.txt b/paddle/fluid/inference/CMakeLists.txt index 17ccca8cdcbcaabaddbbc0ca1d3ca4fdf054b0fb..aff427310f15be72f5c8d0fa1537ffa6bbe2881d 100644 --- a/paddle/fluid/inference/CMakeLists.txt +++ b/paddle/fluid/inference/CMakeLists.txt @@ -13,6 +13,11 @@ cc_library(paddle_fluid_shared SHARED SRCS io.cc DEPS ARCHIVE_START ${GLOB_OP_LIB} ${FLUID_CORE_MODULES} ARCHIVE_END) set_target_properties(paddle_fluid_shared PROPERTIES OUTPUT_NAME paddle_fluid) +if(NOT APPLE) + # TODO(liuyiqun): Temporarily disable the link flag because it is not support on Mac. + set(LINK_FLAGS "-Wl,--version-script ${CMAKE_CURRENT_SOURCE_DIR}/paddle_fluid.map") + set_target_properties(paddle_fluid_shared PROPERTIES LINK_FLAGS "${LINK_FLAGS}") +endif() if(WITH_TESTING) add_subdirectory(tests/book) diff --git a/paddle/fluid/inference/paddle_fluid.map b/paddle/fluid/inference/paddle_fluid.map new file mode 100644 index 0000000000000000000000000000000000000000..5203784dc1fcb672eb6a26d9dfd3ffbe02e08038 --- /dev/null +++ b/paddle/fluid/inference/paddle_fluid.map @@ -0,0 +1,6 @@ +{ + global: + *paddle*; + local: + *; +}; diff --git a/paddle/fluid/operators/conv_op.cc b/paddle/fluid/operators/conv_op.cc index e3fc21c90f95469d646139a4454501d1c30bd51c..650bc92be22af9ea8afcacf590a11190109e8811 100644 --- a/paddle/fluid/operators/conv_op.cc +++ b/paddle/fluid/operators/conv_op.cc @@ -70,16 +70,16 @@ void ConvOp::InferShape(framework::InferShapeContext* ctx) const { framework::OpKernelType ConvOp::GetExpectedKernelType( const framework::ExecutionContext& ctx) const { - framework::LibraryType library_{framework::LibraryType::kPlain}; + framework::LibraryType library{framework::LibraryType::kPlain}; #ifdef PADDLE_WITH_CUDA if (platform::CanCUDNNBeUsed(ctx)) { - library_ = framework::LibraryType::kCUDNN; + library = framework::LibraryType::kCUDNN; } #endif #ifdef PADDLE_WITH_MKLDNN - if (library_ == framework::LibraryType::kPlain && + if (library == framework::LibraryType::kPlain && platform::CanMKLDNNBeUsed(ctx)) { - library_ = framework::LibraryType::kMKLDNN; + library = framework::LibraryType::kMKLDNN; } #endif @@ -91,15 +91,15 @@ framework::OpKernelType ConvOp::GetExpectedKernelType( "input and filter data type should be consistent"); if (input_data_type == framework::proto::VarType::FP16) { - PADDLE_ENFORCE_EQ(library_, framework::LibraryType::kCUDNN, + PADDLE_ENFORCE_EQ(library, framework::LibraryType::kCUDNN, "float16 can only be used when CUDNN is used"); } std::string data_format = ctx.Attr("data_format"); // TODO(pzelazko-intel): enable MKLDNN layout when it's ready - framework::DataLayout layout_ = framework::StringToDataLayout(data_format); - return framework::OpKernelType(input_data_type, ctx.GetPlace(), layout_, - library_); + framework::DataLayout layout = framework::StringToDataLayout(data_format); + return framework::OpKernelType(input_data_type, ctx.GetPlace(), layout, + library); } Conv2DOpMaker::Conv2DOpMaker(OpProto* proto, OpAttrChecker* op_checker) diff --git a/paddle/fluid/operators/feed_op.cc b/paddle/fluid/operators/feed_op.cc index 90c31877f6a87d1e237283d489353b4aba26c97b..debacf07c360b9aa69000a0d891f04239ed08807 100644 --- a/paddle/fluid/operators/feed_op.cc +++ b/paddle/fluid/operators/feed_op.cc @@ -15,6 +15,7 @@ limitations under the License. */ #include "paddle/fluid/framework/feed_fetch_type.h" #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/operator.h" +#include "paddle/fluid/platform/profiler.h" namespace paddle { namespace operators { @@ -28,6 +29,10 @@ class FeedOp : public framework::OperatorBase { private: void RunImpl(const framework::Scope &scope, const platform::Place &place) const override { + // get device context from pool + auto *dev_ctx = platform::DeviceContextPool::Instance().Get(place); + platform::RecordEvent record_event(Type(), dev_ctx); + auto feed_var_name = Input("X"); auto *feed_var = scope.FindVar(feed_var_name); @@ -50,14 +55,10 @@ class FeedOp : public framework::OperatorBase { auto &feed_item = feed_list.at(static_cast(col)); auto *out_item = out_var->GetMutable(); - // get device context from pool - platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); - auto &dev_ctx = *pool.Get(place); - if (platform::is_same_place(feed_item.place(), place)) { out_item->ShareDataWith(feed_item); } else { - framework::TensorCopy(feed_item, place, dev_ctx, out_item); + framework::TensorCopy(feed_item, place, *dev_ctx, out_item); } out_item->set_lod(feed_item.lod()); } diff --git a/paddle/fluid/operators/fetch_op.cc b/paddle/fluid/operators/fetch_op.cc index d66f01d1b7ce8528a7c0177b2889aff7e0c5a12b..7c7f3e9059fbb1e3f2cca4f04edfff55c9452761 100644 --- a/paddle/fluid/operators/fetch_op.cc +++ b/paddle/fluid/operators/fetch_op.cc @@ -15,6 +15,7 @@ limitations under the License. */ #include "paddle/fluid/framework/feed_fetch_type.h" #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/platform/device_context.h" +#include "paddle/fluid/platform/profiler.h" namespace paddle { namespace operators { @@ -29,6 +30,9 @@ class FetchOp : public framework::OperatorBase { private: void RunImpl(const framework::Scope &scope, const platform::Place &place) const override { + platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); + platform::RecordEvent record_event(Type(), pool.Get(place)); + auto fetch_var_name = Input("X"); auto *fetch_var = scope.FindVar(fetch_var_name); PADDLE_ENFORCE(fetch_var != nullptr, @@ -53,7 +57,6 @@ class FetchOp : public framework::OperatorBase { // FIXME(yuyang18): Should we assume the fetch operator always generate // CPU outputs? - platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); auto &dev_ctx = *pool.Get(src_item.place()); TensorCopy(src_item, platform::CPUPlace(), dev_ctx, &dst_item); diff --git a/paddle/fluid/operators/load_op.cc b/paddle/fluid/operators/load_op.cc index 05f809ac5628420251957116bb2390b4502f11b8..6ffe0bec5e38432676ecadfa1abbbe70a1425bb1 100644 --- a/paddle/fluid/operators/load_op.cc +++ b/paddle/fluid/operators/load_op.cc @@ -15,6 +15,7 @@ limitations under the License. */ #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/platform/device_context.h" +#include "paddle/fluid/platform/profiler.h" namespace paddle { namespace operators { @@ -29,6 +30,9 @@ class LoadOp : public framework::OperatorBase { private: void RunImpl(const framework::Scope &scope, const platform::Place &place) const override { + auto *dev_ctx = platform::DeviceContextPool::Instance().Get(place); + platform::RecordEvent record_event(Type(), dev_ctx); + auto filename = Attr("file_path"); std::ifstream fin(filename); PADDLE_ENFORCE(static_cast(fin), "Cannot open file %s for load op", @@ -41,9 +45,7 @@ class LoadOp : public framework::OperatorBase { auto *tensor = out_var->GetMutable(); - platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); - auto &dev_ctx = *pool.Get(place); - DeserializeFromStream(fin, tensor, dev_ctx); + DeserializeFromStream(fin, tensor, *dev_ctx); if (platform::is_gpu_place(place)) { // copy CPU to GPU @@ -55,7 +57,7 @@ class LoadOp : public framework::OperatorBase { out_var->Clear(); tensor = out_var->GetMutable(); tensor->set_lod(cpu_tensor.lod()); - TensorCopy(cpu_tensor, place, dev_ctx, tensor); + TensorCopy(cpu_tensor, place, *dev_ctx, tensor); } } }; diff --git a/paddle/fluid/operators/lod_reset_op.cc b/paddle/fluid/operators/lod_reset_op.cc index 6a66297cb843ead1a507a6867c1c562224861cbf..7d5687f2d0666d393d7bb1c1a2fdde6c95e6d615 100644 --- a/paddle/fluid/operators/lod_reset_op.cc +++ b/paddle/fluid/operators/lod_reset_op.cc @@ -22,17 +22,16 @@ class LoDResetOp : public framework::OperatorWithKernel { using framework::OperatorWithKernel::OperatorWithKernel; void InferShape(framework::InferShapeContext *ctx) const override { - // input check PADDLE_ENFORCE(ctx->HasInput("X"), "Input(X) of LoDResetOp should not be null."); PADDLE_ENFORCE(ctx->HasOutput("Out"), "Output(Out) of LoDResetOp should not be null."); - // If target LoD is not set form Input(), then it must be set from Attr(). - if (!ctx->HasInput("TargetLoD")) { + + if (!ctx->HasInput("Y")) { auto level0 = ctx->Attrs().Get>("target_lod"); - PADDLE_ENFORCE(level0.size() > 1, - "Target LoD is not found, should be set to be a valid one " - "through Input() or Attr()."); + PADDLE_ENFORCE_GT(level0.size(), 1, + "If Input(Y) not provided, the target lod should be " + "specified by attribute `target_lod`."); } ctx->SetOutputDim("Out", ctx->GetInputDim("X")); } @@ -50,36 +49,77 @@ class LoDResetOpMaker : public framework::OpProtoAndCheckerMaker { public: LoDResetOpMaker(OpProto *proto, OpAttrChecker *op_checker) : OpProtoAndCheckerMaker(proto, op_checker) { - AddInput("X", "(LoDTensor) The input tensor of lod_reset operator."); - AddInput("TargetLoD", - "(Tensor, optional) The target level 0 LoD from Input().") + AddInput("X", + "(Tensor, LoDTensor) Input variable of LoDResetOp which " + "could be a Tensor or LoDTensor, where the data of output " + "variable inherits from."); + AddInput("Y", + "(Tensor, LoDTensor, optional) If provided and Y is LoDTensor, " + "lod of Input(Y) would be considered as the target lod first, " + "otherwise data of Input(Y) would be considered as the " + "target lod.") .AsDispensable(); - AddOutput("Out", "(LoDTensor) The output tensor of lod_reset operator."); + AddOutput("Out", + "(LoDTensor) Output variable of LoDResetOp which should be a " + "LoDTensor."); AddAttr>("target_lod", "The target level 0 LoD from Attr().") .SetDefault(std::vector{}); AddComment(R"DOC(LoDReset operator -Reset LoD of Input(X) into a new one specified by Input(TargetLoD) or -Attr(target_lod), or set LoD for Input(X) if it doesn't have one. -Currently the lod_reset operator only supports the reset of level 0 LoD. -At least one of Input(TargetLoD) and Attr(target_lod) must be set, -and if both of them are set, Input(TargetLoD) will be chosen as the -target LoD. +Set LoD of `X` to a new one specified by `Y` or attribute `target_lod`. When `Y` +provided and `Y` is a LoDTensor, `Y.lod` would be considered as target LoD +first, otherwise `Y.data` would be considered as target LoD. If `Y` is not +provided, target LoD should be specified by attribute `target_lod`. +If target LoD is specified by `Y.data` or `target_lod`, only one level LoD +is supported. + +Example 1: + +Given a 1-level LoDTensor input(X): + X.lod = [[ 0, 2, 5 6 ]] + X.data = [[1.0], [2.0], [3.0], [4.0], [5.0], [6.0]] + X.dims = [6, 1] + +attr(target_lod): [0, 4, 6] + +then we get a 1-level LoDTensor: + Out.lod = [[ 0, 4, 6 ]] + Out.data = [[1.0], [2.0], [3.0], [4.0], [5.0], [6.0]] + Out.dims = [6, 1] + +Example 2: -An example: -Given a float LoDTensor X with shape (6, 1), its transpose form represents +Given a 1-level LoDTensor input(X): + X.lod = [[ 0, 2, 5 6 ]] + X.data = [[1.0], [2.0], [3.0], [4.0], [5.0], [6.0]] + X.dims = [6, 1] - [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], +input(Y) is a Tensor: + Y.data = [[0, 2, 6]] + Y.dims = [1, 3] -with LoD = [[0, 2, 5, 6]] and the three (transposed) sequences look like +then we get a 1-level LoDTensor: + Out.lod = [[ 0, 2, 6 ]] + Out.data = [[1.0], [2.0], [3.0], [4.0], [5.0], [6.0]] + Out.dims = [6, 1] - [1.0, 2.0], [3.0, 4.0, 5.0], [6.0]. +Example 3: -If target LoD = [0, 4, 6], the lod_reset operator will reset the LoD and -the sequences that the LoDTensor Output(Out) contains becomes: +Given a 1-level LoDTensor input(X): + X.lod = [[ 0, 2, 5 6 ]] + X.data = [[1.0], [2.0], [3.0], [4.0], [5.0], [6.0]] + X.dims = [6, 1] - [1.0, 2.0, 3.0, 4.0], [5.0, 6.0]. +input(Y) is a 2-level LoDTensor: + Y.lod = [[0, 2, 4], [0, 2, 5, 6]] + Y.data = [[1.1], [2.1], [3.1], [4.1], [5.1], [6.1]] + Y.dims = [6, 1] + +then we get a 2-level LoDTensor: + Out.lod = [[0, 2, 4], [0, 2, 5, 6]] + Out.data = [[1.0], [2.0], [3.0], [4.0], [5.0], [6.0]] + Out.dims = [6, 1] )DOC"); } @@ -90,10 +130,16 @@ class LoDResetGradOp : public framework::OperatorWithKernel { using framework::OperatorWithKernel::OperatorWithKernel; void InferShape(framework::InferShapeContext *ctx) const override { - PADDLE_ENFORCE(ctx->HasInput("X"), "Input(X) shouldn't be null."); + PADDLE_ENFORCE(ctx->HasInput("X"), + "Input(X) of LoDResetGradOp should not be null."); PADDLE_ENFORCE(ctx->HasInput(framework::GradVarName("Out")), - "Input(Out@GRAD) shouldn't be null."); - ctx->SetOutputDim(framework::GradVarName("X"), ctx->GetInputDim("X")); + "Input(Out@Grad) of LoDResetGradOp should not be null."); + + auto x_grad_name = framework::GradVarName("X"); + if (ctx->HasOutput(x_grad_name)) { + ctx->SetOutputDim(x_grad_name, ctx->GetInputDim("X")); + ctx->ShareLoD("X", /*->*/ x_grad_name); + } } protected: @@ -111,9 +157,13 @@ class LoDResetGradOp : public framework::OperatorWithKernel { namespace ops = paddle::operators; REGISTER_OP(lod_reset, ops::LoDResetOp, ops::LoDResetOpMaker, lod_reset_grad, ops::LoDResetGradOp); -REGISTER_OP_CPU_KERNEL(lod_reset, - ops::LoDResetKernel, - ops::LoDResetKernel); +REGISTER_OP_CPU_KERNEL( + lod_reset, ops::LoDResetKernel, + ops::LoDResetKernel, + ops::LoDResetKernel, + ops::LoDResetKernel); REGISTER_OP_CPU_KERNEL( lod_reset_grad, ops::LoDResetGradKernel, - ops::LoDResetGradKernel); + ops::LoDResetGradKernel, + ops::LoDResetGradKernel, + ops::LoDResetGradKernel); diff --git a/paddle/fluid/operators/lod_reset_op.cu b/paddle/fluid/operators/lod_reset_op.cu index b0e87a851a77a1cc98d419a63d4d9e5e1b9dd163..888d4c12eb4e3f4fd94d8dd4178c59acd0abb23b 100644 --- a/paddle/fluid/operators/lod_reset_op.cu +++ b/paddle/fluid/operators/lod_reset_op.cu @@ -18,8 +18,12 @@ namespace ops = paddle::operators; REGISTER_OP_CUDA_KERNEL( lod_reset, ops::LoDResetKernel, - ops::LoDResetKernel); + ops::LoDResetKernel, + ops::LoDResetKernel, + ops::LoDResetKernel); REGISTER_OP_CUDA_KERNEL( lod_reset_grad, ops::LoDResetGradKernel, - ops::LoDResetGradKernel); + ops::LoDResetGradKernel, + ops::LoDResetGradKernel, + ops::LoDResetGradKernel); diff --git a/paddle/fluid/operators/lod_reset_op.h b/paddle/fluid/operators/lod_reset_op.h index 8186d4f8262101edc723af390eee1aec4fa6f3a5..99f01c2a255ade81421c2bba95ff3d38ced6f87c 100644 --- a/paddle/fluid/operators/lod_reset_op.h +++ b/paddle/fluid/operators/lod_reset_op.h @@ -26,35 +26,46 @@ class LoDResetKernel : public framework::OpKernel { void Compute(const framework::ExecutionContext& ctx) const { auto* out = ctx.Output("Out"); auto* in = ctx.Input("X"); - auto* lod_t = ctx.Input("TargetLoD"); + auto* lod_t = ctx.Input("Y"); + + out->ShareDataWith(*in); std::vector level0; if (lod_t) { - auto* lod = lod_t->data(); - if (platform::is_gpu_place(ctx.GetPlace())) { - framework::Tensor lod_cpu; - framework::TensorCopy(*lod_t, platform::CPUPlace(), - ctx.device_context(), &lod_cpu); - lod = lod_cpu.data(); + if (lod_t->lod().size() > 0) { + auto y_lod = lod_t->lod(); + auto last_level = y_lod[y_lod.size() - 1]; + PADDLE_ENFORCE_EQ(last_level.back(), in->dims()[0], + "Last value of `Y`'s last level LoD should be equal " + "to the first dimension of `X`"); + out->set_lod(y_lod); + return; // early return, since lod already set + } else { + auto* lod = lod_t->data(); + if (platform::is_gpu_place(ctx.GetPlace())) { + framework::Tensor lod_cpu; + framework::TensorCopy(*lod_t, platform::CPUPlace(), + ctx.device_context(), &lod_cpu); + lod = lod_cpu.data(); + } + level0 = std::vector(lod, lod + lod_t->numel()); } - level0 = std::vector(lod, lod + lod_t->numel()); } else { level0 = ctx.Attr>("target_lod"); } - PADDLE_ENFORCE(level0.size() > 1UL, - "The size of target LoD should be greater than 1."); - PADDLE_ENFORCE(level0[0] == 0, - "Target LoD should be a vector starting from 0."); - PADDLE_ENFORCE(level0.back() == in->dims()[0], - "Target LoD should be a vector end with the " - "first dimension of Input(X)."); + PADDLE_ENFORCE_GT(level0.size(), 1UL, + "Size of target LoD should be greater than 1."); + PADDLE_ENFORCE_EQ(level0[0], 0, + "Target LoD should be a vector starting from 0."); + PADDLE_ENFORCE_EQ(level0.back(), in->dims()[0], + "Target LoD should be a vector end with the " + "first dimension of Input(X)."); for (size_t i = 0; i < level0.size() - 1; ++i) { PADDLE_ENFORCE(level0[i + 1] > level0[i], "Target LoD should be an ascending vector."); } - out->ShareDataWith(*in); // cast level0 to size_t std::vector ulevel0(level0.size(), 0); std::transform(level0.begin(), level0.end(), ulevel0.begin(), diff --git a/paddle/fluid/operators/math/CMakeLists.txt b/paddle/fluid/operators/math/CMakeLists.txt index fba1612d10f0494f4ab06fabdd0e799a74dafd53..547d081006f1c28ba73cb02d38e36bb612cea494 100644 --- a/paddle/fluid/operators/math/CMakeLists.txt +++ b/paddle/fluid/operators/math/CMakeLists.txt @@ -43,7 +43,7 @@ math_library(sequence2batch) math_library(sequence_padding) math_library(sequence_pooling DEPS math_function) math_library(sequence_scale) -math_library(softmax) +math_library(softmax DEPS math_function) math_library(unpooling) math_library(vol2col) diff --git a/paddle/fluid/operators/math/concat.cc b/paddle/fluid/operators/math/concat.cc index b542143419e05e9baf29e9a2322447f32ddd9829..b672c79afd97e36894af647fd4bc6edfb885ff13 100644 --- a/paddle/fluid/operators/math/concat.cc +++ b/paddle/fluid/operators/math/concat.cc @@ -44,7 +44,7 @@ class ConcatFunctor { out_cols += t_cols; input_cols[i] = t_cols; } - auto& cpu_place = boost::get(context.GetPlace()); + auto cpu_place = boost::get(context.GetPlace()); // computation for (int k = 0; k < out_rows; ++k) { @@ -87,7 +87,7 @@ class ConcatGradFunctor { input_cols += t_cols; output_cols[i] = t_cols; } - auto& cpu_place = boost::get(context.GetPlace()); + auto cpu_place = boost::get(context.GetPlace()); // computation for (int k = 0; k < input_rows; ++k) { diff --git a/python/paddle/fluid/concurrency.py b/python/paddle/fluid/concurrency.py index 535e881c42f675198a2679cb7974af64b65cc194..0fc4981a8e9da09f15e6d0a5e5c6761e01328876 100644 --- a/python/paddle/fluid/concurrency.py +++ b/python/paddle/fluid/concurrency.py @@ -131,7 +131,7 @@ def make_channel(dtype, capacity=0): return channel -def channel_send(channel, value): +def channel_send(channel, value, copy=False): """ Sends a value through a channel variable. Used by an unbuffered or buffered channel to pass data from within or to a concurrent Go block, where @@ -141,6 +141,8 @@ def channel_send(channel, value): channel (Variable|Channel): Channel variable created using `make_channel`. value (Variable): Value to send to channel + copy (bool): Copy data while channel send. If False, then data + is moved. The input cannot be used after move. Returns: Variable: The boolean status on whether or not the channel successfully sent the passed value. @@ -162,11 +164,26 @@ def channel_send(channel, value): type=core.VarDesc.VarType.LOD_TENSOR, dtype=core.VarDesc.VarType.BOOL) + X = value + + if copy is True: + copied_X = helper.create_variable( + name=unique_name.generate(value.name + '_copy'), + type=value.type, + dtype=value.dtype, + shape=value.shape, + lod_level=value.lod_level, + capacity=value.capacity) + + assign_op = channel_send_block.append_op( + type="assign_op", inputs={"X": value}, outputs={"Out": copied_X}) + X = copied_X + channel_send_op = channel_send_block.append_op( type="channel_send", inputs={ "Channel": channel, - "X": value, + "X": X, }, outputs={"Status": status}) diff --git a/python/paddle/fluid/layers/nn.py b/python/paddle/fluid/layers/nn.py index bf161d6618b10da66f25d3f11300a4a2b10b875a..9656dcf94f14ad9250bb7e79c1330c9bdd44d9d6 100644 --- a/python/paddle/fluid/layers/nn.py +++ b/python/paddle/fluid/layers/nn.py @@ -73,6 +73,7 @@ __all__ = [ 'smooth_l1', 'one_hot', 'autoincreased_step_counter', + 'lod_reset', ] @@ -2225,7 +2226,7 @@ def reduce_prod(input, dim=None, keep_dim=False, name=None): keep_dim (bool|False): Whether to reserve the reduced dimension in the output Tensor. The result tensor will have one fewer dimension than the :attr:`input` unless :attr:`keep_dim` is true. - name(str|None): A name for this layer(optional). If set None, the + name(str|None): A name for this layer(optional). If set None, the layer will be named automatically. Returns: @@ -2241,7 +2242,7 @@ def reduce_prod(input, dim=None, keep_dim=False, name=None): fluid.layers.reduce_prod(x) # [0.0002268] fluid.layers.reduce_prod(x, dim=0) # [0.02, 0.06, 0.3, 0.63] fluid.layers.reduce_prod(x, dim=-1) # [0.027, 0.0084] - fluid.layers.reduce_prod(x, dim=1, + fluid.layers.reduce_prod(x, dim=1, keep_dim=True) # [[0.027], [0.0084]] """ helper = LayerHelper('reduce_prod', **locals()) @@ -3292,3 +3293,98 @@ def autoincreased_step_counter(counter_name=None, begin=1, step=1): counter.stop_gradient = True return counter + + +def lod_reset(x, y=None, target_lod=None): + """ + LoD Reset Operator. Set LoD of **x** to a new one specified by **y** or + **target_lod**. When **y** provided, **y.lod** would be considered as target + LoD first, otherwise **y.data** would be considered as target LoD. If **y** + is not provided, target LoD should be specified by **target_lod**. + If target LoD is specified by **Y.data** or **target_lod**, only one level + LoD is supported. + + .. code-block:: text + + * Example 1: + + Given a 1-level LoDTensor x: + x.lod = [[ 0, 2, 5 6 ]] + x.data = [[1.0], [2.0], [3.0], [4.0], [5.0], [6.0]] + x.dims = [6, 1] + + target_lod: [0, 4, 6] + + then we get a 1-level LoDTensor: + out.lod = [[ 0, 4, 6 ]] + out.data = [[1.0], [2.0], [3.0], [4.0], [5.0], [6.0]] + out.dims = [6, 1] + + * Example 2: + + Given a 1-level LoDTensor x: + x.lod = [[ 0, 2, 5 6 ]] + x.data = [[1.0], [2.0], [3.0], [4.0], [5.0], [6.0]] + x.dims = [6, 1] + + y is a Tensor: + y.data = [[0, 2, 6]] + y.dims = [1, 3] + + then we get a 1-level LoDTensor: + out.lod = [[ 0, 2, 6 ]] + out.data = [[1.0], [2.0], [3.0], [4.0], [5.0], [6.0]] + out.dims = [6, 1] + + * Example 3: + + Given a 1-level LoDTensor x: + x.lod = [[ 0, 2, 5 6 ]] + x.data = [[1.0], [2.0], [3.0], [4.0], [5.0], [6.0]] + x.dims = [6, 1] + + y is a 2-level LoDTensor: + y.lod = [[0, 2, 4], [0, 2, 5, 6]] + y.data = [[1.1], [2.1], [3.1], [4.1], [5.1], [6.1]] + y.dims = [6, 1] + + then we get a 2-level LoDTensor: + out.lod = [[0, 2, 4], [0, 2, 5, 6]] + out.data = [[1.0], [2.0], [3.0], [4.0], [5.0], [6.0]] + out.dims = [6, 1] + + Args: + x (Variable): Input variable which could be a Tensor or LodTensor. + y (Variable|None): If provided, output's LoD would be derived from y. + target_lod (list|tuple|None): One level LoD which should be considered + as target LoD when y not provided. + + Returns: + Variable: Output variable with LoD specified by this operator. + + Raises: + ValueError: If y and target_lod are both None. + + Examples: + .. code-block:: python + + x = layers.data(name='x', shape=[10]) + y = layers.data(name='y', shape=[10, 20], lod_level=2) + out = layers.lod_reset(x=x, y=y) + """ + helper = LayerHelper("lod_reset", **locals()) + out = helper.create_tmp_variable(dtype=x.dtype) + if y is not None: + helper.append_op( + type="lod_reset", inputs={'X': x, + 'Y': y}, outputs={'Out': out}) + elif target_lod is not None: + helper.append_op( + type="lod_reset", + inputs={'X': x}, + attrs={'target_lod': target_lod}, + outputs={'Out': out}) + else: + raise ValueError("y and target_lod should not be both None.") + + return out diff --git a/python/paddle/fluid/tests/unittests/test_layers.py b/python/paddle/fluid/tests/unittests/test_layers.py index 90d70aa39fdc4d4d3f9062eb6a3eb0cdd014acfc..744a762ae7664f1f28713c505f9112ba712fd41d 100644 --- a/python/paddle/fluid/tests/unittests/test_layers.py +++ b/python/paddle/fluid/tests/unittests/test_layers.py @@ -327,6 +327,15 @@ class TestBook(unittest.TestCase): self.assertIsNotNone(loss) print(str(program)) + def test_lod_reset(self): + program = Program() + with program_guard(program): + x = layers.data(name='x', shape=[10], dtype='float32') + y = layers.data( + name='y', shape=[10, 20], dtype='float32', lod_level=2) + print(layers.lod_reset(x=x, y=y)) + print(str(program)) + if __name__ == '__main__': unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_lod_reset_op.py b/python/paddle/fluid/tests/unittests/test_lod_reset_op.py index 3bf8230f8748dd87ec3c85b0cbd78df2e695a96b..6b6d4c824aeae319dacf224408ce96a0d9c5bb35 100644 --- a/python/paddle/fluid/tests/unittests/test_lod_reset_op.py +++ b/python/paddle/fluid/tests/unittests/test_lod_reset_op.py @@ -42,7 +42,7 @@ class TestLodResetOpByInput(OpTest): target_lod_0 = [0, 4, 7, 10] self.inputs = { 'X': (x, lod), - 'TargetLoD': np.array([target_lod_0]).astype('int32') + 'Y': np.array([target_lod_0]).astype('int32') } self.outputs = {'Out': (x, [target_lod_0])} @@ -50,7 +50,7 @@ class TestLodResetOpByInput(OpTest): self.check_output() def test_check_grad(self): - self.check_grad(["X"], "Out", no_grad_set=set("TargetLoD")) + self.check_grad(["X"], "Out", no_grad_set=set("Y")) class TestLodResetOpBoth(OpTest): @@ -62,7 +62,7 @@ class TestLodResetOpBoth(OpTest): target_lod_0_in = [0, 4, 7, 10] self.inputs = { 'X': (x, lod), - 'TargetLoD': np.array(target_lod_0_in).astype('int32') + 'Y': np.array(target_lod_0_in).astype('int32') } self.attrs = {'target_lod': target_lod_0_attr} self.outputs = {'Out': (x, [target_lod_0_in])} @@ -71,7 +71,24 @@ class TestLodResetOpBoth(OpTest): self.check_output() def test_check_grad(self): - self.check_grad(["X"], "Out", no_grad_set=set("TargetLoD")) + self.check_grad(["X"], "Out", no_grad_set=set("Y")) + + +class TestLodResetOpYIsLoDTensor(OpTest): + def setUp(self): + self.op_type = "lod_reset" + x = np.random.random((10, 20)).astype("float32") + lod = [[0, 3, 5, 10]] + y = np.random.random((10, 10)).astype("float32") + target_lod_0 = [[0, 4, 7, 10]] + self.inputs = {'X': (x, lod), 'Y': (y, target_lod_0)} + self.outputs = {'Out': (x, target_lod_0)} + + def test_check_output(self): + self.check_output() + + def test_check_grad(self): + self.check_grad(["X"], "Out", no_grad_set=set("Y")) if __name__ == '__main__':