提交 f899150e 编写于 作者: Y Yang Yang

pass forward runtime

上级 2f56d4b3
......@@ -429,7 +429,8 @@ std::vector<std::unique_ptr<OpDescBind>> MakeBlockBackward(
VLOG(5) << "Making backward " << (*it)->Type() << " op";
std::vector<std::unique_ptr<OpDescBind>> op_grads;
if ((*it)->Type() == "recurrent" || (*it)->Type() == "while") {
if ((*it)->Type() == "recurrent" || (*it)->Type() == "while" ||
(*it)->Type() == "parallel_do") {
int step_block_idx = (*it)->GetBlockAttr("sub_block");
BlockDescBind* backward_block = CreateStepBlock(
program_desc, no_grad_vars, grad_to_var, step_block_idx);
......
......@@ -314,5 +314,30 @@ void DeserializeFromStream(std::istream &is, LoDTensor *tensor) {
}
}
void LoDTensor::MergeLoDTensor(
const std::vector<const LoDTensor *> &lod_tensors, platform::Place place) {
PADDLE_ENFORCE(platform::is_cpu_place(place));
PADDLE_ENFORCE(!lod_tensors.empty());
framework::DDim new_dim = lod_tensors[0]->dims();
std::type_index new_type = lod_tensors[0]->type();
for (auto *lod : lod_tensors) {
PADDLE_ENFORCE(new_dim == lod->dims());
PADDLE_ENFORCE(new_type == lod->type());
PADDLE_ENFORCE(platform::is_cpu_place(lod->place()));
}
new_dim[0] *= lod_tensors.size();
Resize(new_dim);
auto *dst_ptr = reinterpret_cast<uint8_t *>(mutable_data(place, new_type));
for (auto *src : lod_tensors) {
auto size = src->numel() * SizeOfType(src->type());
memory::Copy(boost::get<platform::CPUPlace>(place), dst_ptr,
boost::get<platform::CPUPlace>(src->place()),
src->data<void>(), size);
dst_ptr += size;
}
}
} // namespace framework
} // namespace paddle
......@@ -144,6 +144,9 @@ class LoDTensor : public Tensor {
*/
void ShrinkInLevel(size_t level, size_t elem_begin, size_t elem_end);
void MergeLoDTensor(const std::vector<const LoDTensor*>& lod_tensors,
platform::Place place);
private:
LoD lod_;
};
......
......@@ -179,10 +179,13 @@ static const Tensor* GetTensorFromVar(const Variable* var) {
const Tensor* t = nullptr;
if (var->IsType<LoDTensor>()) {
t = &(var->Get<LoDTensor>());
} else if (var->IsType<Tensor>()) {
t = &(var->Get<Tensor>());
} else if (var->IsType<SelectedRows>()) {
t = &(var->Get<SelectedRows>().value());
} else {
PADDLE_THROW("Variable type must be LoDTensor/SelectedRows.");
PADDLE_THROW("Variable type_id %s, expect LoDTensor/SelectedRows.",
var->Type().name());
}
return t;
}
......@@ -191,10 +194,13 @@ static Tensor* GetMutableTensorFromVar(Variable* var) {
Tensor* t = nullptr;
if (var->IsType<LoDTensor>()) {
t = var->GetMutable<LoDTensor>();
} else if (var->IsType<Tensor>()) {
t = var->GetMutable<Tensor>();
} else if (var->IsType<SelectedRows>()) {
t = var->GetMutable<SelectedRows>()->mutable_value();
} else {
PADDLE_THROW("Variable type must be LoDTensor/SelectedRows.");
PADDLE_THROW("Variable type_id %s, expect LoDTensor/SelectedRows.",
var->Type().name());
}
return t;
}
......@@ -359,10 +365,13 @@ class RuntimeInferShapeContext : public InferShapeContext {
Variable* var = scope_.FindVar(name);
if (var->IsType<LoDTensor>()) {
return var->Get<LoDTensor>().dims();
} else if (var->IsType<Tensor>()) {
return var->Get<Tensor>().dims();
} else if (var->IsType<SelectedRows>()) {
return var->Get<SelectedRows>().GetCompleteDims();
} else {
PADDLE_THROW("Variable type must be LoDTensor/SelectedRows.");
PADDLE_THROW("Variable %s type_id %s, expect LoDTensor/SelectedRows.",
name, var->Type().name());
}
}
......@@ -370,10 +379,13 @@ class RuntimeInferShapeContext : public InferShapeContext {
Variable* var = scope_.FindVar(name);
if (var->IsType<LoDTensor>()) {
var->GetMutable<LoDTensor>()->Resize(dim);
} else if (var->IsType<Tensor>()) {
var->GetMutable<Tensor>()->Resize(dim);
} else if (var->IsType<SelectedRows>()) {
var->GetMutable<SelectedRows>()->set_height(dim[0]);
} else {
PADDLE_THROW("Variable type must be LoDTensor/SelectedRows.");
PADDLE_THROW("Variable %s type_id %s, expect LoDTensor/SelectedRows.",
name, var->Type().name());
}
}
......
......@@ -55,6 +55,8 @@ class Tensor {
template <typename T>
inline const T* data() const;
inline void switch_place(platform::Place new_place);
/**
* @brief Return a pointer to mutable memory block.
* @note If not exist, then allocation.
......@@ -183,6 +185,15 @@ class Tensor {
size_t offset_;
};
inline void Tensor::switch_place(platform::Place new_place) {
if (holder_->place() == new_place) {
return;
}
// TODO(tonyyang-svail): do memcpy here.
PADDLE_THROW("Not Implemented");
}
} // namespace framework
} // namespace paddle
......
......@@ -13,9 +13,11 @@
limitations under the License. */
#include <vector>
#include "chunk_eval_op.h"
#include "paddle/framework/executor.h"
#include "paddle/framework/op_registry.h"
#include "paddle/framework/operator.h"
#include "paddle/platform/place.h"
namespace paddle {
namespace operators {
......@@ -28,10 +30,6 @@ constexpr char kOutputs[] = "outputs";
constexpr char kParallelScopes[] = "parallel_scopes";
constexpr char kParallelBlock[] = "sub_block";
// #define GRAD_SUFFIX "@GRAD"
// constexpr char kInputGrads[] = "inputs" GRAD_SUFFIX;
// constexpr char kOutputGrads[] = "outputs" GRAD_SUFFIX;
// constexpr char kParamGrads[] = "parameters" GRAD_SUFFIX;
using ParallelScopeVar = std::vector<framework::Scope *>;
using OperatorBase = framework::OperatorBase;
......@@ -46,21 +44,66 @@ class ParallelDoOp : public OperatorBase {
void Run(const framework::Scope &scope,
const platform::DeviceContext &dev_ctx) const override {
// create scope
// copy parameters
auto *block = Attr<framework::BlockDescBind *>(kParallelBlock);
auto *program = block->Program();
// TODO(tonyyang-svail): get places from input
std::vector<platform::Place> places;
places.emplace_back(platform::CPUPlace());
places.emplace_back(platform::CPUPlace());
std::vector<framework::Scope *> sub_scopes;
for (int place_idx = 0; place_idx < places.size(); ++place_idx) {
VLOG(3) << "Run " << place_idx;
sub_scopes.push_back(&scope.NewScope());
auto &place = places[place_idx];
auto *cur_scope = sub_scopes[place_idx];
// copy parameter
if (dev_ctx.GetPlace() != place) {
PADDLE_THROW("Not Implemented");
}
};
class ParallelDoGradOp : public OperatorBase {
public:
ParallelDoGradOp(const std::string &type,
const framework::VariableNameMap &inputs,
const framework::VariableNameMap &outputs,
const framework::AttributeMap &attrs)
: OperatorBase(type, inputs, outputs, attrs) {}
// feed input
for (auto &argu : Inputs(kInputs)) {
auto *var = scope.FindVar(argu);
const auto &tensor = var->Get<LoDTensor>();
if (!tensor.lod().empty()) {
PADDLE_THROW("Disable parallel lod for now");
} else {
PADDLE_ENFORCE(tensor.dims()[0] % places.size() == 0,
"Batch size should be divided by places size");
int begin = place_idx * tensor.dims()[0] / places.size();
int end = (place_idx + 1) * tensor.dims()[0] / places.size();
auto feed_tensor = tensor.Slice(begin, end);
feed_tensor.switch_place(place);
auto *cur_var = cur_scope->Var(argu);
auto *cur_tensor = cur_var->GetMutable<Tensor>();
*cur_tensor = feed_tensor;
}
}
void Run(const framework::Scope &scope,
const platform::DeviceContext &dev_ctx) const override {}
// execute
auto executor = framework::Executor(place);
executor.Run(*program, cur_scope, block->ID(),
false /*create_local_scope*/);
}
// merge output
for (auto &o_name : Outputs(kOutputs)) {
std::vector<const framework::LoDTensor *> lod_tensors;
for (auto *sub_scope : sub_scopes) {
lod_tensors.push_back(&sub_scope->FindVar(o_name)->Get<LoDTensor>());
}
auto *lod_tensor_to_be_merged =
scope.FindVar(o_name)->GetMutable<LoDTensor>();
lod_tensor_to_be_merged->MergeLoDTensor(lod_tensors, dev_ctx.GetPlace());
}
}
};
class ParallelDoOpProtoMaker : public framework::OpProtoAndCheckerMaker {
......@@ -80,16 +123,28 @@ ParallelDo Operator.
}
};
class ParallelDoGradOp : public OperatorBase {
public:
ParallelDoGradOp(const std::string &type,
const framework::VariableNameMap &inputs,
const framework::VariableNameMap &outputs,
const framework::AttributeMap &attrs)
: OperatorBase(type, inputs, outputs, attrs) {}
void Run(const framework::Scope &scope,
const platform::DeviceContext &dev_ctx) const override {}
};
class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker {
public:
using framework::SingleGradOpDescMaker::SingleGradOpDescMaker;
protected:
virtual std::unique_ptr<framework::OpDescBind> Apply() const {
PADDLE_THROW("Not Implemented");
auto *grad = new framework::OpDescBind();
grad->SetType("recurrent_grad");
grad->SetType("parallel_do_grad");
for (auto &input_param : this->InputNames()) {
LOG(INFO) << input_param;
grad->SetInput(input_param, this->Input(input_param));
grad->SetOutput(framework::GradVarName(input_param),
this->InputGrad(input_param));
......@@ -116,26 +171,25 @@ class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker {
class ParallelDoGradOpShapeInference : public framework::InferShapeBase {
public:
void operator()(framework::InferShapeContext *ctx) const override {
PADDLE_THROW("Not Implemented");
// std::vector<std::string> input{kInputs};
// std::vector<std::string> output{kOutputs};
// for (auto &s : input) {
// PADDLE_ENFORCE(ctx->HasInputs(s));
// PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(s)),
// "Cannot find the gradient variable %s",
// framework::GradVarName(s));
// }
// for (auto &s : output) {
// PADDLE_ENFORCE(ctx->HasInputs(s));
// }
// for (auto &s : input) {
// ctx->SetOutputsDim(framework::GradVarName(s), ctx->GetInputsDim(s));
// }
// if (ctx->HasInputs(kParameters)) {
// PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(kParameters)));
// ctx->SetOutputsDim(framework::GradVarName(kParameters),
// ctx->GetInputsDim(kParameters));
// }
std::vector<std::string> input{kParameters, kInputs};
std::vector<std::string> output{kOutputs};
for (auto &s : input) {
PADDLE_ENFORCE(ctx->HasInputs(s));
PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(s)),
"Cannot find the gradient variable %s",
framework::GradVarName(s));
}
for (auto &s : output) {
PADDLE_ENFORCE(ctx->HasInputs(s));
}
for (auto &s : input) {
ctx->SetOutputsDim(framework::GradVarName(s), ctx->GetInputsDim(s));
}
if (ctx->HasInputs(kParameters)) {
PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(kParameters)));
ctx->SetOutputsDim(framework::GradVarName(kParameters),
ctx->GetInputsDim(kParameters));
}
}
};
......
......@@ -140,7 +140,18 @@ class ParallelDo(object):
step_scope = parent_block.create_var(
type=core.VarDesc.VarType.STEP_SCOPES)
self.outputs = [
parent_block.create_var(
name=o.name,
shape=o.shape,
dtype=o.dtype,
lod_level=o.lod_level,
persistable=o.persistable,
stop_gradient=o.stop_gradient) for o in self.outputs
]
inputs = [parent_block.var(i.name) for i in self.inputs]
outputs = [parent_block.var(o.name) for o in self.outputs]
parent_block.append_op(
type='parallel_do',
......@@ -149,7 +160,7 @@ class ParallelDo(object):
'parameters': self.get_parameters(),
'places': self.places
},
outputs={'outputs': self.outputs,
outputs={'outputs': outputs,
'parallel_scopes': [step_scope]},
attrs={'sub_block': current_block})
......
......@@ -12,7 +12,11 @@ import paddle.v2.fluid.core as core
class ParallelOpTest(unittest.TestCase):
def setUp(self):
x = layers.data(
shape=[2, 3, 4], dtype='float32', name='x', append_batch_size=False)
shape=[-1, 3, 4],
dtype='float32',
name='x',
append_batch_size=False,
stop_gradient=False)
places = fluid.default_main_program().global_block().create_var()
pd = layers.ParallelDo(places=places)
......@@ -22,8 +26,16 @@ class ParallelOpTest(unittest.TestCase):
hidden = layers.fc(input=data, size=7)
pd.write_output(hidden)
data = pd()
print data
print fluid.default_main_program()
loss = layers.mean(x=data)
append_backward_ops(loss)
exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
exe.run(fluid.default_main_program(),
feed={
x.name: np.random.uniform(0.1, 0.6,
(2, 3, 4)).astype("float32")
})
def test_forward(self):
pass
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册