diff --git a/doc/v2/howto/rnn/recurrent_group_en.md b/doc/v2/howto/rnn/recurrent_group_en.md
index d264b0a9f85faffd49c1982117cb5a3ac6ffc015..de6b60f29eb97029a54609cd2194bb7faf3ffec5 100644
--- a/doc/v2/howto/rnn/recurrent_group_en.md
+++ b/doc/v2/howto/rnn/recurrent_group_en.md
@@ -1,3 +1,96 @@
# Recurrent Group Tutorial
-TBD
+## Overview
+
+Sequential data is common in natural language processing.
+
+A sentence is a sequence of words, and many sentences in turn form a paragraph. A paragraph can therefore be viewed as a nested sequence with two levels, where each element of the sequence is itself a sequence. In other words, sequential data can be recursive. An example of two-level recursive sequential data is an article, which is composed of a sequence of sentences, and each sentence is a sequence of words.
+
+PaddlePaddle and PaddlePaddle v2 support two-level recursive sequential data. A two-level sequence is a very flexible data format that helps us describe more complex language data, such as paragraphs and multi-turn dialogues. Based on two-level sequence input, we can design and build flexible, hierarchical RNN models that encode the input data at both the word and sentence level. For support of an arbitrary number of levels, please refer to PaddlePaddle Fluid.
+
+In PaddlePaddle, `recurrent_group` is an arbitrarily complex RNN unit. The user only needs to define the calculation that the RNN completes within one time step; PaddlePaddle is responsible for propagating information and errors through the time sequence.
+
+Furthermore, `recurrent_group` can be extended to handle two-level sequences. By defining two nested `recurrent_group` operations, one at the clause level and one at the word level, a hierarchical and complex RNN is achieved.
+
+Currently in PaddlePaddle, `recurrent_group` and some layers can process double-layer (two-level) sequences. For details, refer to the document: Layers for supporting double-layer sequences as input.
+
+## Related Concepts
+
+### Basic Principle
+`recurrent_group` is an arbitrarily complex RNN unit supported by PaddlePaddle. The user only needs to focus on the calculations that the RNN is designed to complete within a single time step; PaddlePaddle is responsible for completing the propagation of information and gradients over time.
+
+In PaddlePaddle, a simple call to `recurrent_group` is as follows:
+
+``` python
+recurrent_group(step, input, reverse)
+```
+- step: A callable function that defines the calculations completed by the RNN unit within a time step
+- input: The input must be a single-layer sequence or a double-layer sequence
+- reverse: Whether to process the input sequence in reverse order
+
+The core of using `recurrent_group` is to design the logic of the step function. The step function can freely combine the various layers supported by PaddlePaddle to implement arbitrary computational logic. The input of `recurrent_group` (`input`) becomes the input of the step function. Since the step function only focuses on the computation within one time step of the RNN, `recurrent_group` takes care of splitting the original input data for us.
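+
+As an illustration, here is a minimal sketch of a single-level sequence RNN built with `recurrent_group`. It assumes the `paddle.v2` layer API (`paddle.layer.memory`, `paddle.layer.fc`, `paddle.layer.recurrent_group`) and a pre-existing single-level sequence `word_vectors`; treat the exact signatures as illustrative rather than definitive.
+
+``` python
+import paddle.v2 as paddle
+
+def rnn_step(input):
+    # Reference the output of the layer named "rnn_state" at the previous
+    # time step; with no boot_layer given, it starts from an all-zero state.
+    last_state = paddle.layer.memory(name="rnn_state", size=128)
+    # Combine the current word and the previous state into the new state.
+    return paddle.layer.fc(input=[input, last_state],
+                           size=128,
+                           act=paddle.activation.Tanh(),
+                           name="rnn_state")
+
+# `word_vectors` is assumed to be a single-level sequence of word embeddings;
+# recurrent_group splits it so that rnn_step sees one element per time step.
+rnn_out = paddle.layer.recurrent_group(step=rnn_step,
+                                       input=word_vectors,
+                                       reverse=False)
+```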
+
+### Input
+The input sequence processed by `recurrent_group` is mainly divided into the following three types:
+
+- **Input Data**: When putting a two-level sequence into `recurrent_group`, it will be disassembled into a single-level sequence. When putting a single-level sequence into `recurrent_group`, it will be disassembled into a non-sequence and then passed to the step function. This process is completely transparent to the user. There are two possible types: 1) User input via data_layer; 2) Output from other layers.
+
+- **Read-only Memory Input**: `StaticInput` defines a read-only Memory. The input specified by `StaticInput` will not be disassembled by `recurrent_group`, and each time step of the `recurrent_group` loop will always be able to reference all inputs. It may be a non-sequence or a single-layer sequence.
+
+- **Input of Sequence Generation Task**: `GeneratedInput` is only used to specify input data in a sequence generation task.
+
+### Input Example
+
+Sequence generation tasks mostly follow the encoder-decoder architecture. The encoder and decoder can be arbitrary neural network units capable of processing sequences, and RNNs are the most popular choice.
+
+Given the encoder output and the current word, the decoder predicts the next most likely word each time. In this structure, the decoder accepts two inputs:
+
+- Target sequence to be generated: an input of the decoder and the basis of the decoder loop. `recurrent_group` will disassemble this type of input.
+
+- Encoder output, a non-sequence or a single-level sequence: an unbounded memory. Each time step of the decoder loop references the entire output, so it should not be disassembled. This type of input must be specified via `StaticInput`. For more discussion on unbounded memory, please refer to the paper [Neural Turing Machines](https://arxiv.org/abs/1410.5401).
+
+In a sequence generation task, the decoder RNN always takes the word vector of the word predicted at the previous time step as the input of the current time step. `GeneratedInput` automates this process.
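+
+The sketch below shows how these two input types can be declared together for a decoder during training: the encoder output is wrapped in `StaticInput` and stays whole, while the target-word embedding sequence is split by `recurrent_group`. The names `encoder_out` and `trg_embedding` and the sizes are assumptions based on the `paddle.v2` layer API; a real decoder would typically compute attention over the static encoder output rather than simply pooling it.
+
+``` python
+def decoder_step(encoder_out, current_word):
+    # encoder_out comes from StaticInput: every time step sees the full
+    # encoder output; it is never split by recurrent_group.
+    context = paddle.layer.last_seq(input=encoder_out)
+    last_state = paddle.layer.memory(name="decoder_state", size=256)
+    return paddle.layer.fc(input=[context, current_word, last_state],
+                           size=256,
+                           act=paddle.activation.Tanh(),
+                           name="decoder_state")
+
+decoder_out = paddle.layer.recurrent_group(
+    step=decoder_step,
+    input=[paddle.layer.StaticInput(input=encoder_out),  # read-only memory
+           trg_embedding])                               # split per time step
+```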
+
+### Output
+The `step` function must return the output of one or more layers; these outputs become the final output of the entire `recurrent_group`. In the output process, `recurrent_group` concatenates the output of each time step, which is likewise transparent to the user.
+
+### Memory
+Memory can only be defined and used inside `recurrent_group`. Memory cannot exist independently; it must point to a layer defined by PaddlePaddle. Memory is referenced to obtain the output of that layer at a previous moment, so memory can be interpreted as a delay operation.
+
+The user can explicitly specify the output of a layer to initialize the memory. When not specified, memory is initialized to 0 by default.
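+
+As a small illustration (again assuming the `paddle.v2` API, with `initial_state` a hypothetical layer), a memory is named after the layer it points to and can be booted from the output of another layer:
+
+``` python
+# "hidden" is the name of the layer this memory points to; at t = 0 the memory
+# takes the value of `initial_state` instead of the default all-zero vector.
+prev_hidden = paddle.layer.memory(name="hidden",
+                                  size=128,
+                                  boot_layer=initial_state)
+```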
+
+## Sequence-level RNN Introduction
+
+`recurrent_group` helps us split the input sequence, merge the outputs, and loop over the computational logic along the sequence.
+
+Using this feature, two nested `recurrent_group` calls can handle nested two-level sequences, implementing sequence-level RNN structures at both the word and sentence level.
+
+- Word-level RNN: each state corresponds to a word.
+- Sequence-level RNN: a sequence-level RNN consists of multiple word-level RNNs. Each word-level RNN (i.e., each state of the sequence-level RNN) corresponds to a subsequence.
+
+For convenience of description, the following takes an NLP task as an example: a paragraph containing sentences is defined as a two-level sequence, a sentence containing words is defined as a single-level sequence, and a word is then a zero-level sequence.
+
+## Usage of Sequence-level RNN
+
+### Usage of Training Process
+Using `recurrent_group` requires the following conventions:
+
+- **Single-input Single-output**: Both input and output are single-level sequences.
+  - If there are multiple inputs, the number of words in the different input sequences must be exactly equal.
+  - A single-level sequence is output, and the number of words in the output sequence is the same as in the input sequence.
+  - memory: in the step function, define a memory that points to a layer; referencing the memory obtains the output of that layer at a previous moment, forming a recurrent connection. The `is_seq` parameter of the memory must be `false`. If no memory is defined, the operations within each time step are independent.
+  - boot_layer: the initial state of the memory, zero by default. `is_seq` of the memory must be `false`.
+
+- **Double-input Double-output**: Both input and output are two-level sequences (see the sketch after this list).
+  - If there are multiple input sequences, the number of subsequences contained in the different inputs must be strictly equal, but the number of words in the subsequences may differ.
+  - A two-level sequence is output. The number of subsequences and the number of words are the same as those of the specified input sequence (the first input by default).
+  - memory: in the step function, define a memory that points to a layer; referencing the memory obtains the output of that layer at a previous moment, forming a recurrent connection. The memory defined in the step function of the outer `recurrent_group` can record the state of the previous subsequence, either as a single-level sequence (only as read-only memory) or as a word. If no memory is defined, the operations between subsequences are independent.
+  - boot_layer: the initial state of the memory, either a single-level sequence (only as read-only memory) or a vector. Not set by default, i.e. the initial state is zero.
+
+- **Double-input Single-output**: not supported yet; the error "In hierachical RNN, all out links should be from sequences now" is reported.
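+
+As referenced in the double-input double-output convention above, the following is an illustrative sketch of a hierarchical RNN built from two nested `recurrent_group` calls. It assumes the `paddle.v2` API and a hypothetical two-level sequence `paragraph_input` (a paragraph made of sentences made of word embeddings); depending on the API version, the outer input may need an explicit subsequence wrapper.
+
+``` python
+def outer_step(sentence):
+    # `sentence` is one subsequence: a single-level sequence of word vectors
+    # obtained by splitting the two-level input once.
+    def inner_step(word):
+        prev = paddle.layer.memory(name="word_state", size=128)
+        return paddle.layer.fc(input=[word, prev],
+                               size=128,
+                               act=paddle.activation.Tanh(),
+                               name="word_state")
+    # Returning the full word-level output sequence keeps the overall output
+    # a two-level sequence, as the double-input double-output convention
+    # requires.
+    return paddle.layer.recurrent_group(step=inner_step, input=sentence)
+
+# No memory is defined in outer_step, so the sentences are processed
+# independently of one another.
+out = paddle.layer.recurrent_group(step=outer_step, input=paragraph_input)
+```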
+
+### Usage of Generation Process
+Using `beam_search` requires following these conventions (a sketch follows the list):
+
+- Word-level RNN: generate the next word from a word.
+- Sequence-level RNN: the subsequences generated by the word-level RNN are concatenated into a new two-level sequence. Semantically, there is no case where a subsequence directly generates the next subsequence.
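+
+As a rough illustration of these conventions (assuming the `paddle.v2` API; `encoder_out`, `target_dict_size`, the embedding parameter name, and the token ids are hypothetical), a word-level decoder can be driven by `beam_search`, with `GeneratedInput` feeding back the embedding of the word generated at the previous step:
+
+``` python
+def generation_step(encoder_out, current_word):
+    # encoder_out comes from StaticInput and is visible in full at every step.
+    context = paddle.layer.last_seq(input=encoder_out)
+    prev_state = paddle.layer.memory(name="gen_state", size=256)
+    state = paddle.layer.fc(input=[context, current_word, prev_state],
+                            size=256,
+                            act=paddle.activation.Tanh(),
+                            name="gen_state")
+    # Distribution over the target dictionary for the next word.
+    return paddle.layer.fc(input=state,
+                           size=target_dict_size,
+                           act=paddle.activation.Softmax())
+
+generated = paddle.layer.beam_search(
+    step=generation_step,
+    input=[paddle.layer.StaticInput(input=encoder_out),
+           paddle.layer.GeneratedInput(size=target_dict_size,
+                                       embedding_name="target_emb",
+                                       embedding_size=256)],
+    bos_id=0,    # id of the start-of-sequence token (assumed)
+    eos_id=1,    # id of the end-of-sequence token (assumed)
+    beam_size=5,
+    max_length=50)
+```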
diff --git a/paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc b/paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc
index 5ddf331cfca39a4e81a42d9ff8efd5af7bcf6829..55b5f113589e090386d287e228349f22fb94a7ab 100644
--- a/paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc
+++ b/paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc
@@ -76,7 +76,7 @@ void NCCLAllReduceOpHandle::RunImpl() {
}
}
-std::string NCCLAllReduceOpHandle::Name() const { return "NCCL AllReduce"; }
+std::string NCCLAllReduceOpHandle::Name() const { return "nccl_all_reduce"; }
} // namespace details
} // namespace framework
} // namespace paddle
diff --git a/paddle/fluid/framework/details/nccl_all_reduce_op_handle.h b/paddle/fluid/framework/details/nccl_all_reduce_op_handle.h
index 045070bb6a97e90600cd24d9f43cd2a10a4bc1f5..ad14a3c5cb4625fa121cad2daed389c441e78771 100644
--- a/paddle/fluid/framework/details/nccl_all_reduce_op_handle.h
+++ b/paddle/fluid/framework/details/nccl_all_reduce_op_handle.h
@@ -14,6 +14,9 @@
#pragma once
+#include
+#include
+
#include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
@@ -34,6 +37,10 @@ struct NCCLAllReduceOpHandle : public OpHandleBase {
std::string Name() const override;
+  // Delaying and buffering nccl_all_reduce calls together can significantly
+  // improve performance. Disable this feature by returning false.
+ bool IsMultiDeviceTransfer() override { return true; };
+
protected:
void RunImpl() override;
};
diff --git a/paddle/fluid/framework/details/op_handle_base.h b/paddle/fluid/framework/details/op_handle_base.h
index 71672fd24c65ee654fb9f703ea5808c31ee8fbb0..d7a541ac4bb83625060db337446d03a1afda3ed0 100644
--- a/paddle/fluid/framework/details/op_handle_base.h
+++ b/paddle/fluid/framework/details/op_handle_base.h
@@ -13,6 +13,8 @@
// limitations under the License.
#pragma once
+#include
+#include
#include "paddle/fluid/framework/details/var_handle.h"
#include "paddle/fluid/platform/device_context.h"
@@ -53,6 +55,10 @@ class OpHandleBase {
void AddOutput(VarHandleBase *out);
+  // Returns true if the Op involves data transfer across multiple devices,
+  // which will likely block other computations.
+ virtual bool IsMultiDeviceTransfer() { return false; }
+
protected:
virtual void RunImpl() = 0;
};
diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
index 3f8655147b688239509dea98925df310a46cbef8..1f96b9dc6235a18f7566c98cca60baa964e6aa56 100644
--- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
+++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
@@ -23,22 +23,36 @@ ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor(
size_t num_threads, bool use_event,
const std::vector &local_scopes,
const std::vector &places,
- std::unique_ptr &&graph)
+ std::unique_ptr &&graph, bool allow_op_delay)
: SSAGraphExecutor(std::move(graph)),
pool_(num_threads >= 2 ? new ::ThreadPool(num_threads) : nullptr),
local_scopes_(local_scopes),
places_(places),
fetch_ctxs_(places),
- use_event_(use_event) {}
+ use_event_(use_event),
+ running_ops_(0),
+ allow_op_delay_(allow_op_delay) {}
+
+void ThreadedSSAGraphExecutor::RunDelayedOps(
+ const std::unordered_set &delayed_ops) {
+ for (auto op : delayed_ops) {
+ op->Run(use_event_);
+ }
+}
FeedFetchList ThreadedSSAGraphExecutor::Run(
const std::vector &fetch_tensors) {
std::unordered_map pending_ops;
std::unordered_set pending_vars;
-
BlockingQueue ready_vars;
-
std::unordered_set ready_ops;
+  // For ops (e.g. nccl_all_reduce) that need to coordinate multiple
+  // streams from multiple GPUs, it's faster to buffer them and schedule
+  // them together, since we currently cannot overlap computation and
+  // memcpy streams. Should revisit this once overlapping is available.
+ std::unordered_set delayed_ops;
+ std::unordered_set blocked_by_delayed_ops;
+ std::unordered_set delayed_vars;
auto InsertPendingVar = [&pending_vars, &ready_vars](VarHandleBase &var) {
pending_vars.insert(&var);
@@ -106,7 +120,14 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
auto run_all_ready_ops = [&] {
for (auto *op : ready_ops) {
- RunOp(ready_vars, op);
+ if (op->IsMultiDeviceTransfer() && allow_op_delay_) {
+ delayed_ops.insert(op);
+ delayed_vars.insert(op->outputs_.begin(), op->outputs_.end());
+ ready_vars.Extend(op->outputs_);
+ continue;
+ }
+ running_ops_++;
+ RunOp(&ready_vars, op);
}
ready_ops.clear();
};
@@ -118,13 +139,13 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
}
// Step 3. Execution
- while (!pending_vars.empty()) {
+ while (!pending_vars.empty() || !ready_ops.empty() || !delayed_ops.empty()) {
// 1. Run All Ready ops
run_all_ready_ops();
// 2. Find ready variable
bool timeout;
- auto cur_ready_vars = ready_vars.PopAll(1000, &timeout);
+ auto cur_ready_vars = ready_vars.PopAll(1, &timeout);
if (timeout) {
if (exception_) {
@@ -141,13 +162,29 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
auto &deps = pending_ops[op];
--deps;
if (deps == 0) {
- ready_ops.insert(op);
+ if (delayed_vars.find(ready_var) != delayed_vars.end()) {
+ blocked_by_delayed_ops.insert(op);
+ } else {
+ ready_ops.insert(op);
+ }
}
}
}
+ // When there are no other ops to schedule, schedule buffered delayed
+ // ops and unblock other ops.
+ if (ready_ops.empty() && !delayed_ops.empty() && running_ops_ == 0) {
+ RunDelayedOps(delayed_ops);
+ delayed_ops.clear();
+ for (auto *op : blocked_by_delayed_ops) {
+ ready_ops.insert(op);
+ }
+ blocked_by_delayed_ops.clear();
+ }
// Keep loop until all vars are ready.
}
-
+ PADDLE_ENFORCE(ready_ops.empty());
+ PADDLE_ENFORCE(delayed_ops.empty());
+ PADDLE_ENFORCE(blocked_by_delayed_ops.empty());
++computation_count_;
auto sync_computation = [&] {
@@ -182,12 +219,13 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
}
void ThreadedSSAGraphExecutor::RunOp(
- BlockingQueue &ready_var_q, details::OpHandleBase *op) {
- auto op_run = [&ready_var_q, op, this] {
+ BlockingQueue *ready_var_q, details::OpHandleBase *op) {
+ auto op_run = [ready_var_q, op, this] {
try {
VLOG(10) << op->Name() << " : " << op->DebugString();
op->Run(use_event_);
- ready_var_q.Extend(op->outputs_);
+ running_ops_--;
+ ready_var_q->Extend(op->outputs_);
} catch (platform::EnforceNotMet ex) {
exception_.reset(new platform::EnforceNotMet(ex));
} catch (...) {
diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h
index 2ea57ac8f96bc9c2b5c98bcd25d9ce921c3683cd..79cfc26b461a39811a9a125e5aeac3492d967386 100644
--- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h
+++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h
@@ -14,7 +14,12 @@
#pragma once
-#include
+#include
+#include
+#include
+#include
+#include
+
#include
#include "ThreadPool.h" // ThreadPool in thrird party
#include "paddle/fluid/framework/details/ssa_graph_executor.h"
@@ -70,7 +75,8 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
ThreadedSSAGraphExecutor(size_t num_threads, bool use_event,
const std::vector &local_scopes,
const std::vector &places,
- std::unique_ptr &&graph);
+ std::unique_ptr &&graph,
+ bool allow_op_delay);
// Run a SSAGraph by a thread pool
// Use topological sort algorithm
@@ -79,9 +85,11 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
~ThreadedSSAGraphExecutor() {}
private:
- void RunOp(BlockingQueue &ready_var_q,
+ void RunOp(BlockingQueue *ready_var_q,
details::OpHandleBase *op);
+ void RunDelayedOps(const std::unordered_set &delayed_ops);
+
private:
std::unique_ptr<::ThreadPool> pool_;
std::vector local_scopes_;
@@ -89,6 +97,8 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
platform::DeviceContextPool fetch_ctxs_;
const bool use_event_;
std::unique_ptr exception_;
+ std::atomic running_ops_;
+ bool allow_op_delay_;
size_t computation_count_{0};
size_t max_async_computation{100};
diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc
index 64c06687b6b905186d4efcc8441d3abef6323d53..16a118090ba9cfd50b4b03484983f9fc73cf7973 100644
--- a/paddle/fluid/framework/executor.cc
+++ b/paddle/fluid/framework/executor.cc
@@ -279,6 +279,21 @@ std::unique_ptr Executor::Prepare(
return std::unique_ptr(ctx);
}
+std::vector> Executor::Prepare(
+ const ProgramDesc& program, const std::vector& block_ids) {
+ std::vector> result;
+ for (auto& bid : block_ids) {
+ auto* ctx = new ExecutorPrepareContext(program, bid);
+ PADDLE_ENFORCE_LT(static_cast(bid), program.Size());
+ auto& block = program.Block(bid);
+ for (auto& op_desc : block.AllOps()) {
+ ctx->ops_.push_back(OpRegistry::CreateOp(*op_desc));
+ }
+ result.push_back(std::shared_ptr(ctx));
+ }
+ return result;
+}
+
void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
bool create_local_scope, bool create_vars) {
auto& block = ctx->prog_.Block(ctx->block_id_);
diff --git a/paddle/fluid/framework/executor.h b/paddle/fluid/framework/executor.h
index 7173c51c95e04ad3095f01bb24923a7a3341c517..d7c99165f0c9d3b1ae11a3b4753a61e8118f7b52 100644
--- a/paddle/fluid/framework/executor.h
+++ b/paddle/fluid/framework/executor.h
@@ -61,6 +61,9 @@ class Executor {
static std::unique_ptr Prepare(
const ProgramDesc& program, int block_id);
+ static std::vector> Prepare(
+ const ProgramDesc& program, const std::vector& block_ids);
+
void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
bool create_local_scope = true,
bool create_vars = true);
diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index 577eea92d217f9feb54085b4d9f071494c3c5165..17885143247f0e0db8f12931e3c3412e7114ef3d 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/framework/parallel_executor.h"
+#include "paddle/fluid/platform/profiler.h"
#include
#include
@@ -47,7 +48,7 @@ ParallelExecutor::ParallelExecutor(
const std::vector &places,
const std::unordered_set ¶ms,
const ProgramDesc &startup_program, const ProgramDesc &main_program,
- const std::string &loss_var_name, Scope *scope)
+ const std::string &loss_var_name, Scope *scope, bool allow_op_delay)
: member_(new ParallelExecutorPrivate(places)) {
member_->global_scope_ = scope;
@@ -82,8 +83,8 @@ ParallelExecutor::ParallelExecutor(
auto graph = builder.Build(main_program);
member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
- num_threads, use_event, member_->local_scopes_, places,
- std::move(graph)));
+ num_threads, use_event, member_->local_scopes_, places, std::move(graph),
+ allow_op_delay));
// Step 3. Create vars in each scope;
for (auto *scope : member_->local_scopes_) {
@@ -151,6 +152,7 @@ void ParallelExecutor::BCastParamsToGPUs(
void ParallelExecutor::Run(const std::vector &fetch_tensors,
const std::string &fetched_var_name) {
+ platform::RecordBlock b(0);
auto fetch_data = member_->executor_->Run(fetch_tensors);
*member_->global_scope_->Var(fetched_var_name)->GetMutable() =
fetch_data;
diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h
index 503efa2e447b0ac70f6302aa0a89cc55e5afcb81..964b476234e622cae934d41bc3793bc3114a5f1a 100644
--- a/paddle/fluid/framework/parallel_executor.h
+++ b/paddle/fluid/framework/parallel_executor.h
@@ -14,8 +14,9 @@ limitations under the License. */
#pragma once
-#include
+#include
#include
+#include
#include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/op_info.h"
#include "paddle/fluid/framework/program_desc.h"
@@ -37,7 +38,8 @@ class ParallelExecutor {
const std::unordered_set& params,
const ProgramDesc& startup_program,
const ProgramDesc& main_program,
- const std::string& loss_var_name, Scope* scope);
+ const std::string& loss_var_name, Scope* scope,
+ bool allow_op_delay);
void Run(const std::vector& fetch_tensors,
const std::string& fetched_var_name = "fetched_var");
diff --git a/paddle/fluid/operators/detail/variable_response.cc b/paddle/fluid/operators/detail/variable_response.cc
index f59c9b50bb36c12c9abc0a52e0d11c6a73217047..01eb8acc558231d443d4617578cc56d4e895c2f2 100644
--- a/paddle/fluid/operators/detail/variable_response.cc
+++ b/paddle/fluid/operators/detail/variable_response.cc
@@ -13,7 +13,11 @@
// limitations under the License.
#include "paddle/fluid/operators/detail/variable_response.h"
-#include
+
+#include
+#include
+#include
+
#include "paddle/fluid/operators/detail/send_recv.pb.h"
#include "paddle/fluid/operators/detail/sendrecvop_utils.h"
@@ -151,7 +155,7 @@ bool VariableResponse::CopySelectRowsTensorData(
auto* tensor = slr->mutable_value();
tensor->Resize(dims);
PADDLE_ENFORCE_EQ(
- tensor->numel(),
+ static_cast(tensor->numel()),
length / framework::SizeOfType(
paddle::operators::detail::ToTypeIndex(meta_.data_type())));
void* tensor_data = tensor->mutable_data(
diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc
index 611457e6d6c721e1c3888f981c32130772378254..9188f2d989e601b7a97dedaf71f7080829cdb7c3 100644
--- a/paddle/fluid/operators/listen_and_serv_op.cc
+++ b/paddle/fluid/operators/listen_and_serv_op.cc
@@ -39,20 +39,23 @@ static void CreateTensorFromMessageType(framework::Variable *var,
}
}
-static void ParallelExecuteBlocks(const std::vector ¶llel_blkids,
- framework::Executor *executor,
- framework::ProgramDesc *program,
- framework::Scope *scope) {
+static void ParallelExecuteBlocks(
+ const std::vector ¶llel_blkids, framework::Executor *executor,
+ const std::vector>
+ &prepared,
+ framework::ProgramDesc *program, framework::Scope *scope) {
std::vector> fs;
for (size_t idx : parallel_blkids) {
- fs.push_back(framework::Async([&executor, &program, &scope, idx]() {
- int run_block = idx; // thread local
- try {
- executor->Run(*program, scope, run_block, false, false);
- } catch (std::exception &e) {
- LOG(ERROR) << "run sub program error " << e.what();
- }
- }));
+ fs.push_back(
+ framework::Async([&executor, &prepared, &program, &scope, idx]() {
+ int run_block = idx; // thread local
+ try {
+ executor->RunPreparedContext(prepared[run_block].get(), scope,
+ false, false);
+ } catch (std::exception &e) {
+ LOG(ERROR) << "run sub program error " << e.what();
+ }
+ }));
}
for (size_t i = 0; i < fs.size(); ++i) fs[i].wait();
}
@@ -77,7 +80,6 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &dev_ctx = *pool.Get(dev_place);
framework::Scope &recv_scope = scope.NewScope();
- LOG(INFO) << "created recv scope: " << &recv_scope;
if (!rpc_service_) {
std::string endpoint = Attr("endpoint");
@@ -93,6 +95,14 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
"server program should have at least 2 blocks");
framework::Executor executor(dev_place);
+ std::vector block_list;
+ for (size_t blkid = 1; blkid < num_blocks; ++blkid) {
+ block_list.push_back(blkid);
+ }
+ auto prepared = executor.Prepare(*program, block_list);
+ // Insert placeholder for block0 which holds current op itself.
+ prepared.insert(prepared.begin(),
+ std::shared_ptr(nullptr));
rpc_service_->SetScope(&recv_scope);
rpc_service_->SetDevCtx(&dev_ctx);
@@ -157,16 +167,16 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
double ts = detail::GetTimestamp();
for (size_t blkid = 2; blkid < num_blocks; ++blkid) {
if (program->Block(blkid).Parent() != last_parent_blkid) {
- for (size_t idx : parallel_blkids) VLOG(3) << idx;
- ParallelExecuteBlocks(parallel_blkids, &executor, program, &recv_scope);
+ ParallelExecuteBlocks(parallel_blkids, &executor, prepared, program,
+ &recv_scope);
parallel_blkids.clear();
last_parent_blkid = program->Block(blkid).Parent();
}
parallel_blkids.push_back(blkid);
}
- ParallelExecuteBlocks(parallel_blkids, &executor, program, &recv_scope);
-
- VLOG(3) << "run all blocks spent " << detail::GetTimestamp() - ts << "(ms)";
+ ParallelExecuteBlocks(parallel_blkids, &executor, prepared, program,
+ &recv_scope);
+ VLOG(2) << "run all blocks spent " << detail::GetTimestamp() - ts << "(ms)";
// Reset the received sparse variables, the sum operator would not
// sum the input sparse variables which rows is empty at the next
diff --git a/paddle/fluid/operators/split_ids_op.h b/paddle/fluid/operators/split_ids_op.h
index 3e750ed2d171876ce2d3c232f5d34234217b3c3e..d36ed398ebce661a62ca92696b0089b5289d5b1c 100644
--- a/paddle/fluid/operators/split_ids_op.h
+++ b/paddle/fluid/operators/split_ids_op.h
@@ -30,19 +30,16 @@ class SplitIdsOpKernel : public framework::OpKernel {
PADDLE_THROW("SplitIds do not support GPU kernel");
}
- const auto* ids_t = ctx.Input("Ids");
- auto& ids_dims = ids_t->dims();
+ auto& ids_dims = ctx.Input("Ids")->dims();
+ const T* ids = ctx.Input("Ids")->data();
auto outs = ctx.MultiOutput("Out");
-
- const T* ids = ids_t->data();
-
const size_t shard_num = outs.size();
std::vector> out_ids;
out_ids.resize(outs.size());
// split id by their shard_num.
- for (size_t i = 0; i < ids_dims[0]; ++i) {
+ for (int i = 0; i < ids_dims[0]; ++i) {
T id = ids[i];
size_t shard_id = static_cast(id) % shard_num;
out_ids[shard_id].push_back(id);
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index e1b1bbec97985aa839c62a0a82b81b020faf0008..b0a3f06a8871b1dc8c6c9d7231dfe2c9764ade3f 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -504,10 +504,10 @@ All parameter, weight, gradient are variables in Paddle.
const std::unordered_set ¶ms,
const ProgramDesc &startup_program,
const ProgramDesc &main_program, const std::string &loss_var_name,
- Scope *scope) {
+ Scope *scope, bool allow_op_delay) {
new (&self) ParallelExecutor(num_threads, use_event, places,
params, startup_program, main_program,
- loss_var_name, scope);
+ loss_var_name, scope, allow_op_delay);
})
.def("run", &ParallelExecutor::Run);
diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py
index 3e78788f470556d2196b5104f69a0a3285543ec4..e15456bfc0835066e3c899aea7e2cf642b4797d8 100644
--- a/python/paddle/fluid/framework.py
+++ b/python/paddle/fluid/framework.py
@@ -847,6 +847,11 @@ class Block(object):
if not self.has_var(var.name()):
self.create_var(name=var.name(), desc=var, type=var.type())
+ # sync variables removed from c++ end
+ for var in self.vars.keys():
+ if not self.desc.find_var(var):
+ self.vars.pop(var)
+
# sync operators from cpp
ops_in_cpp = []
for op_idx in range(0, self.desc.op_size()):
@@ -881,6 +886,19 @@ class Block(object):
op = Operator(self, op_desc)
self.ops.append(op)
+ # sync ops removed from c++ end
+ if end_index != -1 and end_index < len(self.ops):
+ ops_in_cpp_index = 0
+ ops_in_python_index = 0
+ while ops_in_python_index < len(
+ self.ops) and ops_in_cpp_index < len(ops_in_cpp):
+ if self.ops[ops_in_python_index].desc != ops_in_cpp[
+ ops_in_cpp_index]:
+ del self.ops[ops_in_python_index]
+ else:
+ ops_in_cpp_index += 1
+ ops_in_python_index += 1
+
assert len(self.ops) == len(ops_in_cpp)
for index in range(len(self.ops)):
assert self.ops[index].desc == ops_in_cpp[index]
diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py
index 5e0588fa73241a8752e1b3195a123820165f070d..a2c830b3c943b114f3024f23f73f78bf87e1da34 100644
--- a/python/paddle/fluid/parallel_executor.py
+++ b/python/paddle/fluid/parallel_executor.py
@@ -21,7 +21,11 @@ __all__ = ['ParallelExecutor']
class ParallelExecutor(object):
- def __init__(self, loss_name, use_cuda, num_threads=None):
+ def __init__(self,
+ loss_name,
+ use_cuda,
+ num_threads=None,
+ allow_op_delay=False):
places = []
if use_cuda:
for i in xrange(core.get_cuda_device_count()):
@@ -35,7 +39,12 @@ class ParallelExecutor(object):
places.append(p)
if num_threads is None:
- num_threads = min(len(places) * 2, multiprocessing.cpu_count())
+ if use_cuda:
+                # Experiments on se-resnext show that too many threads hurt
+                # performance. Worth tuning for other models in the future.
+                num_threads = len(places)
+            else:
+                num_threads = min(
+                    len(places) * 2, multiprocessing.cpu_count())
startup = framework.default_startup_program()
main = framework.default_main_program()
@@ -52,7 +61,8 @@ class ParallelExecutor(object):
startup.desc,
main.desc,
loss_name,
- scope)
+ scope,
+ allow_op_delay)
self.scope = scope
def run(self, fetch_list):
diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index 0ad273c7161977e18f91f952fd3a9dc144bf73f0..1b2d29a47fd050e40f83443432f8194984c71214 100644
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -29,6 +29,7 @@ function(py_test_modules TARGET_NAME)
endfunction()
# test time consuming OPs in a separate process for expliot parallism
+list(REMOVE_ITEM TEST_OPS test_parallel_executor)
list(REMOVE_ITEM TEST_OPS test_warpctc_op)
list(REMOVE_ITEM TEST_OPS test_dyn_rnn)
list(REMOVE_ITEM TEST_OPS test_mul_op)
@@ -64,6 +65,7 @@ else()
endif(WITH_FAST_BUNDLE_TEST)
# tests with high overhead
+py_test_modules(test_parallel_executor MODULES test_parallel_executor)
py_test_modules(test_warpctc_op MODULES test_warpctc_op ENVS FLAGS_warpctc_dir=${WARPCTC_LIB_DIR})
py_test_modules(test_train_dyn_rnn MODULES test_dyn_rnn)
py_test_modules(test_mul_op MODULES test_mul_op)
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor.py b/python/paddle/fluid/tests/unittests/test_parallel_executor.py
index 95d0f9da47e97e94ff97eb3647ac5244d5409ca3..a79e4b3e183eaef06be27a724893799923e84ac1 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor.py
@@ -135,18 +135,18 @@ def bottleneck_block(input, num_filters, stride, cardinality, reduction_ratio):
return fluid.layers.elementwise_add(x=short, y=scale, act='relu')
-def SE_ResNeXt152(batch_size=4):
+def SE_ResNeXt152Small(batch_size=2):
img = fluid.layers.fill_constant(
shape=[batch_size, 3, 224, 224], dtype='float32', value=0.0)
label = fluid.layers.fill_constant(
shape=[batch_size, 1], dtype='int64', value=0.0)
conv = conv_bn_layer(
- input=img, num_filters=64, filter_size=3, stride=2, act='relu')
+ input=img, num_filters=16, filter_size=3, stride=2, act='relu')
conv = conv_bn_layer(
- input=conv, num_filters=64, filter_size=3, stride=1, act='relu')
+ input=conv, num_filters=16, filter_size=3, stride=1, act='relu')
conv = conv_bn_layer(
- input=conv, num_filters=128, filter_size=3, stride=1, act='relu')
+ input=conv, num_filters=16, filter_size=3, stride=1, act='relu')
conv = fluid.layers.pool2d(
input=conv, pool_size=3, pool_stride=2, pool_padding=1, pool_type='max')
@@ -184,7 +184,8 @@ class TestParallelExecutorBase(unittest.TestCase):
method,
memory_opt=True,
iter=10,
- batch_size=None):
+ batch_size=None,
+ allow_op_delay=False):
main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
@@ -194,7 +195,10 @@ class TestParallelExecutorBase(unittest.TestCase):
if memory_opt:
fluid.memory_optimize(main)
- exe = fluid.ParallelExecutor(loss_name=loss.name, use_cuda=True)
+ exe = fluid.ParallelExecutor(
+ loss_name=loss.name,
+ use_cuda=True,
+ allow_op_delay=allow_op_delay)
if batch_size is not None:
batch_size *= fluid.core.get_cuda_device_count()
begin = time.time()
@@ -222,7 +226,7 @@ class TestMNIST(TestParallelExecutorBase):
def setUpClass(cls):
# Convert mnist to recordio file
with fluid.program_guard(fluid.Program(), fluid.Program()):
- reader = paddle.batch(mnist.train(), batch_size=32)
+ reader = paddle.batch(mnist.train(), batch_size=4)
feeder = fluid.DataFeeder(
feed_list=[ # order is image and label
fluid.layers.data(
@@ -236,9 +240,11 @@ class TestMNIST(TestParallelExecutorBase):
def test_simple_fc(self):
self.check_network_convergence(simple_fc_net)
+ self.check_network_convergence(simple_fc_net, allow_op_delay=True)
def test_batchnorm_fc(self):
self.check_network_convergence(fc_with_batchnorm)
+ self.check_network_convergence(fc_with_batchnorm, allow_op_delay=True)
class TestResnet(TestParallelExecutorBase):
@@ -262,10 +268,10 @@ class TestResnet(TestParallelExecutorBase):
def test_resnet(self):
import functools
- batch_size = 4
+ batch_size = 2
self.check_network_convergence(
functools.partial(
- SE_ResNeXt152, batch_size=batch_size),
+ SE_ResNeXt152Small, batch_size=batch_size),
iter=20,
batch_size=batch_size)
diff --git a/python/paddle/fluid/tests/unittests/test_protobuf_descs.py b/python/paddle/fluid/tests/unittests/test_protobuf_descs.py
index da85786d0c085a4e97d9ac272feed251296ad52d..e4cf4a8bce8a53c0348130716dc18c61ac9a5913 100644
--- a/python/paddle/fluid/tests/unittests/test_protobuf_descs.py
+++ b/python/paddle/fluid/tests/unittests/test_protobuf_descs.py
@@ -14,6 +14,7 @@
import unittest
import paddle.fluid.core as core
+from paddle.fluid.framework import Program
class TestOpDesc(unittest.TestCase):
@@ -187,32 +188,46 @@ class TestBlockDesc(unittest.TestCase):
self.assertEqual(all_ops, [op0, op1, op2])
def test_remove_op(self):
- prog = core.ProgramDesc()
+ program = Program()
+ prog = program.desc
self.assertIsNotNone(prog)
block = prog.block(0)
self.assertIsNotNone(block)
+
+ op0 = block.append_op()
op1 = block.append_op()
op2 = block.append_op()
+ op0.set_type("test")
+ op1.set_type("test")
+ op2.set_type("test")
+
+ var0 = block.var("var0")
var1 = block.var("var1")
var2 = block.var("var2")
var3 = block.var("var3")
var4 = block.var("var4")
var5 = block.var("var5")
+
+ op0.set_input("X", ["var0"])
+ op0.set_output("Y", ["var0"])
op1.set_input("X", ["var1", "var2"])
op1.set_output("Y", ["var3", "var4"])
op2.set_input("X", ["var1"])
op2.set_output("Y", ["var4", "var5"])
+ program.sync_with_cpp()
+
# remove op1, its input var2 and output var3 will be removed at the same time,
# but its input var1 and output var4 will not be removed since they are used for op2.
- block.remove_op(0, 1)
+ block.remove_op(1, 2)
+ program.sync_with_cpp()
all_ops = []
for idx in xrange(0, block.op_size()):
all_ops.append(block.op(idx))
- self.assertEqual(all_ops, [op2])
+ self.assertEqual(all_ops, [op0, op2])
all_vars = block.all_vars()
- self.assertEqual(set(all_vars), {var1, var4, var5})
+ self.assertEqual(set(all_vars), {var0, var1, var4, var5})
if __name__ == '__main__':