提交 b03fa889 编写于 作者: T typhoonzero

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into fix_test_sendrecv_portbind

# Recurrent Group Tutorial # Recurrent Group Tutorial
TBD ## Overview
Sequential data is common in natural language processing.
A sentence is a sequence of words and many sentences form a paragraph further. Therefore, a paragraph can be viewed as a nested sequence with two level, where each element of the sequence is another sequence. That is to say, sequential data could be recursive. An example of two-level recursive sequential data is that an article is composed of a sequence of sentences, and each sentence a sequence of words.
PaddlePaddle and PaddlePaddle v2 support two-level recursive sequential data. The two-level sequence is a very flexible data, which helps us to better describe more complex language data such as discribing paragraphs and several rounds of dialogues. Based on two-level sequence input, we can design and build a flexible, hierarchical RNN model that encodes input data from the word and sentence level. For the support of arbitrary levels, please refer to PaddlePaddle Fluid.
In PaddlePaddle, `recurrent_group` is an arbitrarily complex RNN unit. The user only needs to define the calculation that the RNN will complete in one time step. PaddlePaddle is responsible for the propagation of information and error in time series.
Furthermore, `recurrent_group` can also be extended to handle two-level sequence. By defining two nested `recurrent_group` operations at the clause level and the word level respectively, a hierarchical and complex RNN is finally achieved.
Currently, in the PaddlePaddle, there are `recurrent_group` and some Layers that can process bidirectional sequences. For details, refer to the document: <a href = "hierarchical_layer_en.html">Layers for supporting double-layer sequences as input.</a>
## Related Concepts
### Basic Principle
`recurrent_group` is an arbitrarily complex RNN unit supported by PaddlePaddle. The user only needs to focus on the calculations that the RNN is designed to complete within a single time step. The PaddlePaddle is responsible for completing the propagation of information and gradients over time.
In PaddlePaddle, a simple call to `recurrent_group` is as follows:
``` python
recurrent_group(step, input, reverse)
```
- step: A callable function that defines the calculations completed by the RNN unit within a time step
- input: The input must be a single-layer sequence or a double-layer sequence
- reverse: Whether to process the input sequence in reverse order
The core of using `recurrent_group` is to design the logic of the step function. The step function can be freely combined with various layers supported by PaddlePaddle to complete arbitrary arithmetic logic. The input of `recurrent_group` (input) becomes the input of the step function. Since the step function only focuses on the calculation within one time step of RNN, here `recurrent_group` completes the splitting of the original input data for us.
### Input
The input sequence processed by `recurrent_group` is mainly divided into the following three types:
- **Input Data**: When putting a two-level sequence into `recurrent_group`, it will be disassembled into a single-level sequence. When putting a single-level sequence into `recurrent_group`, it will be disassembled into a non-sequence and then passed to the step function. This process is completely transparent to the user. There are two possible types: 1) User input via data_layer; 2) Output from other layers.
- **Read-only Memory Input**: `StaticInput` defines a read-only Memory. The input specified by `StaticInput` will not be disassembled by `recurrent_group`, and each time step of the `recurrent_group` loop will always be able to reference all inputs. It may be a non-sequence or a single-layer sequence.
- **Input of Sequence Generation Task**: `GeneratedInput` is only used to specify input data in a sequence generation task.
### Input Example
Sequence generation tasks mostly follow the encoder-decoer architecture. The encoder and decoder can be arbitrary neural network units capable of processing sequences and RNN is the most popular choice.
Given the encoder output and the current word, the decoder predicts the next most likely word each time. In this structure, the decoder accepts two inputs:
- Target sequence to be generated: a input of the decoder and the basis of the decoder loop. `recurrent_group` will disassemble this input type.
- Encoder output, an non-sequencce or single-sequence: a unbounded memory. Each time step in the decoder loop will reference the entire result and should not be disassembled. This type of input must be specified via `StaticInput`. For more discussion on Unbounded Memory, please refer to the paper [Neural Turning Machine](https://arxiv.org/abs/1410.5401).
In a sequence generation task, the decoder RNN always refers to the word vector of the word predicted at the previous moment as the current time input. `GeneratedInput` will automate this process.
### Output
The `step` function must return the output of one or more Layers. The output of this Layer will be the final output of the entire `recurrent_group`. In the output process, `recurrent_group` will concatenate the output of each time step, which is also transparent to the user.
### Memory
Memory can only be defined and used in `recurrent_group`. Memory cannot exist independently and must point to a layer defined by PaddlePaddle. Memory is referenced to get a momentary output from this layer, so memory can be interpreted as a delay operation.
The user can explicitly specify the output of a layer to initialize the memory. When not specified, memory is initialized to 0 by default.
## Sequence-level RNN Introduction
`recurrent_group` helps us to split the input sequence, merge the output, and loop through the sequence of computational logic.
Using this feature, the two nested `recurrent_group` can handle the nested two-level sequences, implementing sequence-level RNN structures at both the word and sentence levels.
- Word-level RNN: each state corresponds to a word.
- Sequence-level RNN: a sequence-layer RNN consists of multiple word-layer RNNs. Each word-layer RNN (ie, each state of a sequence-layer RNN) has a subsequence.
For convenience of description, the following takes the NLP task as an example. A paragraph containing a subsequence is defined as a two-level sequence, and a sentence containing a word is defined as a single-layer sequence. Then, the zero-level sequence is a word.
## Usage of Sequence-level RNN
### Usage of Training Process
Using `recurrent_group` requires the following conventions:
- **Single-input Single-output**: Both input and output are single layer sequences.
- If there are multiple inputs, the number of words in different input sequences must be exactly equal.
- A single-layer sequence is output, and the number of words in the output sequence is the same as the input sequence.
- memory: define memory to point to a layer in the step function, get a moment output from this layer by referencing memory to form a recurrent connection. The is_seq parameter of memory must be false. If memory is not defined, the operations within each time step are independent.
- boot_layer: the initial state of memory, set 0 by default. is_seq in memory must be false.
- **Double-input Double-output**: Both input and output are two-level sequence.
- If there are multiple input sequences, the number of subsequence contained in different inputs must be strictly equal, but the number of words in the subsequence may not be equal.
- output a two-level sequence. The number of subsequence and the number of words are the same as the specified input sequence and the first input is default.
- memory: defining memory in the step function, pointing to a layer, by referring to the memory to get the output of this layer at a time, forming a recurrent connection. The memory defined in the outer `recurrent_group` step function can record the state of the previous subsequence, either as a single-level sequence (only as read-only memory) or as a word. If memory is not defined, the operations between subsequence are independent.
- boot_layer: the initial state of memory. It is either a single-level sequence (only as read-only memory) or a vector. The default is not set, that is, the initial state is 0.
- **Double-input Single-output**: not support for now, and output the error with "In hierachical RNN, all out links should be from sequences now".
### Usage of Generation Process
Using `beam_search` need follow those conventions:
- Word-level RNN: generate the next word from a word.
- Sequence-level RNN: the single-layer RNN generated subsequence is concatenated into a new double-layer sequence. Semantically, there is no case where a subsequence generates the next subseq directly.
...@@ -76,7 +76,7 @@ void NCCLAllReduceOpHandle::RunImpl() { ...@@ -76,7 +76,7 @@ void NCCLAllReduceOpHandle::RunImpl() {
} }
} }
std::string NCCLAllReduceOpHandle::Name() const { return "NCCL AllReduce"; } std::string NCCLAllReduceOpHandle::Name() const { return "nccl_all_reduce"; }
} // namespace details } // namespace details
} // namespace framework } // namespace framework
} // namespace paddle } // namespace paddle
...@@ -14,6 +14,9 @@ ...@@ -14,6 +14,9 @@
#pragma once #pragma once
#include <string>
#include <vector>
#include "paddle/fluid/framework/details/op_handle_base.h" #include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/scope.h"
...@@ -34,6 +37,10 @@ struct NCCLAllReduceOpHandle : public OpHandleBase { ...@@ -34,6 +37,10 @@ struct NCCLAllReduceOpHandle : public OpHandleBase {
std::string Name() const override; std::string Name() const override;
// Delay and buffer nccl_all_reduce together can significantly increase
// performance. Disable this feature by returning false.
bool IsMultiDeviceTransfer() override { return true; };
protected: protected:
void RunImpl() override; void RunImpl() override;
}; };
......
...@@ -13,6 +13,8 @@ ...@@ -13,6 +13,8 @@
// limitations under the License. // limitations under the License.
#pragma once #pragma once
#include <string>
#include <vector>
#include "paddle/fluid/framework/details/var_handle.h" #include "paddle/fluid/framework/details/var_handle.h"
#include "paddle/fluid/platform/device_context.h" #include "paddle/fluid/platform/device_context.h"
...@@ -53,6 +55,10 @@ class OpHandleBase { ...@@ -53,6 +55,10 @@ class OpHandleBase {
void AddOutput(VarHandleBase *out); void AddOutput(VarHandleBase *out);
// If the Op involves data transfer of multiple devices that
// will likely block other computations.
virtual bool IsMultiDeviceTransfer() { return false; }
protected: protected:
virtual void RunImpl() = 0; virtual void RunImpl() = 0;
}; };
......
...@@ -23,22 +23,36 @@ ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor( ...@@ -23,22 +23,36 @@ ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor(
size_t num_threads, bool use_event, size_t num_threads, bool use_event,
const std::vector<Scope *> &local_scopes, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places, const std::vector<platform::Place> &places,
std::unique_ptr<SSAGraph> &&graph) std::unique_ptr<SSAGraph> &&graph, bool allow_op_delay)
: SSAGraphExecutor(std::move(graph)), : SSAGraphExecutor(std::move(graph)),
pool_(num_threads >= 2 ? new ::ThreadPool(num_threads) : nullptr), pool_(num_threads >= 2 ? new ::ThreadPool(num_threads) : nullptr),
local_scopes_(local_scopes), local_scopes_(local_scopes),
places_(places), places_(places),
fetch_ctxs_(places), fetch_ctxs_(places),
use_event_(use_event) {} use_event_(use_event),
running_ops_(0),
allow_op_delay_(allow_op_delay) {}
void ThreadedSSAGraphExecutor::RunDelayedOps(
const std::unordered_set<OpHandleBase *> &delayed_ops) {
for (auto op : delayed_ops) {
op->Run(use_event_);
}
}
FeedFetchList ThreadedSSAGraphExecutor::Run( FeedFetchList ThreadedSSAGraphExecutor::Run(
const std::vector<std::string> &fetch_tensors) { const std::vector<std::string> &fetch_tensors) {
std::unordered_map<OpHandleBase *, size_t> pending_ops; std::unordered_map<OpHandleBase *, size_t> pending_ops;
std::unordered_set<VarHandleBase *> pending_vars; std::unordered_set<VarHandleBase *> pending_vars;
BlockingQueue<VarHandleBase *> ready_vars; BlockingQueue<VarHandleBase *> ready_vars;
std::unordered_set<OpHandleBase *> ready_ops; std::unordered_set<OpHandleBase *> ready_ops;
// For ops (e.g. nccl_all_reduce) that need to coordinate multiple
// streams from multiple GPUs, it's faster to buffer them and schedule
// together since we currently cannot overlap computation and memcpy streams.
// Should revisit it if overlapping is available.
std::unordered_set<OpHandleBase *> delayed_ops;
std::unordered_set<OpHandleBase *> blocked_by_delayed_ops;
std::unordered_set<VarHandleBase *> delayed_vars;
auto InsertPendingVar = [&pending_vars, &ready_vars](VarHandleBase &var) { auto InsertPendingVar = [&pending_vars, &ready_vars](VarHandleBase &var) {
pending_vars.insert(&var); pending_vars.insert(&var);
...@@ -106,7 +120,14 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( ...@@ -106,7 +120,14 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
auto run_all_ready_ops = [&] { auto run_all_ready_ops = [&] {
for (auto *op : ready_ops) { for (auto *op : ready_ops) {
RunOp(ready_vars, op); if (op->IsMultiDeviceTransfer() && allow_op_delay_) {
delayed_ops.insert(op);
delayed_vars.insert(op->outputs_.begin(), op->outputs_.end());
ready_vars.Extend(op->outputs_);
continue;
}
running_ops_++;
RunOp(&ready_vars, op);
} }
ready_ops.clear(); ready_ops.clear();
}; };
...@@ -118,13 +139,13 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( ...@@ -118,13 +139,13 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
} }
// Step 3. Execution // Step 3. Execution
while (!pending_vars.empty()) { while (!pending_vars.empty() || !ready_ops.empty() || !delayed_ops.empty()) {
// 1. Run All Ready ops // 1. Run All Ready ops
run_all_ready_ops(); run_all_ready_ops();
// 2. Find ready variable // 2. Find ready variable
bool timeout; bool timeout;
auto cur_ready_vars = ready_vars.PopAll(1000, &timeout); auto cur_ready_vars = ready_vars.PopAll(1, &timeout);
if (timeout) { if (timeout) {
if (exception_) { if (exception_) {
...@@ -141,13 +162,29 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( ...@@ -141,13 +162,29 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
auto &deps = pending_ops[op]; auto &deps = pending_ops[op];
--deps; --deps;
if (deps == 0) { if (deps == 0) {
ready_ops.insert(op); if (delayed_vars.find(ready_var) != delayed_vars.end()) {
blocked_by_delayed_ops.insert(op);
} else {
ready_ops.insert(op);
}
} }
} }
} }
// When there are no other ops to schedule, schedule buffered delayed
// ops and unblock other ops.
if (ready_ops.empty() && !delayed_ops.empty() && running_ops_ == 0) {
RunDelayedOps(delayed_ops);
delayed_ops.clear();
for (auto *op : blocked_by_delayed_ops) {
ready_ops.insert(op);
}
blocked_by_delayed_ops.clear();
}
// Keep loop until all vars are ready. // Keep loop until all vars are ready.
} }
PADDLE_ENFORCE(ready_ops.empty());
PADDLE_ENFORCE(delayed_ops.empty());
PADDLE_ENFORCE(blocked_by_delayed_ops.empty());
++computation_count_; ++computation_count_;
auto sync_computation = [&] { auto sync_computation = [&] {
...@@ -182,12 +219,13 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( ...@@ -182,12 +219,13 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
} }
void ThreadedSSAGraphExecutor::RunOp( void ThreadedSSAGraphExecutor::RunOp(
BlockingQueue<VarHandleBase *> &ready_var_q, details::OpHandleBase *op) { BlockingQueue<VarHandleBase *> *ready_var_q, details::OpHandleBase *op) {
auto op_run = [&ready_var_q, op, this] { auto op_run = [ready_var_q, op, this] {
try { try {
VLOG(10) << op->Name() << " : " << op->DebugString(); VLOG(10) << op->Name() << " : " << op->DebugString();
op->Run(use_event_); op->Run(use_event_);
ready_var_q.Extend(op->outputs_); running_ops_--;
ready_var_q->Extend(op->outputs_);
} catch (platform::EnforceNotMet ex) { } catch (platform::EnforceNotMet ex) {
exception_.reset(new platform::EnforceNotMet(ex)); exception_.reset(new platform::EnforceNotMet(ex));
} catch (...) { } catch (...) {
......
...@@ -14,7 +14,12 @@ ...@@ -14,7 +14,12 @@
#pragma once #pragma once
#include <chrono> #include <deque>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
#include <functional> #include <functional>
#include "ThreadPool.h" // ThreadPool in thrird party #include "ThreadPool.h" // ThreadPool in thrird party
#include "paddle/fluid/framework/details/ssa_graph_executor.h" #include "paddle/fluid/framework/details/ssa_graph_executor.h"
...@@ -70,7 +75,8 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { ...@@ -70,7 +75,8 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
ThreadedSSAGraphExecutor(size_t num_threads, bool use_event, ThreadedSSAGraphExecutor(size_t num_threads, bool use_event,
const std::vector<Scope *> &local_scopes, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places, const std::vector<platform::Place> &places,
std::unique_ptr<SSAGraph> &&graph); std::unique_ptr<SSAGraph> &&graph,
bool allow_op_delay);
// Run a SSAGraph by a thread pool // Run a SSAGraph by a thread pool
// Use topological sort algorithm // Use topological sort algorithm
...@@ -79,9 +85,11 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { ...@@ -79,9 +85,11 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
~ThreadedSSAGraphExecutor() {} ~ThreadedSSAGraphExecutor() {}
private: private:
void RunOp(BlockingQueue<VarHandleBase *> &ready_var_q, void RunOp(BlockingQueue<VarHandleBase *> *ready_var_q,
details::OpHandleBase *op); details::OpHandleBase *op);
void RunDelayedOps(const std::unordered_set<OpHandleBase *> &delayed_ops);
private: private:
std::unique_ptr<::ThreadPool> pool_; std::unique_ptr<::ThreadPool> pool_;
std::vector<Scope *> local_scopes_; std::vector<Scope *> local_scopes_;
...@@ -89,6 +97,8 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { ...@@ -89,6 +97,8 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
platform::DeviceContextPool fetch_ctxs_; platform::DeviceContextPool fetch_ctxs_;
const bool use_event_; const bool use_event_;
std::unique_ptr<platform::EnforceNotMet> exception_; std::unique_ptr<platform::EnforceNotMet> exception_;
std::atomic<int> running_ops_;
bool allow_op_delay_;
size_t computation_count_{0}; size_t computation_count_{0};
size_t max_async_computation{100}; size_t max_async_computation{100};
......
...@@ -279,6 +279,21 @@ std::unique_ptr<ExecutorPrepareContext> Executor::Prepare( ...@@ -279,6 +279,21 @@ std::unique_ptr<ExecutorPrepareContext> Executor::Prepare(
return std::unique_ptr<ExecutorPrepareContext>(ctx); return std::unique_ptr<ExecutorPrepareContext>(ctx);
} }
std::vector<std::shared_ptr<ExecutorPrepareContext>> Executor::Prepare(
const ProgramDesc& program, const std::vector<int>& block_ids) {
std::vector<std::shared_ptr<ExecutorPrepareContext>> result;
for (auto& bid : block_ids) {
auto* ctx = new ExecutorPrepareContext(program, bid);
PADDLE_ENFORCE_LT(static_cast<size_t>(bid), program.Size());
auto& block = program.Block(bid);
for (auto& op_desc : block.AllOps()) {
ctx->ops_.push_back(OpRegistry::CreateOp(*op_desc));
}
result.push_back(std::shared_ptr<ExecutorPrepareContext>(ctx));
}
return result;
}
void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
bool create_local_scope, bool create_vars) { bool create_local_scope, bool create_vars) {
auto& block = ctx->prog_.Block(ctx->block_id_); auto& block = ctx->prog_.Block(ctx->block_id_);
......
...@@ -61,6 +61,9 @@ class Executor { ...@@ -61,6 +61,9 @@ class Executor {
static std::unique_ptr<ExecutorPrepareContext> Prepare( static std::unique_ptr<ExecutorPrepareContext> Prepare(
const ProgramDesc& program, int block_id); const ProgramDesc& program, int block_id);
static std::vector<std::shared_ptr<ExecutorPrepareContext>> Prepare(
const ProgramDesc& program, const std::vector<int>& block_ids);
void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope, void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
bool create_local_scope = true, bool create_local_scope = true,
bool create_vars = true); bool create_vars = true);
......
...@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and ...@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include "paddle/fluid/framework/parallel_executor.h" #include "paddle/fluid/framework/parallel_executor.h"
#include "paddle/fluid/platform/profiler.h"
#include <string> #include <string>
#include <vector> #include <vector>
...@@ -47,7 +48,7 @@ ParallelExecutor::ParallelExecutor( ...@@ -47,7 +48,7 @@ ParallelExecutor::ParallelExecutor(
const std::vector<platform::Place> &places, const std::vector<platform::Place> &places,
const std::unordered_set<std::string> &params, const std::unordered_set<std::string> &params,
const ProgramDesc &startup_program, const ProgramDesc &main_program, const ProgramDesc &startup_program, const ProgramDesc &main_program,
const std::string &loss_var_name, Scope *scope) const std::string &loss_var_name, Scope *scope, bool allow_op_delay)
: member_(new ParallelExecutorPrivate(places)) { : member_(new ParallelExecutorPrivate(places)) {
member_->global_scope_ = scope; member_->global_scope_ = scope;
...@@ -82,8 +83,8 @@ ParallelExecutor::ParallelExecutor( ...@@ -82,8 +83,8 @@ ParallelExecutor::ParallelExecutor(
auto graph = builder.Build(main_program); auto graph = builder.Build(main_program);
member_->executor_.reset(new details::ThreadedSSAGraphExecutor( member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
num_threads, use_event, member_->local_scopes_, places, num_threads, use_event, member_->local_scopes_, places, std::move(graph),
std::move(graph))); allow_op_delay));
// Step 3. Create vars in each scope; // Step 3. Create vars in each scope;
for (auto *scope : member_->local_scopes_) { for (auto *scope : member_->local_scopes_) {
...@@ -151,6 +152,7 @@ void ParallelExecutor::BCastParamsToGPUs( ...@@ -151,6 +152,7 @@ void ParallelExecutor::BCastParamsToGPUs(
void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors, void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
const std::string &fetched_var_name) { const std::string &fetched_var_name) {
platform::RecordBlock b(0);
auto fetch_data = member_->executor_->Run(fetch_tensors); auto fetch_data = member_->executor_->Run(fetch_tensors);
*member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() = *member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
fetch_data; fetch_data;
......
...@@ -14,8 +14,9 @@ limitations under the License. */ ...@@ -14,8 +14,9 @@ limitations under the License. */
#pragma once #pragma once
#include <future> #include <string>
#include <unordered_set> #include <unordered_set>
#include <vector>
#include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/op_info.h" #include "paddle/fluid/framework/op_info.h"
#include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/framework/program_desc.h"
...@@ -37,7 +38,8 @@ class ParallelExecutor { ...@@ -37,7 +38,8 @@ class ParallelExecutor {
const std::unordered_set<std::string>& params, const std::unordered_set<std::string>& params,
const ProgramDesc& startup_program, const ProgramDesc& startup_program,
const ProgramDesc& main_program, const ProgramDesc& main_program,
const std::string& loss_var_name, Scope* scope); const std::string& loss_var_name, Scope* scope,
bool allow_op_delay);
void Run(const std::vector<std::string>& fetch_tensors, void Run(const std::vector<std::string>& fetch_tensors,
const std::string& fetched_var_name = "fetched_var"); const std::string& fetched_var_name = "fetched_var");
......
...@@ -13,7 +13,11 @@ ...@@ -13,7 +13,11 @@
// limitations under the License. // limitations under the License.
#include "paddle/fluid/operators/detail/variable_response.h" #include "paddle/fluid/operators/detail/variable_response.h"
#include <string.h>
#include <string>
#include <utility>
#include <vector>
#include "paddle/fluid/operators/detail/send_recv.pb.h" #include "paddle/fluid/operators/detail/send_recv.pb.h"
#include "paddle/fluid/operators/detail/sendrecvop_utils.h" #include "paddle/fluid/operators/detail/sendrecvop_utils.h"
...@@ -151,7 +155,7 @@ bool VariableResponse::CopySelectRowsTensorData( ...@@ -151,7 +155,7 @@ bool VariableResponse::CopySelectRowsTensorData(
auto* tensor = slr->mutable_value(); auto* tensor = slr->mutable_value();
tensor->Resize(dims); tensor->Resize(dims);
PADDLE_ENFORCE_EQ( PADDLE_ENFORCE_EQ(
tensor->numel(), static_cast<size_t>(tensor->numel()),
length / framework::SizeOfType( length / framework::SizeOfType(
paddle::operators::detail::ToTypeIndex(meta_.data_type()))); paddle::operators::detail::ToTypeIndex(meta_.data_type())));
void* tensor_data = tensor->mutable_data( void* tensor_data = tensor->mutable_data(
......
...@@ -39,20 +39,23 @@ static void CreateTensorFromMessageType(framework::Variable *var, ...@@ -39,20 +39,23 @@ static void CreateTensorFromMessageType(framework::Variable *var,
} }
} }
static void ParallelExecuteBlocks(const std::vector<size_t> &parallel_blkids, static void ParallelExecuteBlocks(
framework::Executor *executor, const std::vector<size_t> &parallel_blkids, framework::Executor *executor,
framework::ProgramDesc *program, const std::vector<std::shared_ptr<framework::ExecutorPrepareContext>>
framework::Scope *scope) { &prepared,
framework::ProgramDesc *program, framework::Scope *scope) {
std::vector<std::future<void>> fs; std::vector<std::future<void>> fs;
for (size_t idx : parallel_blkids) { for (size_t idx : parallel_blkids) {
fs.push_back(framework::Async([&executor, &program, &scope, idx]() { fs.push_back(
int run_block = idx; // thread local framework::Async([&executor, &prepared, &program, &scope, idx]() {
try { int run_block = idx; // thread local
executor->Run(*program, scope, run_block, false, false); try {
} catch (std::exception &e) { executor->RunPreparedContext(prepared[run_block].get(), scope,
LOG(ERROR) << "run sub program error " << e.what(); false, false);
} } catch (std::exception &e) {
})); LOG(ERROR) << "run sub program error " << e.what();
}
}));
} }
for (size_t i = 0; i < fs.size(); ++i) fs[i].wait(); for (size_t i = 0; i < fs.size(); ++i) fs[i].wait();
} }
...@@ -77,7 +80,6 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, ...@@ -77,7 +80,6 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &dev_ctx = *pool.Get(dev_place); auto &dev_ctx = *pool.Get(dev_place);
framework::Scope &recv_scope = scope.NewScope(); framework::Scope &recv_scope = scope.NewScope();
LOG(INFO) << "created recv scope: " << &recv_scope;
if (!rpc_service_) { if (!rpc_service_) {
std::string endpoint = Attr<std::string>("endpoint"); std::string endpoint = Attr<std::string>("endpoint");
...@@ -93,6 +95,14 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, ...@@ -93,6 +95,14 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
"server program should have at least 2 blocks"); "server program should have at least 2 blocks");
framework::Executor executor(dev_place); framework::Executor executor(dev_place);
std::vector<int> block_list;
for (size_t blkid = 1; blkid < num_blocks; ++blkid) {
block_list.push_back(blkid);
}
auto prepared = executor.Prepare(*program, block_list);
// Insert placeholder for block0 which holds current op itself.
prepared.insert(prepared.begin(),
std::shared_ptr<framework::ExecutorPrepareContext>(nullptr));
rpc_service_->SetScope(&recv_scope); rpc_service_->SetScope(&recv_scope);
rpc_service_->SetDevCtx(&dev_ctx); rpc_service_->SetDevCtx(&dev_ctx);
...@@ -157,16 +167,16 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, ...@@ -157,16 +167,16 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
double ts = detail::GetTimestamp(); double ts = detail::GetTimestamp();
for (size_t blkid = 2; blkid < num_blocks; ++blkid) { for (size_t blkid = 2; blkid < num_blocks; ++blkid) {
if (program->Block(blkid).Parent() != last_parent_blkid) { if (program->Block(blkid).Parent() != last_parent_blkid) {
for (size_t idx : parallel_blkids) VLOG(3) << idx; ParallelExecuteBlocks(parallel_blkids, &executor, prepared, program,
ParallelExecuteBlocks(parallel_blkids, &executor, program, &recv_scope); &recv_scope);
parallel_blkids.clear(); parallel_blkids.clear();
last_parent_blkid = program->Block(blkid).Parent(); last_parent_blkid = program->Block(blkid).Parent();
} }
parallel_blkids.push_back(blkid); parallel_blkids.push_back(blkid);
} }
ParallelExecuteBlocks(parallel_blkids, &executor, program, &recv_scope); ParallelExecuteBlocks(parallel_blkids, &executor, prepared, program,
&recv_scope);
VLOG(3) << "run all blocks spent " << detail::GetTimestamp() - ts << "(ms)"; VLOG(2) << "run all blocks spent " << detail::GetTimestamp() - ts << "(ms)";
// Reset the received sparse variables, the sum operator would not // Reset the received sparse variables, the sum operator would not
// sum the input sparse variables which rows is empty at the next // sum the input sparse variables which rows is empty at the next
......
...@@ -30,19 +30,16 @@ class SplitIdsOpKernel : public framework::OpKernel<T> { ...@@ -30,19 +30,16 @@ class SplitIdsOpKernel : public framework::OpKernel<T> {
PADDLE_THROW("SplitIds do not support GPU kernel"); PADDLE_THROW("SplitIds do not support GPU kernel");
} }
const auto* ids_t = ctx.Input<framework::LoDTensor>("Ids"); auto& ids_dims = ctx.Input<framework::LoDTensor>("Ids")->dims();
auto& ids_dims = ids_t->dims(); const T* ids = ctx.Input<framework::LoDTensor>("Ids")->data<T>();
auto outs = ctx.MultiOutput<framework::LoDTensor>("Out"); auto outs = ctx.MultiOutput<framework::LoDTensor>("Out");
const T* ids = ids_t->data<T>();
const size_t shard_num = outs.size(); const size_t shard_num = outs.size();
std::vector<std::vector<T>> out_ids; std::vector<std::vector<T>> out_ids;
out_ids.resize(outs.size()); out_ids.resize(outs.size());
// split id by their shard_num. // split id by their shard_num.
for (size_t i = 0; i < ids_dims[0]; ++i) { for (int i = 0; i < ids_dims[0]; ++i) {
T id = ids[i]; T id = ids[i];
size_t shard_id = static_cast<size_t>(id) % shard_num; size_t shard_id = static_cast<size_t>(id) % shard_num;
out_ids[shard_id].push_back(id); out_ids[shard_id].push_back(id);
......
...@@ -504,10 +504,10 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -504,10 +504,10 @@ All parameter, weight, gradient are variables in Paddle.
const std::unordered_set<std::string> &params, const std::unordered_set<std::string> &params,
const ProgramDesc &startup_program, const ProgramDesc &startup_program,
const ProgramDesc &main_program, const std::string &loss_var_name, const ProgramDesc &main_program, const std::string &loss_var_name,
Scope *scope) { Scope *scope, bool allow_op_delay) {
new (&self) ParallelExecutor(num_threads, use_event, places, new (&self) ParallelExecutor(num_threads, use_event, places,
params, startup_program, main_program, params, startup_program, main_program,
loss_var_name, scope); loss_var_name, scope, allow_op_delay);
}) })
.def("run", &ParallelExecutor::Run); .def("run", &ParallelExecutor::Run);
......
...@@ -847,6 +847,11 @@ class Block(object): ...@@ -847,6 +847,11 @@ class Block(object):
if not self.has_var(var.name()): if not self.has_var(var.name()):
self.create_var(name=var.name(), desc=var, type=var.type()) self.create_var(name=var.name(), desc=var, type=var.type())
# sync variables removed from c++ end
for var in self.vars.keys():
if not self.desc.find_var(var):
self.vars.pop(var)
# sync operators from cpp # sync operators from cpp
ops_in_cpp = [] ops_in_cpp = []
for op_idx in range(0, self.desc.op_size()): for op_idx in range(0, self.desc.op_size()):
...@@ -881,6 +886,19 @@ class Block(object): ...@@ -881,6 +886,19 @@ class Block(object):
op = Operator(self, op_desc) op = Operator(self, op_desc)
self.ops.append(op) self.ops.append(op)
# sync ops removed from c++ end
if end_index != -1 and end_index < len(self.ops):
ops_in_cpp_index = 0
ops_in_python_index = 0
while ops_in_python_index < len(
self.ops) and ops_in_cpp_index < len(ops_in_cpp):
if self.ops[ops_in_python_index].desc != ops_in_cpp[
ops_in_cpp_index]:
del self.ops[ops_in_python_index]
else:
ops_in_cpp_index += 1
ops_in_python_index += 1
assert len(self.ops) == len(ops_in_cpp) assert len(self.ops) == len(ops_in_cpp)
for index in range(len(self.ops)): for index in range(len(self.ops)):
assert self.ops[index].desc == ops_in_cpp[index] assert self.ops[index].desc == ops_in_cpp[index]
......
...@@ -21,7 +21,11 @@ __all__ = ['ParallelExecutor'] ...@@ -21,7 +21,11 @@ __all__ = ['ParallelExecutor']
class ParallelExecutor(object): class ParallelExecutor(object):
def __init__(self, loss_name, use_cuda, num_threads=None): def __init__(self,
loss_name,
use_cuda,
num_threads=None,
allow_op_delay=False):
places = [] places = []
if use_cuda: if use_cuda:
for i in xrange(core.get_cuda_device_count()): for i in xrange(core.get_cuda_device_count()):
...@@ -35,7 +39,12 @@ class ParallelExecutor(object): ...@@ -35,7 +39,12 @@ class ParallelExecutor(object):
places.append(p) places.append(p)
if num_threads is None: if num_threads is None:
num_threads = min(len(places) * 2, multiprocessing.cpu_count()) if use_cuda:
# Experiments on se-resnext shows that too many threads hurt
# performance. Worth tunning for other models in the future.
num_threads = len(places)
else:
min(len(places) * 2, multiprocessing.cpu_count())
startup = framework.default_startup_program() startup = framework.default_startup_program()
main = framework.default_main_program() main = framework.default_main_program()
...@@ -52,7 +61,8 @@ class ParallelExecutor(object): ...@@ -52,7 +61,8 @@ class ParallelExecutor(object):
startup.desc, startup.desc,
main.desc, main.desc,
loss_name, loss_name,
scope) scope,
allow_op_delay)
self.scope = scope self.scope = scope
def run(self, fetch_list): def run(self, fetch_list):
......
...@@ -29,6 +29,7 @@ function(py_test_modules TARGET_NAME) ...@@ -29,6 +29,7 @@ function(py_test_modules TARGET_NAME)
endfunction() endfunction()
# test time consuming OPs in a separate process for expliot parallism # test time consuming OPs in a separate process for expliot parallism
list(REMOVE_ITEM TEST_OPS test_parallel_executor)
list(REMOVE_ITEM TEST_OPS test_warpctc_op) list(REMOVE_ITEM TEST_OPS test_warpctc_op)
list(REMOVE_ITEM TEST_OPS test_dyn_rnn) list(REMOVE_ITEM TEST_OPS test_dyn_rnn)
list(REMOVE_ITEM TEST_OPS test_mul_op) list(REMOVE_ITEM TEST_OPS test_mul_op)
...@@ -64,6 +65,7 @@ else() ...@@ -64,6 +65,7 @@ else()
endif(WITH_FAST_BUNDLE_TEST) endif(WITH_FAST_BUNDLE_TEST)
# tests with high overhead # tests with high overhead
py_test_modules(test_parallel_executor MODULES test_parallel_executor)
py_test_modules(test_warpctc_op MODULES test_warpctc_op ENVS FLAGS_warpctc_dir=${WARPCTC_LIB_DIR}) py_test_modules(test_warpctc_op MODULES test_warpctc_op ENVS FLAGS_warpctc_dir=${WARPCTC_LIB_DIR})
py_test_modules(test_train_dyn_rnn MODULES test_dyn_rnn) py_test_modules(test_train_dyn_rnn MODULES test_dyn_rnn)
py_test_modules(test_mul_op MODULES test_mul_op) py_test_modules(test_mul_op MODULES test_mul_op)
......
...@@ -135,18 +135,18 @@ def bottleneck_block(input, num_filters, stride, cardinality, reduction_ratio): ...@@ -135,18 +135,18 @@ def bottleneck_block(input, num_filters, stride, cardinality, reduction_ratio):
return fluid.layers.elementwise_add(x=short, y=scale, act='relu') return fluid.layers.elementwise_add(x=short, y=scale, act='relu')
def SE_ResNeXt152(batch_size=4): def SE_ResNeXt152Small(batch_size=2):
img = fluid.layers.fill_constant( img = fluid.layers.fill_constant(
shape=[batch_size, 3, 224, 224], dtype='float32', value=0.0) shape=[batch_size, 3, 224, 224], dtype='float32', value=0.0)
label = fluid.layers.fill_constant( label = fluid.layers.fill_constant(
shape=[batch_size, 1], dtype='int64', value=0.0) shape=[batch_size, 1], dtype='int64', value=0.0)
conv = conv_bn_layer( conv = conv_bn_layer(
input=img, num_filters=64, filter_size=3, stride=2, act='relu') input=img, num_filters=16, filter_size=3, stride=2, act='relu')
conv = conv_bn_layer( conv = conv_bn_layer(
input=conv, num_filters=64, filter_size=3, stride=1, act='relu') input=conv, num_filters=16, filter_size=3, stride=1, act='relu')
conv = conv_bn_layer( conv = conv_bn_layer(
input=conv, num_filters=128, filter_size=3, stride=1, act='relu') input=conv, num_filters=16, filter_size=3, stride=1, act='relu')
conv = fluid.layers.pool2d( conv = fluid.layers.pool2d(
input=conv, pool_size=3, pool_stride=2, pool_padding=1, pool_type='max') input=conv, pool_size=3, pool_stride=2, pool_padding=1, pool_type='max')
...@@ -184,7 +184,8 @@ class TestParallelExecutorBase(unittest.TestCase): ...@@ -184,7 +184,8 @@ class TestParallelExecutorBase(unittest.TestCase):
method, method,
memory_opt=True, memory_opt=True,
iter=10, iter=10,
batch_size=None): batch_size=None,
allow_op_delay=False):
main = fluid.Program() main = fluid.Program()
startup = fluid.Program() startup = fluid.Program()
with fluid.program_guard(main, startup): with fluid.program_guard(main, startup):
...@@ -194,7 +195,10 @@ class TestParallelExecutorBase(unittest.TestCase): ...@@ -194,7 +195,10 @@ class TestParallelExecutorBase(unittest.TestCase):
if memory_opt: if memory_opt:
fluid.memory_optimize(main) fluid.memory_optimize(main)
exe = fluid.ParallelExecutor(loss_name=loss.name, use_cuda=True) exe = fluid.ParallelExecutor(
loss_name=loss.name,
use_cuda=True,
allow_op_delay=allow_op_delay)
if batch_size is not None: if batch_size is not None:
batch_size *= fluid.core.get_cuda_device_count() batch_size *= fluid.core.get_cuda_device_count()
begin = time.time() begin = time.time()
...@@ -222,7 +226,7 @@ class TestMNIST(TestParallelExecutorBase): ...@@ -222,7 +226,7 @@ class TestMNIST(TestParallelExecutorBase):
def setUpClass(cls): def setUpClass(cls):
# Convert mnist to recordio file # Convert mnist to recordio file
with fluid.program_guard(fluid.Program(), fluid.Program()): with fluid.program_guard(fluid.Program(), fluid.Program()):
reader = paddle.batch(mnist.train(), batch_size=32) reader = paddle.batch(mnist.train(), batch_size=4)
feeder = fluid.DataFeeder( feeder = fluid.DataFeeder(
feed_list=[ # order is image and label feed_list=[ # order is image and label
fluid.layers.data( fluid.layers.data(
...@@ -236,9 +240,11 @@ class TestMNIST(TestParallelExecutorBase): ...@@ -236,9 +240,11 @@ class TestMNIST(TestParallelExecutorBase):
def test_simple_fc(self): def test_simple_fc(self):
self.check_network_convergence(simple_fc_net) self.check_network_convergence(simple_fc_net)
self.check_network_convergence(simple_fc_net, allow_op_delay=True)
def test_batchnorm_fc(self): def test_batchnorm_fc(self):
self.check_network_convergence(fc_with_batchnorm) self.check_network_convergence(fc_with_batchnorm)
self.check_network_convergence(fc_with_batchnorm, allow_op_delay=True)
class TestResnet(TestParallelExecutorBase): class TestResnet(TestParallelExecutorBase):
...@@ -262,10 +268,10 @@ class TestResnet(TestParallelExecutorBase): ...@@ -262,10 +268,10 @@ class TestResnet(TestParallelExecutorBase):
def test_resnet(self): def test_resnet(self):
import functools import functools
batch_size = 4 batch_size = 2
self.check_network_convergence( self.check_network_convergence(
functools.partial( functools.partial(
SE_ResNeXt152, batch_size=batch_size), SE_ResNeXt152Small, batch_size=batch_size),
iter=20, iter=20,
batch_size=batch_size) batch_size=batch_size)
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
import unittest import unittest
import paddle.fluid.core as core import paddle.fluid.core as core
from paddle.fluid.framework import Program
class TestOpDesc(unittest.TestCase): class TestOpDesc(unittest.TestCase):
...@@ -187,32 +188,46 @@ class TestBlockDesc(unittest.TestCase): ...@@ -187,32 +188,46 @@ class TestBlockDesc(unittest.TestCase):
self.assertEqual(all_ops, [op0, op1, op2]) self.assertEqual(all_ops, [op0, op1, op2])
def test_remove_op(self): def test_remove_op(self):
prog = core.ProgramDesc() program = Program()
prog = program.desc
self.assertIsNotNone(prog) self.assertIsNotNone(prog)
block = prog.block(0) block = prog.block(0)
self.assertIsNotNone(block) self.assertIsNotNone(block)
op0 = block.append_op()
op1 = block.append_op() op1 = block.append_op()
op2 = block.append_op() op2 = block.append_op()
op0.set_type("test")
op1.set_type("test")
op2.set_type("test")
var0 = block.var("var0")
var1 = block.var("var1") var1 = block.var("var1")
var2 = block.var("var2") var2 = block.var("var2")
var3 = block.var("var3") var3 = block.var("var3")
var4 = block.var("var4") var4 = block.var("var4")
var5 = block.var("var5") var5 = block.var("var5")
op0.set_input("X", ["var0"])
op0.set_output("Y", ["var0"])
op1.set_input("X", ["var1", "var2"]) op1.set_input("X", ["var1", "var2"])
op1.set_output("Y", ["var3", "var4"]) op1.set_output("Y", ["var3", "var4"])
op2.set_input("X", ["var1"]) op2.set_input("X", ["var1"])
op2.set_output("Y", ["var4", "var5"]) op2.set_output("Y", ["var4", "var5"])
program.sync_with_cpp()
# remove op1, its input var2 and output var3 will be removed at the same time, # remove op1, its input var2 and output var3 will be removed at the same time,
# but its input var1 and output var4 will not be removed since they are used for op2. # but its input var1 and output var4 will not be removed since they are used for op2.
block.remove_op(0, 1) block.remove_op(1, 2)
program.sync_with_cpp()
all_ops = [] all_ops = []
for idx in xrange(0, block.op_size()): for idx in xrange(0, block.op_size()):
all_ops.append(block.op(idx)) all_ops.append(block.op(idx))
self.assertEqual(all_ops, [op2]) self.assertEqual(all_ops, [op0, op2])
all_vars = block.all_vars() all_vars = block.all_vars()
self.assertEqual(set(all_vars), {var1, var4, var5}) self.assertEqual(set(all_vars), {var0, var1, var4, var5})
if __name__ == '__main__': if __name__ == '__main__':
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册