diff --git a/paddle/framework/backward.cc b/paddle/framework/backward.cc index faf6e60cbd1bcda9864c12696b336998ea7606b7..358ddae083d5a88c86fd97108b07e61b12e9d8ae 100644 --- a/paddle/framework/backward.cc +++ b/paddle/framework/backward.cc @@ -429,7 +429,8 @@ std::vector> MakeBlockBackward( VLOG(5) << "Making backward " << (*it)->Type() << " op"; std::vector> op_grads; - if ((*it)->Type() == "recurrent" || (*it)->Type() == "while") { + if ((*it)->Type() == "recurrent" || (*it)->Type() == "while" || + (*it)->Type() == "parallel_do") { int step_block_idx = (*it)->GetBlockAttr("sub_block"); BlockDescBind* backward_block = CreateStepBlock( program_desc, no_grad_vars, grad_to_var, step_block_idx); diff --git a/paddle/framework/lod_tensor.cc b/paddle/framework/lod_tensor.cc index fdf6de4babff3bb3c253aaf516636882237e6faf..4198847ad034f0158d5652bdbff76308d7da1116 100644 --- a/paddle/framework/lod_tensor.cc +++ b/paddle/framework/lod_tensor.cc @@ -314,5 +314,30 @@ void DeserializeFromStream(std::istream &is, LoDTensor *tensor) { } } +void LoDTensor::MergeLoDTensor( + const std::vector &lod_tensors, platform::Place place) { + PADDLE_ENFORCE(platform::is_cpu_place(place)); + PADDLE_ENFORCE(!lod_tensors.empty()); + + framework::DDim new_dim = lod_tensors[0]->dims(); + std::type_index new_type = lod_tensors[0]->type(); + for (auto *lod : lod_tensors) { + PADDLE_ENFORCE(new_dim == lod->dims()); + PADDLE_ENFORCE(new_type == lod->type()); + PADDLE_ENFORCE(platform::is_cpu_place(lod->place())); + } + new_dim[0] *= lod_tensors.size(); + Resize(new_dim); + + auto *dst_ptr = reinterpret_cast(mutable_data(place, new_type)); + for (auto *src : lod_tensors) { + auto size = src->numel() * SizeOfType(src->type()); + memory::Copy(boost::get(place), dst_ptr, + boost::get(src->place()), + src->data(), size); + dst_ptr += size; + } +} + } // namespace framework } // namespace paddle diff --git a/paddle/framework/lod_tensor.h b/paddle/framework/lod_tensor.h index 9411c96aea4c10ebf921cc3e3b442769c8acbefa..989d8c1c4fe34853e7bcf22a2337d5b6a51f4fda 100644 --- a/paddle/framework/lod_tensor.h +++ b/paddle/framework/lod_tensor.h @@ -144,6 +144,9 @@ class LoDTensor : public Tensor { */ void ShrinkInLevel(size_t level, size_t elem_begin, size_t elem_end); + void MergeLoDTensor(const std::vector& lod_tensors, + platform::Place place); + private: LoD lod_; }; diff --git a/paddle/framework/operator.cc b/paddle/framework/operator.cc index e83d7547831744333d6a9c36e842d840a2a0dc03..26bc646753c0005caa51708cd8a4c5b3a4fefe66 100644 --- a/paddle/framework/operator.cc +++ b/paddle/framework/operator.cc @@ -179,10 +179,13 @@ static const Tensor* GetTensorFromVar(const Variable* var) { const Tensor* t = nullptr; if (var->IsType()) { t = &(var->Get()); + } else if (var->IsType()) { + t = &(var->Get()); } else if (var->IsType()) { t = &(var->Get().value()); } else { - PADDLE_THROW("Variable type must be LoDTensor/SelectedRows."); + PADDLE_THROW("Variable type_id %s, expect LoDTensor/SelectedRows.", + var->Type().name()); } return t; } @@ -191,10 +194,13 @@ static Tensor* GetMutableTensorFromVar(Variable* var) { Tensor* t = nullptr; if (var->IsType()) { t = var->GetMutable(); + } else if (var->IsType()) { + t = var->GetMutable(); } else if (var->IsType()) { t = var->GetMutable()->mutable_value(); } else { - PADDLE_THROW("Variable type must be LoDTensor/SelectedRows."); + PADDLE_THROW("Variable type_id %s, expect LoDTensor/SelectedRows.", + var->Type().name()); } return t; } @@ -359,10 +365,13 @@ class RuntimeInferShapeContext : public InferShapeContext { Variable* var = scope_.FindVar(name); if (var->IsType()) { return var->Get().dims(); + } else if (var->IsType()) { + return var->Get().dims(); } else if (var->IsType()) { return var->Get().GetCompleteDims(); } else { - PADDLE_THROW("Variable type must be LoDTensor/SelectedRows."); + PADDLE_THROW("Variable %s type_id %s, expect LoDTensor/SelectedRows.", + name, var->Type().name()); } } @@ -370,10 +379,13 @@ class RuntimeInferShapeContext : public InferShapeContext { Variable* var = scope_.FindVar(name); if (var->IsType()) { var->GetMutable()->Resize(dim); + } else if (var->IsType()) { + var->GetMutable()->Resize(dim); } else if (var->IsType()) { var->GetMutable()->set_height(dim[0]); } else { - PADDLE_THROW("Variable type must be LoDTensor/SelectedRows."); + PADDLE_THROW("Variable %s type_id %s, expect LoDTensor/SelectedRows.", + name, var->Type().name()); } } diff --git a/paddle/framework/tensor.h b/paddle/framework/tensor.h index 6a0c5133c9a6bb326ca51755242e75b6eb9e5474..837f63c706944d1013515f2d8a114bfdbcb44137 100644 --- a/paddle/framework/tensor.h +++ b/paddle/framework/tensor.h @@ -55,6 +55,8 @@ class Tensor { template inline const T* data() const; + inline void switch_place(platform::Place new_place); + /** * @brief Return a pointer to mutable memory block. * @note If not exist, then allocation. @@ -183,6 +185,15 @@ class Tensor { size_t offset_; }; +inline void Tensor::switch_place(platform::Place new_place) { + if (holder_->place() == new_place) { + return; + } + + // TODO(tonyyang-svail): do memcpy here. + PADDLE_THROW("Not Implemented"); +} + } // namespace framework } // namespace paddle diff --git a/paddle/operators/parallel_do_op.cc b/paddle/operators/parallel_do_op.cc index bde59c7e7ad8c6ae6d72e2373944d650fc31a764..c0c1de7369ae3168126185e66329b37eb2848685 100644 --- a/paddle/operators/parallel_do_op.cc +++ b/paddle/operators/parallel_do_op.cc @@ -13,9 +13,11 @@ limitations under the License. */ #include +#include "chunk_eval_op.h" #include "paddle/framework/executor.h" #include "paddle/framework/op_registry.h" #include "paddle/framework/operator.h" +#include "paddle/platform/place.h" namespace paddle { namespace operators { @@ -28,10 +30,6 @@ constexpr char kOutputs[] = "outputs"; constexpr char kParallelScopes[] = "parallel_scopes"; constexpr char kParallelBlock[] = "sub_block"; -// #define GRAD_SUFFIX "@GRAD" -// constexpr char kInputGrads[] = "inputs" GRAD_SUFFIX; -// constexpr char kOutputGrads[] = "outputs" GRAD_SUFFIX; -// constexpr char kParamGrads[] = "parameters" GRAD_SUFFIX; using ParallelScopeVar = std::vector; using OperatorBase = framework::OperatorBase; @@ -46,21 +44,66 @@ class ParallelDoOp : public OperatorBase { void Run(const framework::Scope &scope, const platform::DeviceContext &dev_ctx) const override { - // create scope - // copy parameters - } -}; + auto *block = Attr(kParallelBlock); + auto *program = block->Program(); -class ParallelDoGradOp : public OperatorBase { - public: - ParallelDoGradOp(const std::string &type, - const framework::VariableNameMap &inputs, - const framework::VariableNameMap &outputs, - const framework::AttributeMap &attrs) - : OperatorBase(type, inputs, outputs, attrs) {} + // TODO(tonyyang-svail): get places from input + std::vector places; + places.emplace_back(platform::CPUPlace()); + places.emplace_back(platform::CPUPlace()); - void Run(const framework::Scope &scope, - const platform::DeviceContext &dev_ctx) const override {} + std::vector sub_scopes; + for (int place_idx = 0; place_idx < places.size(); ++place_idx) { + VLOG(3) << "Run " << place_idx; + + sub_scopes.push_back(&scope.NewScope()); + + auto &place = places[place_idx]; + auto *cur_scope = sub_scopes[place_idx]; + + // copy parameter + if (dev_ctx.GetPlace() != place) { + PADDLE_THROW("Not Implemented"); + } + + // feed input + for (auto &argu : Inputs(kInputs)) { + auto *var = scope.FindVar(argu); + const auto &tensor = var->Get(); + if (!tensor.lod().empty()) { + PADDLE_THROW("Disable parallel lod for now"); + } else { + PADDLE_ENFORCE(tensor.dims()[0] % places.size() == 0, + "Batch size should be divided by places size"); + int begin = place_idx * tensor.dims()[0] / places.size(); + int end = (place_idx + 1) * tensor.dims()[0] / places.size(); + auto feed_tensor = tensor.Slice(begin, end); + feed_tensor.switch_place(place); + + auto *cur_var = cur_scope->Var(argu); + auto *cur_tensor = cur_var->GetMutable(); + *cur_tensor = feed_tensor; + } + } + + // execute + auto executor = framework::Executor(place); + executor.Run(*program, cur_scope, block->ID(), + false /*create_local_scope*/); + } + + // merge output + for (auto &o_name : Outputs(kOutputs)) { + std::vector lod_tensors; + for (auto *sub_scope : sub_scopes) { + lod_tensors.push_back(&sub_scope->FindVar(o_name)->Get()); + } + + auto *lod_tensor_to_be_merged = + scope.FindVar(o_name)->GetMutable(); + lod_tensor_to_be_merged->MergeLoDTensor(lod_tensors, dev_ctx.GetPlace()); + } + } }; class ParallelDoOpProtoMaker : public framework::OpProtoAndCheckerMaker { @@ -80,16 +123,28 @@ ParallelDo Operator. } }; +class ParallelDoGradOp : public OperatorBase { + public: + ParallelDoGradOp(const std::string &type, + const framework::VariableNameMap &inputs, + const framework::VariableNameMap &outputs, + const framework::AttributeMap &attrs) + : OperatorBase(type, inputs, outputs, attrs) {} + + void Run(const framework::Scope &scope, + const platform::DeviceContext &dev_ctx) const override {} +}; + class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker { public: using framework::SingleGradOpDescMaker::SingleGradOpDescMaker; protected: virtual std::unique_ptr Apply() const { - PADDLE_THROW("Not Implemented"); auto *grad = new framework::OpDescBind(); - grad->SetType("recurrent_grad"); + grad->SetType("parallel_do_grad"); for (auto &input_param : this->InputNames()) { + LOG(INFO) << input_param; grad->SetInput(input_param, this->Input(input_param)); grad->SetOutput(framework::GradVarName(input_param), this->InputGrad(input_param)); @@ -116,26 +171,25 @@ class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker { class ParallelDoGradOpShapeInference : public framework::InferShapeBase { public: void operator()(framework::InferShapeContext *ctx) const override { - PADDLE_THROW("Not Implemented"); - // std::vector input{kInputs}; - // std::vector output{kOutputs}; - // for (auto &s : input) { - // PADDLE_ENFORCE(ctx->HasInputs(s)); - // PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(s)), - // "Cannot find the gradient variable %s", - // framework::GradVarName(s)); - // } - // for (auto &s : output) { - // PADDLE_ENFORCE(ctx->HasInputs(s)); - // } - // for (auto &s : input) { - // ctx->SetOutputsDim(framework::GradVarName(s), ctx->GetInputsDim(s)); - // } - // if (ctx->HasInputs(kParameters)) { - // PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(kParameters))); - // ctx->SetOutputsDim(framework::GradVarName(kParameters), - // ctx->GetInputsDim(kParameters)); - // } + std::vector input{kParameters, kInputs}; + std::vector output{kOutputs}; + for (auto &s : input) { + PADDLE_ENFORCE(ctx->HasInputs(s)); + PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(s)), + "Cannot find the gradient variable %s", + framework::GradVarName(s)); + } + for (auto &s : output) { + PADDLE_ENFORCE(ctx->HasInputs(s)); + } + for (auto &s : input) { + ctx->SetOutputsDim(framework::GradVarName(s), ctx->GetInputsDim(s)); + } + if (ctx->HasInputs(kParameters)) { + PADDLE_ENFORCE(ctx->HasOutputs(framework::GradVarName(kParameters))); + ctx->SetOutputsDim(framework::GradVarName(kParameters), + ctx->GetInputsDim(kParameters)); + } } }; diff --git a/python/paddle/v2/fluid/layers/control_flow.py b/python/paddle/v2/fluid/layers/control_flow.py index 09ab9726d1d37628308c6c4568a8188aae06d545..aafecdafa242e20ed0c3fa037ad75a49316a1a9f 100644 --- a/python/paddle/v2/fluid/layers/control_flow.py +++ b/python/paddle/v2/fluid/layers/control_flow.py @@ -140,7 +140,18 @@ class ParallelDo(object): step_scope = parent_block.create_var( type=core.VarDesc.VarType.STEP_SCOPES) + self.outputs = [ + parent_block.create_var( + name=o.name, + shape=o.shape, + dtype=o.dtype, + lod_level=o.lod_level, + persistable=o.persistable, + stop_gradient=o.stop_gradient) for o in self.outputs + ] + inputs = [parent_block.var(i.name) for i in self.inputs] + outputs = [parent_block.var(o.name) for o in self.outputs] parent_block.append_op( type='parallel_do', @@ -149,7 +160,7 @@ class ParallelDo(object): 'parameters': self.get_parameters(), 'places': self.places }, - outputs={'outputs': self.outputs, + outputs={'outputs': outputs, 'parallel_scopes': [step_scope]}, attrs={'sub_block': current_block}) diff --git a/python/paddle/v2/fluid/tests/test_parallel_op.py b/python/paddle/v2/fluid/tests/test_parallel_op.py index 1e643032849d4b8041ba4e1f115b01ab4f5d9152..61126cc9da193131e651d0d75b3b11dcff8478c8 100644 --- a/python/paddle/v2/fluid/tests/test_parallel_op.py +++ b/python/paddle/v2/fluid/tests/test_parallel_op.py @@ -12,7 +12,11 @@ import paddle.v2.fluid.core as core class ParallelOpTest(unittest.TestCase): def setUp(self): x = layers.data( - shape=[2, 3, 4], dtype='float32', name='x', append_batch_size=False) + shape=[-1, 3, 4], + dtype='float32', + name='x', + append_batch_size=False, + stop_gradient=False) places = fluid.default_main_program().global_block().create_var() pd = layers.ParallelDo(places=places) @@ -22,8 +26,16 @@ class ParallelOpTest(unittest.TestCase): hidden = layers.fc(input=data, size=7) pd.write_output(hidden) data = pd() - print data - print fluid.default_main_program() + loss = layers.mean(x=data) + append_backward_ops(loss) + + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(fluid.default_startup_program()) + exe.run(fluid.default_main_program(), + feed={ + x.name: np.random.uniform(0.1, 0.6, + (2, 3, 4)).astype("float32") + }) def test_forward(self): pass