/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ #include <vector> #include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/threadpool.h" #include "paddle/fluid/operators/detail/safe_ref.h" namespace paddle { namespace operators { static constexpr char kInputs[] = "inputs"; static constexpr char kParameters[] = "parameters"; static constexpr char kPlaces[] = "places"; static constexpr char kOutputs[] = "outputs"; static constexpr char kParallelScopes[] = "parallel_scopes"; static constexpr char kParallelBlock[] = "sub_block"; static constexpr char kUseNCCL[] = "use_nccl"; using LoDTensor = framework::LoDTensor; using SelectedRows = framework::SelectedRows; static void SplitTensorAndMoveTensorToScopes( const framework::Scope &scope, std::vector<framework::Scope *> *sub_scopes, const std::vector<platform::Place> &places, const std::vector<std::string> &names) { size_t num_sub_scopes = 0; for (auto &argu : names) { const auto &tensor = detail::Ref(scope.FindVar(argu), "Cannot find variable %s in the parent scope", argu) .Get<LoDTensor>(); auto lod_tensors = tensor.SplitLoDTensor(places); for (auto &lod : lod_tensors) { VLOG(30) << lod.dims(); } if (num_sub_scopes == 0) { num_sub_scopes = lod_tensors.size(); } else { PADDLE_ENFORCE_EQ(num_sub_scopes, lod_tensors.size()); } PADDLE_ENFORCE_NE(num_sub_scopes, 0); if (sub_scopes->size() == 0) { sub_scopes->reserve(num_sub_scopes); for (size_t i = 0; i < num_sub_scopes; ++i) { sub_scopes->emplace_back(&scope.NewScope()); } } for (size_t i = 0; i < lod_tensors.size(); ++i) { *detail::Ref(sub_scopes->at(i)->Var(argu), "Cannot find variable in the sub-scope", argu) .GetMutable<LoDTensor>() = lod_tensors[i]; } } } inline void CopyOrShare(const framework::Variable &src, const platform::Place &dst_place, framework::Variable *dst) { if (src.IsType<LoDTensor>()) { if (src.Get<LoDTensor>().place() == dst_place) { dst->GetMutable<LoDTensor>()->ShareDataWith(src.Get<LoDTensor>()); dst->GetMutable<LoDTensor>()->set_lod(src.Get<LoDTensor>().lod()); } else { TensorCopy(src.Get<LoDTensor>(), dst_place, dst->GetMutable<LoDTensor>()); } } else if (src.IsType<SelectedRows>()) { auto &src_sr = src.Get<SelectedRows>(); auto *dst_sr = dst->GetMutable<SelectedRows>(); dst_sr->set_height(src_sr.height()); if (src_sr.value().place() == dst_place) { dst_sr->mutable_value()->ShareDataWith(src_sr.value()); dst_sr->set_rows(src_sr.rows()); } else { TensorCopy(src_sr.value(), dst_place, dst_sr->mutable_value()); } } else { PADDLE_THROW("Expect LoDTensor/SelectedRows, get %s", src.Type().name()); } } void WaitOnPlace(const platform::Place place) { platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); auto &dev_ctx = *pool.Get(place); dev_ctx.Wait(); } void WaitOnPlaces(const std::vector<platform::Place> places) { platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); for (auto &place : places) { auto &dev_ctx = *pool.Get(place); dev_ctx.Wait(); } } class ParallelDoOp : public framework::OperatorBase { public: ParallelDoOp(const std::string &type, const framework::VariableNameMap &inputs, const framework::VariableNameMap &outputs, const framework::AttributeMap &attrs) : framework::OperatorBase(type, inputs, outputs, attrs) {} private: void RunImpl(const framework::Scope &scope, const platform::Place &place) const override { // get device context from pool platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); auto &dev_ctx = *pool.Get(place); auto *block = Attr<framework::BlockDesc *>(kParallelBlock); auto *program = block->Program(); auto &places = scope.FindVar(Input(kPlaces))->Get<platform::PlaceList>(); auto &sub_scopes = *scope.FindVar(Output(kParallelScopes)) ->GetMutable<std::vector<framework::Scope *>>(); // split input SplitTensorAndMoveTensorToScopes(scope, &sub_scopes, places, Inputs(kInputs)); // copy parameter for (auto ¶m : Inputs(kParameters)) { PADDLE_ENFORCE(scope.FindVar(param)->IsType<LoDTensor>(), "Only support parameter type as LoDTensor"); auto &src = scope.FindVar(param)->Get<LoDTensor>(); auto *sub_scope0 = sub_scopes[0]; auto *dst0 = sub_scope0->Var(param)->GetMutable<LoDTensor>(); dst0->ShareDataWith(src); for (size_t i = 1; i < sub_scopes.size(); ++i) { auto &place = places[i]; auto *sub_scope = sub_scopes[i]; auto *dst = sub_scope->Var(param)->GetMutable<LoDTensor>(); framework::TensorCopy(src, place, dst); } } WaitOnPlaces(places); std::vector<std::future<void>> workers; workers.reserve(places.size()); for (size_t place_idx = 0; place_idx < sub_scopes.size(); ++place_idx) { auto &place = places[place_idx]; auto *cur_scope = sub_scopes[place_idx]; workers.emplace_back(framework::Async([program, cur_scope, place, block] { framework::Executor executor(place); executor.Run(*program, cur_scope, block->ID(), false /*create_local_scope*/); })); } for (auto &worker : workers) { worker.wait(); } WaitOnPlaces(places); // merge output for (auto &o_name : Outputs(kOutputs)) { std::vector<const framework::LoDTensor *> lod_tensors; lod_tensors.reserve(sub_scopes.size()); for (auto *sub_scope : sub_scopes) { lod_tensors.emplace_back(&sub_scope->FindVar(o_name)->Get<LoDTensor>()); } auto *lod_tensor_to_be_merged = scope.FindVar(o_name)->GetMutable<LoDTensor>(); lod_tensor_to_be_merged->MergeLoDTensor(lod_tensors, dev_ctx.GetPlace()); } WaitOnPlaces(places); } }; class ParallelDoOpProtoMaker : public framework::OpProtoAndCheckerMaker { public: void Make() override { AddInput(kInputs, "").AsDuplicable(); AddInput(kParameters, "").AsDuplicable(); AddInput(kPlaces, ""); AddOutput(kOutputs, "").AsDuplicable(); AddOutput(kParallelScopes, ""); AddAttr<framework::BlockDesc *>(kParallelBlock, ""); AddAttr<bool>(kUseNCCL, "true if we use nccl on backward") .SetDefault(false); AddComment(R"DOC( ParallelDo Operator. )DOC"); } }; class ParallelDoGradOp : public framework::OperatorBase { public: ParallelDoGradOp(const std::string &type, const framework::VariableNameMap &inputs, const framework::VariableNameMap &outputs, const framework::AttributeMap &attrs) : framework::OperatorBase(type, inputs, outputs, attrs) {} private: void RunImpl(const framework::Scope &scope, const platform::Place &place) const override { auto *block = Attr<framework::BlockDesc *>(kParallelBlock); auto *program = block->Program(); auto &sub_scopes = scope.FindVar(Input(kParallelScopes)) ->Get<std::vector<framework::Scope *>>(); auto &places = scope.FindVar(Input(kPlaces))->Get<platform::PlaceList>(); // feed output@grad SplitTensorAndMoveTensorToScopes( scope, const_cast<std::vector<framework::Scope *> *>(&sub_scopes), places, Inputs(framework::GradVarName(kOutputs))); WaitOnPlaces(places); // exe run std::vector<std::future<void>> workers; for (size_t i = 0; i < sub_scopes.size(); ++i) { auto &place = places[i]; auto *cur_scope = sub_scopes[i]; // execute workers.emplace_back(framework::Async([program, cur_scope, place, block] { framework::Executor executor(place); executor.Run(*program, cur_scope, block->ID(), false /*create_local_scope*/); })); } for (auto &worker : workers) { worker.wait(); } WaitOnPlaces(places); // NCCL allreduce op will be added by backward, // so no need to explicitly accumulate grad if (!(Attr<bool>(kUseNCCL))) { AccumulateGrad(scope, place, sub_scopes, places); } else { for (auto &place : places) { PADDLE_ENFORCE(platform::is_gpu_place(place), "NCCL only supports cuda place"); } } for (auto &s : Outputs(framework::GradVarName(kParameters))) { if (s == framework::kEmptyVarName) { continue; } VLOG(30) << "Moving " << s; CopyOrShare(*sub_scopes[0]->FindVar(s), place, scope.FindVar(s)); } WaitOnPlaces(places); } void AccumulateGrad(const framework::Scope &scope, const platform::Place &place, const std::vector<framework::Scope *> &sub_scopes, const platform::PlaceList &places) const { for (auto &s : Outputs(framework::GradVarName(kParameters))) { if (s == framework::kEmptyVarName) { continue; } VLOG(30) << "Accumulating " << s; if (s == framework::kEmptyVarName) continue; std::string tmp_name; auto *tmp = sub_scopes[0]->Var(&tmp_name); for (size_t i = 1; i < sub_scopes.size(); ++i) { CopyOrShare(*sub_scopes[i]->FindVar(s), places[0], tmp); WaitOnPlaces(places); auto sum_op = framework::OpRegistry::CreateOp( "sum", {{"X", {s, tmp_name}}}, {{"Out", {s}}}, framework::AttributeMap{{"use_mkldnn", {false}}}); VLOG(100) << sum_op->DebugStringEx(sub_scopes[0]); sum_op->Run(*sub_scopes[0], places[0]); WaitOnPlace(places[0]); } CopyOrShare(*sub_scopes[0]->FindVar(s), place, scope.FindVar(s)); } WaitOnPlaces(places); } }; std::ostream &operator<<(std::ostream &sout, const std::vector<std::string> &strs) { std::copy(strs.begin(), strs.end(), std::ostream_iterator<std::string>(sout, ",")); return sout; } class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker { public: using framework::SingleGradOpDescMaker::SingleGradOpDescMaker; protected: virtual std::unique_ptr<framework::OpDesc> Apply() const { auto *grad = new framework::OpDesc(); grad->SetType("parallel_do_grad"); for (auto &input_param : this->InputNames()) { VLOG(30) << input_param; grad->SetInput(input_param, this->Input(input_param)); if (input_param != kPlaces) { grad->SetOutput(framework::GradVarName(input_param), this->InputGrad(input_param, false)); } } auto *g_block = this->grad_block_[0]; // All variable name that needed by gradient operators std::unordered_set<std::string> all_inputs_in_grad_blocks; for (size_t i = 0; i < g_block->OpSize(); ++i) { auto *op = g_block->Op(i); for (auto &var_name : op->InputArgumentNames()) { all_inputs_in_grad_blocks.insert(var_name); } } for (auto &output_param : this->OutputNames()) { if (output_param == kParallelScopes) { grad->SetInput(output_param, this->Output(output_param)); grad->SetInput(framework::GradVarName(output_param), this->Output(output_param)); } else { grad->SetInput(output_param, this->Output(output_param)); std::vector<std::string> og_names; for (auto &og_name : this->OutputGrad(output_param)) { if (all_inputs_in_grad_blocks.count(og_name) != 0) { // there are some gradient operators who need the OG. So make this // OG as an input of parallel.do og_names.push_back(og_name); } // else, there is no operator who need the OG. Do not use this OG as // an input } grad->SetInput(framework::GradVarName(output_param), og_names); } } grad->SetInput("Communicator", {"nccl_com__do_not_change_"}); grad->SetAttrMap(this->Attrs()); grad->SetBlockAttr(kParallelBlock, grad_block_[0]); return std::unique_ptr<framework::OpDesc>(grad); } }; class ParallelDoGradOpShapeInference : public framework::InferShapeBase { public: void operator()(framework::InferShapeContext *ctx) const override { PADDLE_ENFORCE(ctx->HasInputs(kParameters)); PADDLE_ENFORCE(ctx->HasInputs(kInputs)); PADDLE_ENFORCE(ctx->HasInputs(kOutputs)); ctx->SetOutputsDim(framework::GradVarName(kParameters), ctx->GetInputsDim(kParameters)); auto i_dims = ctx->GetInputsDim(kInputs); auto ig_names = ctx->Outputs(framework::GradVarName(kInputs)); for (size_t i = 0; i < ig_names.size(); ++i) { auto &ig_name = ig_names[i]; if (ig_name == framework::kEmptyVarName) { continue; } ctx->SetDims({ig_name}, {i_dims[i]}); } auto p_dims = ctx->GetInputsDim(kParameters); auto pg_names = ctx->Outputs(framework::GradVarName(kParameters)); for (size_t i = 0; i < pg_names.size(); ++i) { auto &pg_name = pg_names[i]; if (pg_name == framework::kEmptyVarName) { continue; } ctx->SetDims({pg_name}, {p_dims[i]}); } } }; class ParallelDoGradOpVarTypeInference : public framework::VarTypeInference { public: void operator()(const framework::OpDesc &op_desc, framework::BlockDesc *block) const override { framework::BlockDesc *sub_block = boost::get<framework::BlockDesc *>(op_desc.GetAttr(kParallelBlock)); for (auto &out_vars : op_desc.Outputs()) { for (auto &out_var : out_vars.second) { auto &var = block->FindRecursiveOrCreateVar(out_var); auto sub_var = sub_block->FindRecursiveOrCreateVar(out_var); if (sub_var.GetType() != var.GetType()) { var.SetType(sub_var.GetType()); } } } } }; } // namespace operators } // namespace paddle REGISTER_OPERATOR(parallel_do, paddle::operators::ParallelDoOp, paddle::operators::ParallelDoOpProtoMaker, paddle::operators::ParallelDoGradOpDescMaker); REGISTER_OPERATOR(parallel_do_grad, paddle::operators::ParallelDoGradOp, paddle::operators::ParallelDoGradOpShapeInference, paddle::operators::ParallelDoGradOpVarTypeInference);