Unverified commit 894236a1, authored by Yu Yang, committed by GitHub

Merge pull request #6730 from tonyyang-svail/parallel_do

[WIP]: feature/parallel_do
...@@ -427,7 +427,8 @@ std::vector<std::unique_ptr<OpDesc>> MakeBlockBackward(
VLOG(5) << "Making backward " << (*it)->Type() << " op";
std::vector<std::unique_ptr<OpDesc>> op_grads;
if ((*it)->Type() == "recurrent" || (*it)->Type() == "while" ||
    (*it)->Type() == "parallel_do") {
int step_block_idx = (*it)->GetBlockAttr("sub_block");
BlockDesc* backward_block = CreateStepBlock(program_desc, no_grad_vars,
                                            grad_to_var, step_block_idx);
......
...@@ -43,6 +43,22 @@ std::ostream &operator<<(std::ostream &os, const LoD &lod) {
return os;
}
std::ostream &operator<<(std::ostream &os, const LoDTensor &t) {
PADDLE_ENFORCE(platform::is_cpu_place(t.place()));
PADDLE_ENFORCE(t.type().hash_code() == typeid(float).hash_code());
os << "dim: " << t.dims() << "\n";
os << "lod: " << t.lod() << "\n";
// only print first ten elements
int64_t size = t.numel() < 10 ? t.numel() : 10;
for (int64_t i = 0; i < size; ++i) {
os << t.data<float>()[i] << " ";
}
return os;
}
LoD SliceLevels(const LoD &in, size_t level_begin, size_t level_end) {
LoD new_lod;
new_lod.reserve(level_end - level_begin);
...@@ -244,5 +260,69 @@ void DeserializeFromStream(std::istream &is, LoDTensor *tensor,
DeserializeFromStream(is, static_cast<Tensor *>(tensor), dev_ctx);
}
std::vector<LoDTensor> LoDTensor::SplitLoDTensor(
const std::vector<platform::Place> places) const {
check_memory_size();
// PADDLE_ENFORCE(lod().empty() || (lod().size() == 1 && lod()[0].empty())
// , "Disable parallel lod for now");
PADDLE_ENFORCE(lod().empty(), "Disable parallel lod for now");
PADDLE_ENFORCE(dims()[0] % places.size() == 0,
"Batch size should be divisible by the number of places");
std::vector<LoDTensor> lods;
for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
size_t begin = place_idx * dims()[0] / places.size();
size_t end = (place_idx + 1) * dims()[0] / places.size();
auto src = Slice(static_cast<int>(begin), static_cast<int>(end));
LoDTensor dst;
dst.Resize(src.dims());
auto &dst_place = places[place_idx];
auto dst_ptr = dst.mutable_data(dst_place, src.type());
// TODO(tonyyang-svail):
// change the following to framework::CopyFrom
auto src_place = src.place();
auto src_ptr = src.data<void>();
auto size = src.numel() * SizeOfType(src.type());
if (platform::is_cpu_place(src_place) &&
platform::is_cpu_place(dst_place)) {
memory::Copy(boost::get<platform::CPUPlace>(dst_place), dst_ptr,
boost::get<platform::CPUPlace>(src_place), src_ptr, size);
} else {
PADDLE_THROW("Not Implemented");
}
lods.emplace_back(dst);
}
return lods;
}
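For intuition, SplitLoDTensor's slicing rule in isolation: equal contiguous chunks along dim 0, one per place. A minimal numpy sketch of the same rule (split_batch is a hypothetical helper; CPU-only, LoD ignored):

import numpy as np

def split_batch(x, num_places):
    # chunk i gets rows [i * n, (i + 1) * n), where n = batch / places;
    # like the PADDLE_ENFORCE above, the batch must divide evenly
    assert x.shape[0] % num_places == 0
    n = x.shape[0] // num_places
    return [x[i * n:(i + 1) * n].copy() for i in range(num_places)]

chunks = split_batch(np.arange(8.0).reshape(4, 2), 2)
assert len(chunks) == 2 and chunks[0].shape == (2, 2)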
void LoDTensor::MergeLoDTensor(
const std::vector<const LoDTensor *> &lod_tensors, platform::Place place) {
PADDLE_ENFORCE(platform::is_cpu_place(place));
PADDLE_ENFORCE(!lod_tensors.empty());
framework::DDim new_dim = lod_tensors[0]->dims();
std::type_index new_type = lod_tensors[0]->type();
for (auto *lod : lod_tensors) {
PADDLE_ENFORCE(new_dim == lod->dims());
PADDLE_ENFORCE(new_type == lod->type());
PADDLE_ENFORCE(platform::is_cpu_place(lod->place()));
}
new_dim[0] *= lod_tensors.size();
Resize(new_dim);
auto *dst_ptr = reinterpret_cast<uint8_t *>(mutable_data(place, new_type));
for (auto *src : lod_tensors) {
auto size = src->numel() * SizeOfType(src->type());
memory::Copy(boost::get<platform::CPUPlace>(place), dst_ptr,
boost::get<platform::CPUPlace>(src->place()),
src->data<void>(), size);
dst_ptr += size;
}
}
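MergeLoDTensor is the inverse: it enforces that every per-place tensor agrees on dims, dtype, and (CPU) place, then concatenates them back along dim 0. A round-trip sketch under the same assumptions (merge_batch is hypothetical):

import numpy as np

def merge_batch(chunks):
    # all chunks must agree on shape and dtype, as the PADDLE_ENFORCEs require
    assert all(c.shape == chunks[0].shape and c.dtype == chunks[0].dtype
               for c in chunks)
    return np.concatenate(chunks, axis=0)

x = np.random.rand(4, 2).astype('float32')
assert (merge_batch(np.split(x, 2, axis=0)) == x).all()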
} // namespace framework
} // namespace paddle
...@@ -58,6 +58,7 @@ using Vector = thrust::host_vector<
using LoD = std::vector<Vector<size_t>>;
std::ostream& operator<<(std::ostream& os, const LoD& lod);
std::ostream& operator<<(std::ostream& os, const LoDTensor& t);
/*
* Slice levels from a LoD.
...@@ -144,6 +145,12 @@ class LoDTensor : public Tensor {
*/
void ShrinkInLevel(size_t level, size_t elem_begin, size_t elem_end);
std::vector<LoDTensor> SplitLoDTensor(
const std::vector<platform::Place> places) const;
void MergeLoDTensor(const std::vector<const LoDTensor*>& lod_tensors,
platform::Place place);
private:
LoD lod_;
};
......
...@@ -233,7 +233,8 @@ static const Tensor* GetTensorFromVar(const Variable* var) {
} else if (var->IsType<SelectedRows>()) {
t = &(var->Get<SelectedRows>().value());
} else {
PADDLE_THROW("Variable type_id %s, expect LoDTensor/SelectedRows.",
             var->Type().name());
}
return t;
}
...@@ -245,7 +246,8 @@ static Tensor* GetMutableTensorFromVar(Variable* var) {
} else if (var->IsType<SelectedRows>()) {
t = var->GetMutable<SelectedRows>()->mutable_value();
} else {
PADDLE_THROW("Variable type_id %s, expect LoDTensor/SelectedRows.",
             var->Type().name());
}
return t;
}
...@@ -407,7 +409,8 @@ class RuntimeInferShapeContext : public InferShapeContext {
} else if (var->IsType<SelectedRows>()) {
return var->Get<SelectedRows>().GetCompleteDims();
} else {
PADDLE_THROW("Variable %s type_id %s, expect LoDTensor/SelectedRows.",
             name, var->Type().name());
}
}
...@@ -418,7 +421,8 @@ class RuntimeInferShapeContext : public InferShapeContext {
} else if (var->IsType<SelectedRows>()) {
var->GetMutable<SelectedRows>()->set_height(dim[0]);
} else {
PADDLE_THROW("Variable %s type_id %s, expect LoDTensor/SelectedRows.",
             name, var->Type().name());
}
}
......
...@@ -55,6 +55,8 @@ class Tensor {
template <typename T>
inline const T* data() const;
inline void switch_place(platform::Place new_place);
/**
* @brief Return a pointer to mutable memory block.
* @note If not exist, then allocation.
...@@ -200,6 +202,15 @@ class Tensor {
size_t offset_;
};
inline void Tensor::switch_place(platform::Place new_place) {
if (holder_->place() == new_place) {
return;
}
// TODO(tonyyang-svail): do memcpy here.
PADDLE_THROW("Not Implemented");
}
} // namespace framework
} // namespace paddle
......
...@@ -152,6 +152,7 @@ op_library(conv_transpose_op DEPS vol2col)
op_library(gru_op DEPS sequence2batch gru_compute)
op_library(recurrent_op DEPS executor)
op_library(cos_sim_op DEPS cos_sim_functor)
op_library(parallel_do_op DEPS executor)
# FIXME(typhoonzero): save/load depends lodtensor serialization functions
op_library(save_op DEPS lod_tensor)
op_library(load_op DEPS lod_tensor)
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <vector>
#include "paddle/framework/executor.h"
#include "paddle/framework/op_registry.h"
#include "paddle/framework/threadpool.h"
namespace paddle {
namespace operators {
static constexpr char kInputs[] = "inputs";
static constexpr char kParameters[] = "parameters";
static constexpr char kPlaces[] = "places";
static constexpr char kOutputs[] = "outputs";
static constexpr char kParallelScopes[] = "parallel_scopes";
static constexpr char kParallelBlock[] = "sub_block";
// using ParallelScopeVar = std::vector<framework::Scope *>;
using LoDTensor = framework::LoDTensor;
using OperatorBase = framework::OperatorBase;
void SplitTensorAndMoveTensorToScopes(
const framework::Scope &scope,
const std::vector<framework::Scope *> &sub_scopes,
const std::vector<platform::Place> &places,
const std::vector<std::string> &names) {
for (auto &argu : names) {
auto *var = scope.FindVar(argu);
const auto &tensor = var->Get<LoDTensor>();
auto lod_tensors = tensor.SplitLoDTensor(places);
for (auto &lod : lod_tensors) {
VLOG(3) << lod.dims();
}
for (size_t i = 0; i < sub_scopes.size(); ++i) {
*sub_scopes[i]->Var(argu)->GetMutable<LoDTensor>() = lod_tensors[i];
}
}
}
class ParallelDoOp : public framework::OperatorBase {
public:
ParallelDoOp(const std::string &type,
const framework::VariableNameMap &inputs,
const framework::VariableNameMap &outputs,
const framework::AttributeMap &attrs)
: OperatorBase(type, inputs, outputs, attrs) {}
void Run(const framework::Scope &scope,
const platform::Place &place) const override {
// get device context from pool
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &dev_ctx = *pool.Get(place);
auto *block = Attr<framework::BlockDesc *>(kParallelBlock);
auto *program = block->Program();
// TODO(tonyyang-svail): get places from input
std::vector<platform::Place> places;
places.emplace_back(platform::CPUPlace());
places.emplace_back(platform::CPUPlace());
auto &sub_scopes = *scope.FindVar(Output(kParallelScopes))
->GetMutable<std::vector<framework::Scope *>>();
for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
sub_scopes.push_back(&scope.NewScope());
}
SplitTensorAndMoveTensorToScopes(scope, sub_scopes, places,
Inputs(kInputs));
std::vector<std::future<void>> workers;
workers.reserve(places.size());
for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
VLOG(3) << "Run " << place_idx;
auto &place = places[place_idx];
auto *cur_scope = sub_scopes[place_idx];
// copy parameter
// some version of boost lacks != for boost::variant
if (!(dev_ctx.GetPlace() == place)) {
PADDLE_THROW("Not Implemented");
}
workers.emplace_back(framework::Async([program, cur_scope, place, block] {
framework::Executor executor(place);
executor.Run(*program, cur_scope, block->ID(),
false /*create_local_scope*/);
}));
}
for (auto &worker : workers) {
worker.wait();
}
// merge output
for (auto &o_name : Outputs(kOutputs)) {
std::vector<const framework::LoDTensor *> lod_tensors;
lod_tensors.reserve(sub_scopes.size());
for (auto *sub_scope : sub_scopes) {
lod_tensors.emplace_back(&sub_scope->FindVar(o_name)->Get<LoDTensor>());
}
auto *lod_tensor_to_be_merged =
scope.FindVar(o_name)->GetMutable<LoDTensor>();
lod_tensor_to_be_merged->MergeLoDTensor(lod_tensors, dev_ctx.GetPlace());
}
}
};
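Run above is a scatter-run-gather: split the input batch across places, execute the sub-block concurrently (one future per place), then merge the per-place outputs. A self-contained sketch of that control flow, with Python threads standing in for framework::Async and a plain function standing in for the sub-block (parallel_do here is a hypothetical illustration, not the op itself):

import numpy as np
from concurrent.futures import ThreadPoolExecutor

def parallel_do(x, num_places, sub_block_fn):
    # scatter: one contiguous slice of the batch per place
    slices = np.split(x, num_places, axis=0)
    # run: one worker per place, as with the futures launched per place above
    with ThreadPoolExecutor(max_workers=num_places) as pool:
        outs = list(pool.map(sub_block_fn, slices))
    # gather: concatenate per-place outputs back along the batch dim
    return np.concatenate(outs, axis=0)

y = parallel_do(np.ones((4, 3), dtype='float32'), 2, lambda t: t * 2.0)
assert y.shape == (4, 3) and (y == 2.0).all()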
class ParallelDoOpProtoMaker : public framework::OpProtoAndCheckerMaker {
public:
ParallelDoOpProtoMaker(OpProto *proto, framework::OpAttrChecker *op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
AddInput(kInputs, "").AsDuplicable();
AddInput(kParameters, "").AsDuplicable();
AddInput(kPlaces, "");
AddOutput(kOutputs, "").AsDuplicable();
AddOutput(kParallelScopes, "");
AddAttr<framework::BlockDesc *>(kParallelBlock, "");
AddComment(R"DOC(
ParallelDo Operator.
)DOC");
}
};
class ParallelDoGradOp : public OperatorBase {
public:
ParallelDoGradOp(const std::string &type,
const framework::VariableNameMap &inputs,
const framework::VariableNameMap &outputs,
const framework::AttributeMap &attrs)
: OperatorBase(type, inputs, outputs, attrs) {}
void Run(const framework::Scope &scope,
const platform::Place &place) const override {
// // get device context from pool
// platform::DeviceContextPool &pool =
// platform::DeviceContextPool::Instance();
// auto &dev_ctx = *pool.Get(place);
auto *block = Attr<framework::BlockDesc *>(kParallelBlock);
auto *program = block->Program();
auto &sub_scopes = scope.FindVar(Input(kParallelScopes))
->Get<std::vector<framework::Scope *>>();
// TODO(tonyyang-svail): get places from input
std::vector<platform::Place> places;
places.emplace_back(platform::CPUPlace());
places.emplace_back(platform::CPUPlace());
// feed output@grad
SplitTensorAndMoveTensorToScopes(scope, sub_scopes, places,
Inputs(framework::GradVarName(kOutputs)));
for (auto &s : Inputs(framework::GradVarName(kOutputs))) {
VLOG(3) << s;
VLOG(3) << scope.FindVar(s)->Get<LoDTensor>();
for (auto *sub_scope : sub_scopes) {
VLOG(3) << sub_scope->FindVar(s)->Get<LoDTensor>();
}
}
// exe run
std::vector<std::future<void>> workers;
for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
VLOG(3) << "Run " << place_idx;
auto &place = places[place_idx];
auto *cur_scope = sub_scopes[place_idx];
// execute
workers.emplace_back(framework::Async([program, cur_scope, place, block] {
framework::Executor executor(place);
executor.Run(*program, cur_scope, block->ID(),
false /*create_local_scope*/);
}));
}
for (auto &worker : workers) {
worker.wait();
}
// merge grad
for (auto &s : Outputs(framework::GradVarName(kParameters))) {
VLOG(3) << s;
auto &t = sub_scopes[0]->FindVar(s)->Get<LoDTensor>();
VLOG(3) << t;
std::string s_buf = s + "@BUF";
auto *t_buf = sub_scopes[0]->Var(s_buf)->GetMutable<LoDTensor>();
for (size_t place_idx = 1; place_idx < places.size(); ++place_idx) {
auto &tt = sub_scopes[place_idx]->FindVar(s)->Get<LoDTensor>();
VLOG(3) << place_idx;
VLOG(3) << tt;
framework::CopyFrom(tt, places[0], t_buf);
auto sum_op = framework::OpRegistry::CreateOp(
"sum", {{"X", {s, s_buf}}}, {{"Out", {s}}},
framework::AttributeMap{});
sum_op->Run(*sub_scopes[0], place);
}
VLOG(3) << t;
framework::CopyFrom(t, place, scope.FindVar(s)->GetMutable<LoDTensor>());
}
}
};
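The merge-grad loop above accumulates each parameter's gradient into sub-scope 0 by copying every other replica's gradient into a temporary s + "@BUF" variable and running a sum op over the pair; arithmetically that is just an element-wise sum over replicas. A sketch of that arithmetic alone (merge_grads is hypothetical, not the sum op):

import numpy as np

def merge_grads(replica_grads):
    # start from replica 0, then add each remaining replica's gradient,
    # mirroring the repeated pairwise sum into sub_scopes[0]
    acc = replica_grads[0].copy()
    for g in replica_grads[1:]:
        acc += g
    return acc

grads = [np.full((2, 2), 1.0), np.full((2, 2), 3.0)]
assert (merge_grads(grads) == 4.0).all()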
class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker {
public:
using framework::SingleGradOpDescMaker::SingleGradOpDescMaker;
protected:
virtual std::unique_ptr<framework::OpDesc> Apply() const {
auto *grad = new framework::OpDesc();
grad->SetType("parallel_do_grad");
for (auto &input_param : this->InputNames()) {
VLOG(3) << input_param;
grad->SetInput(input_param, this->Input(input_param));
grad->SetOutput(framework::GradVarName(input_param),
this->InputGrad(input_param, false));
}
for (auto &output_param : this->OutputNames()) {
if (output_param == kParallelScopes) {
grad->SetInput(output_param, this->Output(output_param));
grad->SetInput(framework::GradVarName(output_param),
this->Output(output_param));
} else {
grad->SetInput(output_param, this->Output(output_param));
grad->SetInput(framework::GradVarName(output_param),
this->OutputGrad(output_param));
}
}
grad->SetAttrMap(this->Attrs());
grad->SetBlockAttr(kParallelBlock, *grad_block_[0]);
return std::unique_ptr<framework::OpDesc>(grad);
}
};
class ParallelDoGradOpShapeInference : public framework::InferShapeBase {
public:
void operator()(framework::InferShapeContext *ctx) const override {
std::vector<std::string> input{kParameters, kInputs};
std::vector<std::string> output{kOutputs};
for (auto &s : input) {
PADDLE_ENFORCE(ctx->HasInputs(s));
PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(s)),
"Cannot find the gradient variable %s",
framework::GradVarName(s));
}
for (auto &s : output) {
PADDLE_ENFORCE(ctx->HasInputs(s));
}
for (auto &s : input) {
ctx->SetOutputsDim(framework::GradVarName(s), ctx->GetInputsDim(s));
}
if (ctx->HasInputs(kParameters)) {
PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(kParameters)));
ctx->SetOutputsDim(framework::GradVarName(kParameters),
ctx->GetInputsDim(kParameters));
}
}
};
} // namespace operators
} // namespace paddle
REGISTER_OPERATOR(parallel_do, paddle::operators::ParallelDoOp,
paddle::operators::ParallelDoOpProtoMaker,
paddle::operators::ParallelDoGradOpDescMaker);
REGISTER_OPERATOR(parallel_do_grad, paddle::operators::ParallelDoGradOp,
paddle::operators::ParallelDoGradOpShapeInference);
...@@ -205,6 +205,7 @@ def _append_backward_ops_(target,
# Getting op's corresponding grad_op
grad_op_desc, op_grad_to_var = core.get_grad_op_desc(
op.desc, no_grad_dict[block.idx], grad_sub_block_list)
grad_op_descs.extend(grad_op_desc)
grad_to_var.update(op_grad_to_var)
......
...@@ -448,7 +448,7 @@ class Operator(object):
no_kernel_op_set = {
'feed', 'fetch', 'save', 'load', 'recurrent',
'rnn_memory_helper_grad', 'conditional_block', 'while', 'send',
'recv', 'parallel_do'
}
if type not in no_kernel_op_set:
self.desc.infer_var_type(self.block.desc)
......
...@@ -6,12 +6,13 @@ import contextlib
from ..registry import autodoc
__all__ = [
'split_lod_tensor', 'merge_lod_tensor', 'BlockGuard',
'BlockGuardWithCompletion', 'StaticRNNMemoryLink', 'WhileGuard', 'While',
'lod_rank_table', 'max_sequence_len', 'topk', 'lod_tensor_to_array',
'array_to_lod_tensor', 'increment', 'array_write', 'create_array',
'less_than', 'array_read', 'shrink_memory', 'array_length', 'IfElse',
'DynamicRNN', 'ConditionalBlock', 'StaticRNN', 'reorder_lod_tensor_by_rank',
'ParallelDo'
]
...@@ -132,29 +133,129 @@ class BlockGuard(object):
return True
class ParallelDo(object):
"""
ParallelDo class.
ParallelDo class is used to create a parallel_do block in a program, which
splits its input along the batch dimension and runs the sub-block on each place.
"""
def __init__(self, places, name=None):
self.helper = LayerHelper("parallel_do", name=name)
self.inputs = []
self.places = places
self.outputs = []
self.status = StaticRNN.BEFORE_RNN_BLOCK
def do(self):
return BlockGuardWithCompletion(self)
def parent_block(self):
prog = self.helper.main_program
parent_idx = prog.current_block().parent_idx
assert parent_idx >= 0
parent_block = prog.block(parent_idx)
return parent_block
def __call__(self, *args, **kwargs):
if self.status != StaticRNN.AFTER_RNN_BLOCK:
raise ValueError("RNN output can only be retrieved after rnn block")
if len(self.outputs) == 0:
raise ValueError("RNN has no output")
elif len(self.outputs) == 1:
return self.outputs[0]
else:
return self.outputs
def read_input(self, var):
self.inputs.append(var)
return var
def write_output(self, var):
self.outputs.append(var)
def get_parameters(self):
main_program = self.helper.main_program
current_block = main_program.current_block()
parent_block = self.parent_block()
local_inputs = set()
for op in current_block.ops:
for oname in op.output_names:
for out_var_name in op.output(oname):
local_inputs.add(out_var_name)
for var in self.inputs:
local_inputs.add(var.name)
params = list()
for op in current_block.ops:
for iname in op.input_names:
for in_var_name in op.input(iname):
if in_var_name not in local_inputs:
params.append(in_var_name)
return [parent_block.var(name) for name in params]
def complete_op(self):
main_program = self.helper.main_program
current_block = main_program.current_block()
parent_block = self.parent_block()
step_scope = parent_block.create_var(
type=core.VarDesc.VarType.STEP_SCOPES)
self.outputs = [
parent_block.create_var(
name=o.name,
shape=o.shape,
dtype=o.dtype,
lod_level=o.lod_level,
persistable=o.persistable,
stop_gradient=o.stop_gradient) for o in self.outputs
]
inputs = [parent_block.var(i.name) for i in self.inputs]
outputs = [parent_block.var(o.name) for o in self.outputs]
parent_block.append_op(
type='parallel_do',
inputs={
'inputs': inputs,
'parameters': self.get_parameters(),
'places': self.places
},
outputs={'outputs': outputs,
'parallel_scopes': [step_scope]},
attrs={'sub_block': current_block})
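get_parameters above treats a variable as a parameter when some op inside the do-block reads it but nothing inside the block (including read_input) produces it. The same set computation in isolation, on hypothetical op records:

def find_outside_params(ops, declared_inputs):
    # produced: everything written inside the block, plus explicit inputs
    produced = set(declared_inputs)
    for op in ops:
        produced.update(op['outputs'])
    # a parameter is any name read inside the block but never produced there
    params = []
    for op in ops:
        params.extend(n for n in op['inputs'] if n not in produced)
    return params

ops = [{'inputs': ['x', 'fc.w'], 'outputs': ['hidden']}]
assert find_outside_params(ops, ['x']) == ['fc.w']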
class BlockGuardWithCompletion(BlockGuard):
"""
BlockGuardWithCompletion class.
BlockGuardWithCompletion class is used to create an op with a block in a program.
"""
def __init__(self, rnn):
if not (isinstance(rnn, StaticRNN) or isinstance(rnn, ParallelDo)):
raise TypeError(
"BlockGuardWithCompletion takes a StaticRNN or ParallelDo")
super(BlockGuardWithCompletion, self).__init__(rnn.helper.main_program)
self.rnn = rnn
def __enter__(self):
self.rnn.status = StaticRNN.IN_RNN_BLOCK
return super(BlockGuardWithCompletion, self).__enter__()
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
return False
self.rnn.status = StaticRNN.AFTER_RNN_BLOCK
self.rnn.complete_op()
return super(BlockGuardWithCompletion, self).__exit__(exc_type, exc_val,
exc_tb)
class StaticRNNMemoryLink(object):
...@@ -200,7 +301,7 @@ class StaticRNN(object):
self.seq_len = None
def step(self):
return BlockGuardWithCompletion(self)
def _assert_in_rnn_block_(self, method):
if self.status != StaticRNN.IN_RNN_BLOCK:
...@@ -316,7 +417,7 @@ class StaticRNN(object):
else:
return self.outputs
def complete_op(self):
main_program = self.helper.main_program
rnn_block = main_program.current_block()
parent_block = self.parent_block()
......
import unittest
import paddle.v2.fluid.layers as layers
import paddle.v2.fluid as fluid
from paddle.v2.fluid.framework import Program
from paddle.v2.fluid.executor import Executor
from paddle.v2.fluid.backward import append_backward
import numpy as np
import paddle.v2.fluid.core as core
class ParallelOpTest(unittest.TestCase):
def setUp(self):
x = layers.data(
shape=[-1, 30, 40],
dtype='float32',
name='x',
append_batch_size=False,
stop_gradient=False)
places = fluid.default_main_program().global_block().create_var()
pd = layers.ParallelDo(places=places)
with pd.do():
data = pd.read_input(x)
hidden = layers.fc(input=data, size=7)
pd.write_output(hidden)
data = pd()
loss = layers.mean(x=data)
sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
sgd_optimizer.minimize(loss)
exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
exe.run(fluid.default_main_program(),
feed={
x.name: np.random.uniform(0.1, 0.6,
(20, 30, 40)).astype("float32")
})
def test_forward(self):
pass
if __name__ == '__main__':
unittest.main()