diff --git a/paddle/framework/lod_tensor.cc b/paddle/framework/lod_tensor.cc
index 4198847ad034f0158d5652bdbff76308d7da1116..beb2edee23d5586ef19155ac59a172f23a9794d9 100644
--- a/paddle/framework/lod_tensor.cc
+++ b/paddle/framework/lod_tensor.cc
@@ -314,6 +314,45 @@ void DeserializeFromStream(std::istream &is, LoDTensor *tensor) {
   }
 }
 
+std::vector<LoDTensor> LoDTensor::SplitLoDTensor(
+    const std::vector<platform::Place> places) const {
+  check_memory_size();
+  // PADDLE_ENFORCE(lod().empty() || (lod().size() == 1 && lod()[0].empty())
+  //                , "Disable parallel lod for now");
+  PADDLE_ENFORCE(lod().empty(), "Disable parallel lod for now");
+  PADDLE_ENFORCE(dims()[0] % places.size() == 0,
+                 "Batch size should be divided by places size");
+
+  std::vector<LoDTensor> lods;
+  for (int place_idx = 0; place_idx < places.size(); ++place_idx) {
+    int begin = place_idx * dims()[0] / places.size();
+    int end = (place_idx + 1) * dims()[0] / places.size();
+    auto src = Slice(begin, end);
+
+    LoDTensor dst;
+    dst.Resize(src.dims());
+    auto &dst_place = places[place_idx];
+    auto dst_ptr = dst.mutable_data(dst_place, src.type());
+
+    // TODO(tonyyang-svail):
+    //   change the following to framework::CopyFrom
+    auto src_place = src.place();
+    auto src_ptr = src.data<void>();
+    auto size = src.numel() * SizeOfType(src.type());
+    if (platform::is_cpu_place(src_place) &&
+        platform::is_cpu_place(dst_place)) {
+      memory::Copy(boost::get<platform::CPUPlace>(dst_place), dst_ptr,
+                   boost::get<platform::CPUPlace>(src_place), src_ptr, size);
+    } else {
+      PADDLE_THROW("Not Implemented");
+    }
+
+    lods.emplace_back(dst);
+  }
+
+  return lods;
+}
+
 void LoDTensor::MergeLoDTensor(
     const std::vector<const LoDTensor *> &lod_tensors, platform::Place place) {
   PADDLE_ENFORCE(platform::is_cpu_place(place));
diff --git a/paddle/framework/lod_tensor.h b/paddle/framework/lod_tensor.h
index 989d8c1c4fe34853e7bcf22a2337d5b6a51f4fda..fae36892f00132c5fb2d9bc017d9b4f24e78b6ea 100644
--- a/paddle/framework/lod_tensor.h
+++ b/paddle/framework/lod_tensor.h
@@ -144,6 +144,9 @@ class LoDTensor : public Tensor {
    */
   void ShrinkInLevel(size_t level, size_t elem_begin, size_t elem_end);
 
+  std::vector<LoDTensor> SplitLoDTensor(
+      const std::vector<platform::Place> places) const;
+
   void MergeLoDTensor(const std::vector<const LoDTensor *> &lod_tensors,
                       platform::Place place);
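// ---------------------------------------------------------------------------
// Editorial note, not part of the patch: a minimal usage sketch of the
// SplitLoDTensor API added above, assuming a CPU-only build and the usual
// framework headers. The function name SplitBatchAcrossTwoCpuPlaces and the
// 4x2 shape are illustrative only.
// ---------------------------------------------------------------------------
#include <vector>
#include "paddle/framework/lod_tensor.h"

std::vector<paddle::framework::LoDTensor> SplitBatchAcrossTwoCpuPlaces() {
  using paddle::framework::LoDTensor;
  using paddle::framework::make_ddim;
  using paddle::platform::CPUPlace;
  using paddle::platform::Place;

  LoDTensor batch;
  batch.Resize(make_ddim({4, 2}));        // batch size 4, two features per row
  batch.mutable_data<float>(CPUPlace());  // allocate storage on CPU

  std::vector<Place> places;
  places.emplace_back(CPUPlace());
  places.emplace_back(CPUPlace());

  // Rows [0, 2) go to the first place and rows [2, 4) to the second; a
  // non-empty LoD or a batch size not divisible by places.size() would trip
  // the PADDLE_ENFORCE checks in the implementation above.
  return batch.SplitLoDTensor(places);
}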
diff --git a/paddle/operators/elementwise_op.h b/paddle/operators/elementwise_op.h
index ea533503e4916cae7e1157ed34da9629dcff3513..b8bbdb1e2a89ca63c73e528bc73b3338e413886d 100644
--- a/paddle/operators/elementwise_op.h
+++ b/paddle/operators/elementwise_op.h
@@ -34,6 +34,8 @@ class ElementwiseOp : public framework::OperatorWithKernel {
 
     auto x_dim = ctx->GetInputDim("X");
     auto y_dim = ctx->GetInputDim("Y");
+    LOG(INFO) << x_dim;
+    LOG(INFO) << y_dim;
     PADDLE_ENFORCE_GE(x_dim.size(), y_dim.size(),
                       "Rank of first input must >= rank of second input.");
     ctx->SetOutputDim("Out", x_dim);
@@ -118,6 +120,9 @@ class ElementwiseOpGrad : public framework::OperatorWithKernel {
     auto x_dims = ctx->GetInputDim("X");
     auto y_dims = ctx->GetInputDim("Y");
     auto out_dims = ctx->GetInputDim(framework::GradVarName("Out"));
+    LOG(INFO) << x_dims;
+    LOG(INFO) << y_dims;
+    LOG(INFO) << out_dims;
     PADDLE_ENFORCE_GE(x_dims.size(), y_dims.size(),
                       "Rank of first input must >= rank of second input.");
diff --git a/paddle/operators/parallel_do_op.cc b/paddle/operators/parallel_do_op.cc
index c0c1de7369ae3168126185e66329b37eb2848685..b15d171b9b6e6debf53c34050b718bf3e3fa851c 100644
--- a/paddle/operators/parallel_do_op.cc
+++ b/paddle/operators/parallel_do_op.cc
@@ -13,11 +13,9 @@ limitations under the License. */
 
 #include <vector>
-#include "chunk_eval_op.h"
+
 #include "paddle/framework/executor.h"
 #include "paddle/framework/op_registry.h"
-#include "paddle/framework/operator.h"
-#include "paddle/platform/place.h"
 
 namespace paddle {
 namespace operators {
@@ -31,10 +29,31 @@ constexpr char kParallelScopes[] = "parallel_scopes";
 
 constexpr char kParallelBlock[] = "sub_block";
 
-using ParallelScopeVar = std::vector<framework::Scope *>;
+// using ParallelScopeVar = std::vector<framework::Scope *>;
+using LoDTensor = framework::LoDTensor;
 using OperatorBase = framework::OperatorBase;
 
-class ParallelDoOp : public OperatorBase {
+void SplitTensorAndMoveTensorToScopes(
+    const framework::Scope &scope,
+    const std::vector<framework::Scope *> &sub_scopes,
+    const std::vector<platform::Place> &places,
+    const std::vector<std::string> &names) {
+  for (auto &argu : names) {
+    auto *var = scope.FindVar(argu);
+    const auto &tensor = var->Get<LoDTensor>();
+    auto lod_tensors = tensor.SplitLoDTensor(places);
+
+    for (auto &lod : lod_tensors) {
+      LOG(INFO) << lod.dims();
+    }
+
+    for (int i = 0; i < sub_scopes.size(); ++i) {
+      *sub_scopes[i]->Var(argu)->GetMutable<LoDTensor>() = lod_tensors[i];
+    }
+  }
+}
+
+class ParallelDoOp : public framework::OperatorBase {
  public:
   ParallelDoOp(const std::string &type,
                const framework::VariableNameMap &inputs,
@@ -52,11 +71,18 @@ class ParallelDoOp : public OperatorBase {
     places.emplace_back(platform::CPUPlace());
     places.emplace_back(platform::CPUPlace());
 
-    std::vector<framework::Scope *> sub_scopes;
+    auto &sub_scopes = *scope.FindVar(Output(kParallelScopes))
+                            ->GetMutable<std::vector<framework::Scope *>>();
+    // std::vector<framework::Scope *> sub_scopes;
     for (int place_idx = 0; place_idx < places.size(); ++place_idx) {
-      VLOG(3) << "Run " << place_idx;
-
       sub_scopes.push_back(&scope.NewScope());
+    }
+
+    SplitTensorAndMoveTensorToScopes(scope, sub_scopes, places,
+                                     Inputs(kInputs));
+
+    for (int place_idx = 0; place_idx < places.size(); ++place_idx) {
+      VLOG(3) << "Run " << place_idx;
 
       auto &place = places[place_idx];
       auto *cur_scope = sub_scopes[place_idx];
@@ -66,26 +92,6 @@ class ParallelDoOp : public OperatorBase {
         PADDLE_THROW("Not Implemented");
       }
 
-      // feed input
-      for (auto &argu : Inputs(kInputs)) {
-        auto *var = scope.FindVar(argu);
-        const auto &tensor = var->Get<LoDTensor>();
-        if (!tensor.lod().empty()) {
-          PADDLE_THROW("Disable parallel lod for now");
-        } else {
-          PADDLE_ENFORCE(tensor.dims()[0] % places.size() == 0,
-                         "Batch size should be divided by places size");
-          int begin = place_idx * tensor.dims()[0] / places.size();
-          int end = (place_idx + 1) * tensor.dims()[0] / places.size();
-          auto feed_tensor = tensor.Slice(begin, end);
-          feed_tensor.switch_place(place);
-
-          auto *cur_var = cur_scope->Var(argu);
-          auto *cur_tensor = cur_var->GetMutable<LoDTensor>();
-          *cur_tensor = feed_tensor;
-        }
-      }
-
       // execute
       auto executor = framework::Executor(place);
       executor.Run(*program, cur_scope, block->ID(),
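// ---------------------------------------------------------------------------
// Editorial note, not part of the patch: a sketch of what the forward pass
// above now does with a single input, using the SplitTensorAndMoveTensorToScopes
// helper introduced in this file. It assumes the snippet lives in the same
// translation unit as that helper; the variable name "X" and the 4x3 shape are
// illustrative only.
// ---------------------------------------------------------------------------
void ForwardSplitSketch() {
  using paddle::framework::LoDTensor;
  using paddle::framework::Scope;
  using paddle::platform::CPUPlace;
  using paddle::platform::Place;

  // Parent scope holds the full batch, exactly as ParallelDoOp::Run sees it.
  Scope scope;
  auto *x = scope.Var("X")->GetMutable<LoDTensor>();
  x->Resize(paddle::framework::make_ddim({4, 3}));
  x->mutable_data<float>(CPUPlace());

  // One sub-scope and one place per device; the patch hard-codes two CPU
  // places for now (see the TODO about reading places from an input).
  std::vector<Scope *> sub_scopes = {&scope.NewScope(), &scope.NewScope()};
  std::vector<Place> places;
  places.emplace_back(CPUPlace());
  places.emplace_back(CPUPlace());

  // After this call each sub-scope owns its own "X" with dims {2, 3}; the
  // per-place Executor::Run calls in ParallelDoOp::Run then read these copies.
  paddle::operators::SplitTensorAndMoveTensorToScopes(scope, sub_scopes,
                                                      places, {"X"});
}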
@@ -132,7 +138,49 @@ class ParallelDoGradOp : public OperatorBase {
       : OperatorBase(type, inputs, outputs, attrs) {}
 
   void Run(const framework::Scope &scope,
-           const platform::DeviceContext &dev_ctx) const override {}
+           const platform::DeviceContext &dev_ctx) const override {
+    auto *block = Attr<framework::BlockDesc *>(kParallelBlock);
+    auto *program = block->Program();
+
+    auto &sub_scopes = scope.FindVar(Input(kParallelScopes))
+                           ->Get<std::vector<framework::Scope *>>();
+
+    // TODO(tonyyang-svail): get places from input
+    std::vector<platform::Place> places;
+    places.emplace_back(platform::CPUPlace());
+    places.emplace_back(platform::CPUPlace());
+
+    // feed output@grad
+    SplitTensorAndMoveTensorToScopes(scope, sub_scopes, places,
+                                     Inputs(framework::GradVarName(kOutputs)));
+
+    for (auto &s : Inputs(framework::GradVarName(kOutputs))) {
+      LOG(INFO) << s;
+      LOG(INFO) << scope.FindVar(s)->Get<framework::LoDTensor>().dims();
+      for (auto *sub_scope : sub_scopes) {
+        LOG(INFO) << sub_scope->FindVar(s)->Get<framework::LoDTensor>().dims();
+      }
+    }
+    // exe run
+    for (int place_idx = 0; place_idx < places.size(); ++place_idx) {
+      VLOG(3) << "Run " << place_idx;
+
+      auto &place = places[place_idx];
+      auto *cur_scope = sub_scopes[place_idx];
+
+      // copy parameter
+      if (dev_ctx.GetPlace() != place) {
+        PADDLE_THROW("Not Implemented");
+      }
+
+      // execute
+      auto executor = framework::Executor(place);
+      executor.Run(*program, cur_scope, block->ID(),
+                   false /*create_local_scope*/);
+    }
+
+    // merge grad
+  }
 };
 
 class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker {
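// ---------------------------------------------------------------------------
// Editorial note, not part of the patch: the "// merge grad" step above is
// still a stub. The sketch below shows one possible shape for it, summing a
// single float gradient from every sub-scope into the parent scope on CPU.
// The function name AccumulateGradAcrossScopes and the element-wise-sum
// strategy are assumptions, not something this patch specifies.
// ---------------------------------------------------------------------------
void AccumulateGradAcrossScopes(
    const paddle::framework::Scope &scope,
    const std::vector<paddle::framework::Scope *> &sub_scopes,
    const std::string &grad_name) {
  using paddle::framework::LoDTensor;
  using paddle::platform::CPUPlace;

  // Destination tensor in the parent scope; assumed to be float and already
  // resized to match the per-scope gradients.
  auto *dst = scope.FindVar(grad_name)->GetMutable<LoDTensor>();
  float *dst_ptr = dst->mutable_data<float>(CPUPlace());
  int64_t numel = dst->numel();
  for (int64_t i = 0; i < numel; ++i) dst_ptr[i] = 0.0f;

  // Element-wise accumulation over all sub-scopes on CPU.
  for (auto *sub_scope : sub_scopes) {
    const auto &src = sub_scope->FindVar(grad_name)->Get<LoDTensor>();
    const float *src_ptr = src.data<float>();
    for (int64_t i = 0; i < numel; ++i) {
      dst_ptr[i] += src_ptr[i];
    }
  }
}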