From 4bcc0b64cbb7533cf8f03bc3f1ba39a6254477ee Mon Sep 17 00:00:00 2001
From: "Yang Yang(Tony)"
Date: Wed, 10 Jan 2018 15:33:10 +0800
Subject: [PATCH] [WIP] feature/parallel_gpu (#7293)

feature/parallel_gpu
---
 paddle/framework/lod_tensor.cc                 | 61 +++++++++----------
 paddle/framework/tensor_util.h                 | 36 +++++------
 paddle/framework/var_desc.cc                   |  2 +-
 paddle/operators/get_places_op.cc              |  3 +-
 paddle/operators/parallel_do_op.cc             | 56 +++++++++++------
 .../paddle/v2/fluid/tests/test_parallel_op.py  |  2 +-
 6 files changed, 89 insertions(+), 71 deletions(-)

diff --git a/paddle/framework/lod_tensor.cc b/paddle/framework/lod_tensor.cc
index 506fde44053..7ae94c64653 100644
--- a/paddle/framework/lod_tensor.cc
+++ b/paddle/framework/lod_tensor.cc
@@ -44,9 +44,19 @@ std::ostream &operator<<(std::ostream &os, const LoD &lod) {
 }
 
 std::ostream &operator<<(std::ostream &os, const LoDTensor &t) {
-  PADDLE_ENFORCE(platform::is_cpu_place(t.place()));
   PADDLE_ENFORCE(t.type().hash_code() == typeid(float).hash_code());
 
+  if (!platform::is_cpu_place(t.place())) {
+    LoDTensor tt;
+    framework::Copy(t, platform::CPUPlace(), &tt);
+    platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
+    auto &dev_ctx = *pool.Get(t.place());
+    dev_ctx.Wait();
+
+    os << tt;
+    return os;
+  }
+
   os << "dim: " << t.dims() << "\n";
   os << "lod: " << t.lod() << "\n";
 
@@ -211,38 +221,23 @@ void DeserializeFromStream(std::istream &is, LoDTensor *tensor,
   DeserializeFromStream(is, static_cast<Tensor *>(tensor), dev_ctx);
 }
 
+// TODO(tonyyang-svail): make this function support LoD
 std::vector<LoDTensor> LoDTensor::SplitLoDTensor(
     const std::vector<platform::Place> places) const {
   check_memory_size();
-  // PADDLE_ENFORCE(lod().empty() || (lod().size() == 1 && lod()[0].empty())
-  //                , "Disable parallel lod for now");
   PADDLE_ENFORCE(lod().empty(), "Disable parallel lod for now");
   PADDLE_ENFORCE(dims()[0] % places.size() == 0,
                  "Batch size should be divided by places size");
 
   std::vector<LoDTensor> lods;
   for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
-    size_t begin = place_idx * dims()[0] / places.size();
-    size_t end = (place_idx + 1) * dims()[0] / places.size();
-    auto src = Slice(static_cast<int>(begin), static_cast<int>(end));
+    int begin = place_idx * dims()[0] / places.size();
+    int end = (place_idx + 1) * dims()[0] / places.size();
 
-    LoDTensor dst;
-    dst.Resize(src.dims());
+    auto src = Slice(begin, end);
     auto &dst_place = places[place_idx];
-    auto dst_ptr = dst.mutable_data(dst_place, src.type());
-
-    // TODO(tonyyang-svail):
-    //   change the following to framework::Copy
-    auto src_place = src.place();
-    auto src_ptr = src.data<void>();
-    auto size = src.numel() * SizeOfType(src.type());
-    if (platform::is_cpu_place(src_place) &&
-        platform::is_cpu_place(dst_place)) {
-      memory::Copy(boost::get<platform::CPUPlace>(dst_place), dst_ptr,
-                   boost::get<platform::CPUPlace>(src_place), src_ptr, size);
-    } else {
-      PADDLE_THROW("Not Implemented");
-    }
+    LoDTensor dst;
+    framework::Copy(src, dst_place, &dst);
 
     lods.emplace_back(dst);
   }
@@ -250,28 +245,30 @@ std::vector<LoDTensor> LoDTensor::SplitLoDTensor(
   return lods;
 }
 
+// TODO(tonyyang-svail): make this function support LoD
 void LoDTensor::MergeLoDTensor(
-    const std::vector<const LoDTensor *> &lod_tensors, platform::Place place) {
-  PADDLE_ENFORCE(platform::is_cpu_place(place));
+    const std::vector<const LoDTensor *> &lod_tensors,
+    platform::Place dst_place) {
   PADDLE_ENFORCE(!lod_tensors.empty());
-
   framework::DDim new_dim = lod_tensors[0]->dims();
   std::type_index new_type = lod_tensors[0]->type();
+  auto new_layout = lod_tensors[0]->layout();
   for (auto *lod : lod_tensors) {
    PADDLE_ENFORCE(new_dim == lod->dims());
    PADDLE_ENFORCE(new_type == lod->type());
-    PADDLE_ENFORCE(platform::is_cpu_place(lod->place()));
+    PADDLE_ENFORCE(new_layout == lod->layout());
  }
 
  new_dim[0] *= lod_tensors.size();
  Resize(new_dim);
+  set_layout(new_layout);
 
-  auto *dst_ptr = reinterpret_cast<uint8_t *>(mutable_data(place, new_type));
+  mutable_data(dst_place, new_type);
+  int begin = 0;
  for (auto *src : lod_tensors) {
-    auto size = src->numel() * SizeOfType(src->type());
-    memory::Copy(boost::get<platform::CPUPlace>(place), dst_ptr,
-                 boost::get<platform::CPUPlace>(src->place()),
-                 src->data<void>(), size);
-    dst_ptr += size;
+    int end = begin + src->dims()[0];
+    auto dst = Slice(begin, end);
+    framework::Copy(*src, dst_place, &dst);
+    begin = end;
  }
 }
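For context on the lod_tensor.cc changes above: SplitLoDTensor now slices the batch along dimension 0 and routes each shard through framework::Copy, and MergeLoDTensor copies each shard back into a Slice of the destination tensor, so neither side is restricted to CPU places any more. The following is a minimal sketch of that round trip; it is not part of the patch, and the shape, values, the two CPU places, and the const LoDTensor* element type of the merge argument are assumptions made for illustration.

    // Sketch only (not in this patch): exercises the SplitLoDTensor /
    // MergeLoDTensor signatures shown above with made-up data.
    #include <vector>
    #include "paddle/framework/lod_tensor.h"
    #include "paddle/platform/place.h"

    void SplitMergeRoundTrip() {
      namespace fw = paddle::framework;
      namespace plat = paddle::platform;

      fw::LoDTensor batch;
      batch.Resize(fw::make_ddim({4, 2}));  // batch size 4, divisible by 2 places
      float *data = batch.mutable_data<float>(plat::CPUPlace());
      for (int i = 0; i < 8; ++i) data[i] = static_cast<float>(i);

      // Scatter: each place receives a [2, 2] shard via framework::Copy.
      std::vector<plat::Place> places = {plat::CPUPlace(), plat::CPUPlace()};
      std::vector<fw::LoDTensor> shards = batch.SplitLoDTensor(places);

      // Gather: each shard is copied into a Slice of the merged tensor.
      std::vector<const fw::LoDTensor *> shard_ptrs = {&shards[0], &shards[1]};
      fw::LoDTensor merged;
      merged.MergeLoDTensor(shard_ptrs, plat::CPUPlace());
    }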
diff --git a/paddle/framework/tensor_util.h b/paddle/framework/tensor_util.h
index 7c56ccf17f9..f541d2ba693 100644
--- a/paddle/framework/tensor_util.h
+++ b/paddle/framework/tensor_util.h
@@ -31,9 +31,10 @@ namespace framework {
  *
  * @note Copy supports CPU <-> GPU, GPU <-> GPU.
  */
-
 inline void Copy(const Tensor& src, const platform::Place& dst_place,
                  const platform::DeviceContext& ctx, Tensor* dst) {
+  VLOG(3) << "Copy " << src.dims() << " from " << src.place() << " to "
+          << dst_place;
   src.check_memory_size();
 
   dst->Resize(src.dims());
@@ -88,26 +89,25 @@ inline void Copy(const Tensor& src, const platform::Place& dst_place,
 }
 
 /**
- * @brief Copy supports CPU <-> CPU
+ * @brief Wrapper on
+ *     Copy(const Tensor& src, const platform::Place& dst_place,
+ *          const platform::DeviceContext& ctx, Tensor* dst);
+ *
+ * @param[in] src        The external tensor.
+ * @param[in] dst_place  The dst place.
+ *
+ * @note Copy supports CPU <-> GPU, GPU <-> GPU.
  */
 inline void Copy(const Tensor& src, const platform::Place& dst_place,
                  Tensor* dst) {
-  src.check_memory_size();
-  dst->Resize(src.dims());
-  dst->set_layout(src.layout());
-
-  auto src_place = src.place();
-  auto src_ptr = src.data<void>();
-
-  auto dst_ptr = dst->mutable_data(dst_place, src.type());
-
-  auto size = src.numel() * SizeOfType(src.type());
-
-  PADDLE_ENFORCE(platform::is_cpu_place(src_place) &&
-                 platform::is_cpu_place(dst_place));
-
-  memory::Copy(boost::get<platform::CPUPlace>(dst_place), dst_ptr,
-               boost::get<platform::CPUPlace>(src_place), src_ptr, size);
+  platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
+  const platform::DeviceContext* dev_ctx;
+  if (platform::is_gpu_place(src.place())) {
+    dev_ctx = pool.Get(src.place());
+  } else {
+    dev_ctx = pool.Get(dst_place);
+  }
+  Copy(src, dst_place, *dev_ctx, dst);
 }
 
 /**
diff --git a/paddle/framework/var_desc.cc b/paddle/framework/var_desc.cc
index aeab18d7214..62ab6593ef2 100644
--- a/paddle/framework/var_desc.cc
+++ b/paddle/framework/var_desc.cc
@@ -74,7 +74,7 @@ const proto::TensorDesc &VarDesc::tensor_desc() const {
     case proto::VarDesc::LOD_TENSOR_ARRAY:
       return desc_.tensor_array().tensor();
     default:
-      PADDLE_THROW("The type of var '", this->Name(), "' is unsupported.");
+      PADDLE_THROW("The type of var %s is unsupported.", this->Name());
   }
 }
 
diff --git a/paddle/operators/get_places_op.cc b/paddle/operators/get_places_op.cc
index 291bbbcb3a7..2c714ac43d4 100644
--- a/paddle/operators/get_places_op.cc
+++ b/paddle/operators/get_places_op.cc
@@ -111,4 +111,5 @@ class GetPlacesInferShape : public framework::InferShapeBase {
 namespace ops = paddle::operators;
 
 REGISTER_OPERATOR(get_places, ops::GetPlacesOp, ops::GetPlacesOpProtoMaker,
-                  ops::GetPlacesInferVarType, ops::GetPlacesInferShape);
+                  ops::GetPlacesInferVarType, ops::GetPlacesInferShape,
+                  paddle::framework::EmptyGradOpMaker);
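The parallel_do changes below lean on the new two-argument framework::Copy wrapper above: it looks up a DeviceContext in the global pool, preferring the source place's context when the source lives on a GPU, and issues the copy there, which is why the patch pairs copies with explicit waits (dev_ctx.Wait() / WaitOnPlaces). A small sketch of that calling pattern follows; it is not part of the patch, it assumes a GPU build, and the tensor shape and CUDA device index are illustrative.

    // Sketch only (not in this patch): the ctx-less Copy overload plus an
    // explicit wait on the destination place's device context.
    #include "paddle/framework/tensor_util.h"
    #include "paddle/platform/device_context.h"
    #include "paddle/platform/place.h"

    void CopyThenWait() {
      namespace fw = paddle::framework;
      namespace plat = paddle::platform;

      fw::Tensor cpu_src;
      cpu_src.Resize(fw::make_ddim({2, 3}));
      cpu_src.mutable_data<float>(plat::CPUPlace());

      fw::Tensor gpu_dst;
      plat::CUDAPlace gpu(0);  // assumed device id

      // The wrapper resolves the DeviceContext internally; on GPU paths the
      // underlying memcpy runs on that context's stream.
      fw::Copy(cpu_src, gpu, &gpu_dst);

      // Wait before reading the copied data, mirroring WaitOnPlaces below.
      plat::DeviceContextPool::Instance().Get(gpu)->Wait();
    }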
diff --git a/paddle/operators/parallel_do_op.cc b/paddle/operators/parallel_do_op.cc
index a6bc70f4c89..e1bec0421e7 100644
--- a/paddle/operators/parallel_do_op.cc
+++ b/paddle/operators/parallel_do_op.cc
@@ -39,6 +39,7 @@ void SplitTensorAndMoveTensorToScopes(
     const std::vector<framework::Scope *> &sub_scopes,
     const std::vector<platform::Place> &places,
     const std::vector<std::string> &names) {
+  PADDLE_ENFORCE_EQ(sub_scopes.size(), places.size());
   for (auto &argu : names) {
     auto *var = scope.FindVar(argu);
     const auto &tensor = var->Get<framework::LoDTensor>();
@@ -54,6 +55,15 @@ void SplitTensorAndMoveTensorToScopes(
   }
 }
 
+void WaitOnPlaces(const std::vector<platform::Place> places) {
+  platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
+
+  for (auto &place : places) {
+    auto &dev_ctx = *pool.Get(place);
+    dev_ctx.Wait();
+  }
+}
+
 class ParallelDoOp : public framework::OperatorBase {
  public:
   ParallelDoOp(const std::string &type,
@@ -71,10 +81,7 @@ class ParallelDoOp : public framework::OperatorBase {
     auto *block = Attr<framework::BlockDesc *>(kParallelBlock);
     auto *program = block->Program();
 
-    // TODO(tonyyang-svail): get places from input
-    std::vector<platform::Place> places;
-    places.emplace_back(platform::CPUPlace());
-    places.emplace_back(platform::CPUPlace());
+    auto &places = scope.FindVar(Input(kPlaces))->Get<platform::PlaceList>();
 
     auto &sub_scopes = *scope.FindVar(Output(kParallelScopes))
                             ->GetMutable<std::vector<framework::Scope *>>();
@@ -82,8 +89,22 @@
       sub_scopes.push_back(&scope.NewScope());
     }
 
+    // split input
     SplitTensorAndMoveTensorToScopes(scope, sub_scopes, places,
                                      Inputs(kInputs));
+    // copy parameter
+    for (auto &param : Inputs(kParameters)) {
+      PADDLE_ENFORCE(scope.FindVar(param)->IsType<framework::LoDTensor>(),
+                     "Only support parameter type as LoDTensor");
+      auto &src = scope.FindVar(param)->Get<framework::LoDTensor>();
+      for (size_t i = 0; i < places.size(); ++i) {
+        auto &place = places[i];
+        auto *sub_scope = sub_scopes[i];
+        auto *dst = sub_scope->Var(param)->GetMutable<framework::LoDTensor>();
+        framework::Copy(src, place, dst);
+      }
+    }
+    WaitOnPlaces(places);
 
     std::vector<std::future<void>> workers;
     workers.reserve(places.size());
@@ -93,12 +114,6 @@
       auto &place = places[place_idx];
       auto *cur_scope = sub_scopes[place_idx];
 
-      // copy parameter
-      // some version of boost lacks != for boost::variant
-      if (!(dev_ctx.GetPlace() == place)) {
-        PADDLE_THROW("Not Implemented");
-      }
-
       workers.emplace_back(framework::Async([program, cur_scope, place, block] {
         framework::Executor executor(place);
         executor.Run(*program, cur_scope, block->ID(),
@@ -108,6 +123,7 @@
     for (auto &worker : workers) {
       worker.wait();
     }
+    WaitOnPlaces(places);
 
     // merge output
     for (auto &o_name : Outputs(kOutputs)) {
@@ -121,6 +137,7 @@
           scope.FindVar(o_name)->GetMutable<framework::LoDTensor>();
       lod_tensor_to_be_merged->MergeLoDTensor(lod_tensors, dev_ctx.GetPlace());
     }
+    WaitOnPlaces(places);
   }
 };
 
@@ -161,15 +178,14 @@ class ParallelDoGradOp : public OperatorBase {
     auto &sub_scopes = scope.FindVar(Input(kParallelScopes))
                            ->Get<std::vector<framework::Scope *>>();
 
-    // TODO(tonyyang-svail): get places from input
-    std::vector<platform::Place> places;
-    places.emplace_back(platform::CPUPlace());
-    places.emplace_back(platform::CPUPlace());
+    auto &places = scope.FindVar(Input(kPlaces))->Get<platform::PlaceList>();
 
     // feed output@grad
     SplitTensorAndMoveTensorToScopes(scope, sub_scopes, places,
                                      Inputs(framework::GradVarName(kOutputs)));
+    WaitOnPlaces(places);
+
     // for debugging
     for (auto &s : Inputs(framework::GradVarName(kOutputs))) {
       VLOG(3) << s;
       VLOG(3) << scope.FindVar(s)->Get<framework::LoDTensor>();
@@ -196,10 +212,11 @@
     for (auto &worker : workers) {
       worker.wait();
     }
+    WaitOnPlaces(places);
 
     // merge grad
     for (auto &s : Outputs(framework::GradVarName(kParameters))) {
-      VLOG(3) << s;
+      VLOG(3) << "merge grad " << s;
 
       auto &t = sub_scopes[0]->FindVar(s)->Get<framework::LoDTensor>();
       VLOG(3) << t;
@@ -216,7 +233,8 @@
         auto sum_op = framework::OpRegistry::CreateOp(
             "sum", {{"X", {s, s_buf}}}, {{"Out", {s}}},
             framework::AttributeMap{});
-        sum_op->Run(*sub_scopes[0], place);
+        sum_op->Run(*sub_scopes[0], places[0]);
+        WaitOnPlaces(places);
       }
 
       VLOG(3) << t;
@@ -236,8 +254,10 @@ class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker {
     for (auto &input_param : this->InputNames()) {
       VLOG(3) << input_param;
       grad->SetInput(input_param, this->Input(input_param));
-      grad->SetOutput(framework::GradVarName(input_param),
-                      this->InputGrad(input_param, false));
+      if (input_param != kPlaces) {
+        grad->SetOutput(framework::GradVarName(input_param),
+                        this->InputGrad(input_param, false));
+      }
     }
 
     for (auto &output_param : this->OutputNames()) {
diff --git a/python/paddle/v2/fluid/tests/test_parallel_op.py b/python/paddle/v2/fluid/tests/test_parallel_op.py
index 2788f4e519b..59ed041e7fa 100644
--- a/python/paddle/v2/fluid/tests/test_parallel_op.py
+++ b/python/paddle/v2/fluid/tests/test_parallel_op.py
@@ -18,7 +18,7 @@ class ParallelOpTest(unittest.TestCase):
             append_batch_size=False,
             stop_gradient=False)
 
-        places = fluid.default_main_program().global_block().create_var()
+        places = layers.get_places(device_count=4)
 
         pd = layers.ParallelDo(places=places)
         with pd.do():
-- 
GitLab