diff --git a/paddle/fluid/framework/details/CMakeLists.txt b/paddle/fluid/framework/details/CMakeLists.txt
index 897e41f79f4e3bb9cecbe7b42fc6c4fd3401d839..6f990e28666829dd2f2fe6f07362188a77ae6468 100644
--- a/paddle/fluid/framework/details/CMakeLists.txt
+++ b/paddle/fluid/framework/details/CMakeLists.txt
@@ -2,8 +2,6 @@ cc_library(var_handle SRCS var_handle.cc DEPS place)
 cc_library(op_handle_base SRCS op_handle_base.cc DEPS var_handle device_context lod_tensor)
 cc_library(scale_loss_grad_op_handle SRCS scale_loss_grad_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory)
 cc_library(fetch_op_handle SRCS fetch_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory)
-nv_library(nccl_all_reduce_op_handle SRCS nccl_all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory
-        dynload_cuda)
 cc_library(computation_op_handle SRCS computation_op_handle.cc DEPS framework_proto scope place operator op_registry)
 cc_library(send_op_handle SRCS send_op_handle.cc DEPS framework_proto scope place operator op_registry)

@@ -11,20 +9,29 @@ cc_library(ssa_graph SRCS ssa_graph.cc DEPS var_handle op_handle_base)
 cc_library(ssa_graph_builder SRCS ssa_graph_builder.cc DEPS ssa_graph)

 if(WITH_GPU)
+  nv_library(nccl_all_reduce_op_handle SRCS nccl_all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory
+          dynload_cuda)
   set(multi_devices_graph_builder_deps nccl_all_reduce_op_handle)
+  nv_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base scope ddim dynload_cuda)
 else()
   set(multi_devices_graph_builder_deps)
+  cc_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base scope ddim)
 endif()

 cc_library(multi_devices_graph_builder SRCS multi_devices_graph_builder.cc DEPS ssa_graph_builder computation_op_handle
-        scale_loss_grad_op_handle send_op_handle ${multi_devices_graph_builder_deps})
+           scale_loss_grad_op_handle send_op_handle ${multi_devices_graph_builder_deps})
+
 cc_library(ssa_graph_executor SRCS ssa_graph_executor.cc DEPS ssa_graph framework_proto)
 cc_library(threaded_ssa_graph_executor SRCS threaded_ssa_graph_executor.cc DEPS fetch_op_handle ssa_graph_executor scope
         simple_threadpool device_context)
-cc_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base scope ddim memory)
-cc_library(gather_op_handle SRCS gather_op_handle.cc DEPS op_handle_base scope ddim memory)
+cc_library(variable_visitor SRCS variable_visitor.cc DEPS lod_tensor selected_rows)
+
+cc_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base variable_visitor scope ddim memory)
+cc_library(gather_op_handle SRCS gather_op_handle.cc DEPS op_handle_base scope variable_visitor ddim memory)

 cc_test(broadcast_op_test SRCS broadcast_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
         device_context broadcast_op_handle)
 cc_test(gather_op_test SRCS gather_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
         device_context gather_op_handle)
+cc_test(reduce_op_handle_test SRCS reduce_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
+        device_context reduce_op_handle )
diff --git a/paddle/fluid/framework/details/broadcast_op_handle.cc b/paddle/fluid/framework/details/broadcast_op_handle.cc
index 7d29012380e1b1710704d71a28d21dcc3097eb51..0bc3ee78d67e8548f093ff7086cf06a1ffb1c58b 100644
--- a/paddle/fluid/framework/details/broadcast_op_handle.cc
+++ b/paddle/fluid/framework/details/broadcast_op_handle.cc
@@ -13,95 +13,72 @@
 // limitations under the License.
#include "paddle/fluid/framework/details/broadcast_op_handle.h" +#include "paddle/fluid/framework/details/container_cast.h" +#include "paddle/fluid/framework/details/variable_visitor.h" namespace paddle { namespace framework { namespace details { - -Tensor *GetTensorFromVar(Variable *in_var) { - if (in_var->IsType()) { - return in_var->GetMutable(); - } else if (in_var->IsType()) { - return in_var->GetMutable()->mutable_value(); - } else { - PADDLE_THROW("Var should be LoDTensor or SelectedRows"); - } - return nullptr; -} - BroadcastOpHandle::BroadcastOpHandle(const std::vector &local_scopes, const std::vector &places) : local_scopes_(local_scopes), places_(places) {} void BroadcastOpHandle::RunImpl() { - // the input may have dummy var. - std::vector in_var_handle; - for (auto *in : inputs_) { - auto *out_handle = dynamic_cast(in); - if (out_handle) { - in_var_handle.push_back(out_handle); - } - } - PADDLE_ENFORCE_EQ(in_var_handle.size(), 1, - "The number of input should be one."); + // the input and output may have dummy var. + VarHandle *in_var_handle; - // the output may have dummy var. - std::vector out_var_handles; - for (auto *out : outputs_) { - auto *out_handle = dynamic_cast(out); - if (out_handle) { - out_var_handles.push_back(out_handle); - } + { + auto in_var_handles = DynamicCast(inputs_); + PADDLE_ENFORCE_EQ(in_var_handles.size(), 1, + "The number of input should be one."); + in_var_handle = in_var_handles[0]; } + auto out_var_handles = DynamicCast(outputs_); + PADDLE_ENFORCE_EQ( out_var_handles.size(), places_.size(), "The number of output should equal to the number of places."); - // Wait input done, this Wait is asynchronous operation - auto &in_place = in_var_handle[0]->place_; - if (in_var_handle[0]->generated_op_) { - for (auto *out : out_var_handles) { - auto &out_p = out->place_; - in_var_handle[0]->generated_op_->Wait(dev_ctxes_[out_p]); - } - } + // Wait input done, this Wait is asynchronous operation platform::Place + // &in_place; + WaitInputVarGenerated(*in_var_handle); - // - auto in_scope_idx = in_var_handle[0]->scope_idx_; - auto in_var = - local_scopes_.at(in_scope_idx)->FindVar(in_var_handle[0]->name_); - Tensor *in_tensor = GetTensorFromVar(in_var); + auto *in_var = local_scopes_.at(in_var_handle->scope_idx_) + ->FindVar(in_var_handle->name_); + PADDLE_ENFORCE_NOT_NULL(in_var); + Tensor &in_tensor = VariableVisitor::GetMutableTensor(in_var); for (auto *out : out_var_handles) { + if (*out == *in_var_handle) { + continue; + } + auto &out_p = out->place_; - auto out_var = local_scopes_.at(out->scope_idx_)->FindVar(out->name_); + auto *out_var = local_scopes_.at(out->scope_idx_)->FindVar(out->name_); - PADDLE_ENFORCE_EQ(out_p.which(), in_place.which(), + PADDLE_ENFORCE_EQ(out_p.which(), in_var_handle->place_.which(), "Places must be all on CPU or all on CUDA."); - if (in_var->IsType()) { - auto &in_sr = in_var->Get(); - auto out_sr = out_var->GetMutable(); - if (&in_sr == out_sr) continue; - out_sr->set_height(in_sr.height()); - out_sr->set_rows(in_sr.rows()); - out_sr->mutable_value()->Resize(in_sr.value().dims()); - out_sr->mutable_value()->mutable_data(out_p, in_sr.value().type()); - } else if (in_var->IsType()) { - auto in_lod = in_var->Get(); - auto out_lod = out_var->GetMutable(); - if (&in_lod == out_lod) continue; - out_lod->set_lod(in_lod.lod()); - out_lod->Resize(in_lod.dims()); - out_lod->mutable_data(out_p, in_lod.type()); - } else { - PADDLE_THROW("Var should be LoDTensor or SelectedRows."); - } + VariableVisitor::ShareDimsAndLoD(*in_var, 
out_var); + VariableVisitor::GetMutableTensor(out_var) + .Resize(in_tensor.dims()) + .mutable_data(out_p, in_tensor.type()); - Tensor *out_tensor = GetTensorFromVar(out_var); - paddle::framework::TensorCopy(*in_tensor, out_p, *(dev_ctxes_[in_place]), - out_tensor); + auto dev_ctx = dev_ctxes_[out_p]; + RunAndRecordEvent(out_p, [in_tensor, out_var, dev_ctx, out_p] { + paddle::framework::TensorCopy( + in_tensor, out_p, *(dev_ctx), + &VariableVisitor::GetMutableTensor(out_var)); + }); + } +} + +void BroadcastOpHandle::WaitInputVarGenerated(const VarHandle &in_var) { + if (in_var.generated_op_) { + for (auto &pair : dev_ctxes_) { + in_var.generated_op_->Wait(pair.second); + } } } diff --git a/paddle/fluid/framework/details/broadcast_op_handle.h b/paddle/fluid/framework/details/broadcast_op_handle.h index bc3e373488c9899e6e6d46d090b083332ff40666..92420f10ac5972b7924d83b43bb28234079e5072 100644 --- a/paddle/fluid/framework/details/broadcast_op_handle.h +++ b/paddle/fluid/framework/details/broadcast_op_handle.h @@ -39,12 +39,12 @@ struct BroadcastOpHandle : public OpHandleBase { protected: void RunImpl() override; + void WaitInputVarGenerated(const VarHandle &in_var); private: const std::vector &local_scopes_; const std::vector &places_; }; - } // namespace details } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/details/container_cast.h b/paddle/fluid/framework/details/container_cast.h new file mode 100644 index 0000000000000000000000000000000000000000..a42ae78dc45c2a885f98315a21f1d5558725bca3 --- /dev/null +++ b/paddle/fluid/framework/details/container_cast.h @@ -0,0 +1,40 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +namespace paddle { +namespace framework { +namespace details { + +template +std::vector DynamicCast(const std::vector& container) { + static_assert(std::is_base_of::value, + "ElementType must be a base class of ResultType"); + std::vector res; + for (auto* ptr : container) { + auto* derived = dynamic_cast(ptr); + if (derived) { + res.emplace_back(derived); + } + } + return res; +} + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/gather_op_handle.cc b/paddle/fluid/framework/details/gather_op_handle.cc index 8dd85be567d33991ac003707fec939a61a2d0962..511fd941dc7270d79f9a565f03d233b6fdf41d37 100644 --- a/paddle/fluid/framework/details/gather_op_handle.cc +++ b/paddle/fluid/framework/details/gather_op_handle.cc @@ -13,6 +13,8 @@ // limitations under the License. 
#include "paddle/fluid/framework/details/gather_op_handle.h" +#include "paddle/fluid/framework/details/container_cast.h" +#include "paddle/fluid/framework/details/variable_visitor.h" namespace paddle { namespace framework { @@ -23,30 +25,23 @@ GatherOpHandle::GatherOpHandle(const std::vector &local_scopes, : local_scopes_(local_scopes), places_(places) {} void GatherOpHandle::RunImpl() { - // the input may have dummy var. - std::vector in_var_handles; - for (auto *in : inputs_) { - auto *in_handle = dynamic_cast(in); - if (in_handle) { - in_var_handles.push_back(in_handle); - } - } + // the input and output may have dummy var. + auto in_var_handles = DynamicCast(inputs_); + PADDLE_ENFORCE_EQ( in_var_handles.size(), places_.size(), "The number of output should equal to the number of places."); - // the output may have dummy var. - std::vector out_var_handles; - for (auto *out : outputs_) { - auto *out_handle = dynamic_cast(out); - if (out_handle) { - out_var_handles.push_back(out_handle); - } + VarHandle *out_var_handle; + { + auto out_var_handles = DynamicCast(outputs_); + + PADDLE_ENFORCE_EQ(out_var_handles.size(), 1, + "The number of output should be one."); + out_var_handle = out_var_handles.front(); } - PADDLE_ENFORCE_EQ(out_var_handles.size(), 1, - "The number of output should be one."); - auto in_0_handle = static_cast(in_var_handles[0]); + auto in_0_handle = in_var_handles[0]; auto pre_in_var = local_scopes_[in_0_handle->scope_idx_]->FindVar(in_0_handle->name_); auto pre_place = in_0_handle->place_; @@ -54,15 +49,11 @@ void GatherOpHandle::RunImpl() { PADDLE_ENFORCE(pre_in_var->IsType(), "Currently, gather_op only can gather SelectedRows."); - PADDLE_ENFORCE_EQ(out_var_handles[0]->place_.which(), pre_place.which(), + PADDLE_ENFORCE_EQ(out_var_handle->place_.which(), pre_place.which(), "The place of input and output should be the same."); // Wait input done, this Wait is asynchronous operation - for (auto *in : in_var_handles) { - if (in->generated_op_) { - in->generated_op_->Wait(dev_ctxes_[in->place_]); - } - } + WaitInputVarGenerated(in_var_handles); std::vector out_rows; std::vector in_tensors; @@ -70,13 +61,12 @@ void GatherOpHandle::RunImpl() { auto &pre_in = pre_in_var->Get(); // gather the inputs - for (auto *in : in_var_handles) { - auto in_handle = static_cast(in); + for (auto *in_handle : in_var_handles) { auto in_p = in_handle->place_; in_places.push_back(in_p); PADDLE_ENFORCE_EQ(in_p.which(), pre_place.which(), "Places must be all on CPU or all on CUDA."); - auto in_var = + auto *in_var = local_scopes_.at(in_handle->scope_idx_)->FindVar(in_handle->name_); auto &in_sr = in_var->Get(); @@ -84,20 +74,19 @@ void GatherOpHandle::RunImpl() { "The type of input is not consistent."); PADDLE_ENFORCE_EQ(pre_in.height(), in_sr.height(), "The height of inputs is not consistent."); - PADDLE_ENFORCE_EQ(pre_in.GetCompleteDims(), in_sr.GetCompleteDims(), , + PADDLE_ENFORCE_EQ(pre_in.GetCompleteDims(), in_sr.GetCompleteDims(), "The dims of inputs is not consistent."); - auto in_sr_rows = in_sr.rows(); + auto &in_sr_rows = in_sr.rows(); out_rows.insert(out_rows.end(), in_sr_rows.begin(), in_sr_rows.end()); in_tensors.emplace_back(in_sr.value()); } // write the output - auto &out_place = out_var_handles[0]->place_; - auto out_scope_idx = out_var_handles[0]->scope_idx_; - auto out_var = - local_scopes_[out_scope_idx]->FindVar(out_var_handles[0]->name_); + auto &out_place = out_var_handle->place_; + auto out_scope_idx = out_var_handle->scope_idx_; + auto out_var = 
local_scopes_[out_scope_idx]->FindVar(out_var_handle->name_); auto out = out_var->GetMutable(); out->set_height(pre_in.height()); @@ -110,13 +99,27 @@ void GatherOpHandle::RunImpl() { Tensor *out_tensor = out->mutable_value(); // copy - int s = 0, e = 0; - for (size_t j = 0; j < in_tensors.size(); ++j) { - e += in_tensors[j].dims()[0]; - auto sub_out = out_tensor->Slice(s, e); - paddle::framework::TensorCopy(in_tensors[j], out_place, - *(dev_ctxes_[in_places[j]]), &sub_out); - s = e; + auto dev_ctx = dev_ctxes_[out_place]; + RunAndRecordEvent(out_place, [in_tensors, out_tensor, dev_ctx, out_place] { + int s = 0, e = 0; + for (size_t j = 0; j < in_tensors.size(); ++j) { + e += in_tensors[j].dims()[0]; + auto sub_out = out_tensor->Slice(s, e); + paddle::framework::TensorCopy(in_tensors[j], out_place, *(dev_ctx), + &sub_out); + s = e; + } + }); +} + +void GatherOpHandle::WaitInputVarGenerated( + const std::vector &in_var_handles) { + for (auto *in : in_var_handles) { + if (in->generated_op_) { + for (auto pair : dev_ctxes_) { + in->generated_op_->Wait(pair.second); + } + } } } diff --git a/paddle/fluid/framework/details/gather_op_handle.h b/paddle/fluid/framework/details/gather_op_handle.h index d11ef8556aa8840949ca8dc7aa176413f70b9f22..c394dd7a14b07cb956aa1aedfc0df4fa25744dd7 100644 --- a/paddle/fluid/framework/details/gather_op_handle.h +++ b/paddle/fluid/framework/details/gather_op_handle.h @@ -39,6 +39,7 @@ struct GatherOpHandle : public OpHandleBase { protected: void RunImpl() override; + void WaitInputVarGenerated(const std::vector &in_var_handles); private: const std::vector &local_scopes_; diff --git a/paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc b/paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc index e587210b357ea6caa3272903d8aa6b3e4b2e8228..28f9139987faa3dfee1e7733fb846a4d4efadc7b 100644 --- a/paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc +++ b/paddle/fluid/framework/details/nccl_all_reduce_op_handle.cc @@ -13,8 +13,8 @@ // limitations under the License. #include "paddle/fluid/framework/details/nccl_all_reduce_op_handle.h" - #include +#include "paddle/fluid/framework/details/reduce_and_gather.h" namespace paddle { namespace framework { @@ -29,32 +29,6 @@ NCCLAllReduceOpHandle::NCCLAllReduceOpHandle( } } -struct ReduceLoDTensor { - const std::vector &src_tensors_; - LoDTensor &dst_tensor_; - - ReduceLoDTensor(const std::vector &src, LoDTensor *dst) - : src_tensors_(src), dst_tensor_(*dst) {} - - template - void operator()() const { - PADDLE_ENFORCE(!src_tensors_.empty()); - auto &t0 = src_tensors_[0]; - PADDLE_ENFORCE_NE(t0.numel(), 0); - dst_tensor_.Resize(t0.dims()); - T *dst = dst_tensor_.mutable_data(platform::CPUPlace()); - std::copy(t0.data(), t0.data() + t0.numel(), dst); - - for (size_t i = 1; i < src_tensors_.size(); ++i) { - auto &t = src_tensors_[i]; - PADDLE_ENFORCE_EQ(t.dims(), t0.dims()); - PADDLE_ENFORCE_EQ(t.type(), t0.type()); - std::transform(t.data(), t.data() + t.numel(), dst, dst, - [](T a, T b) -> T { return a + b; }); - } - } -}; - void NCCLAllReduceOpHandle::RunImpl() { if (inputs_.size() == 1) { return; // No need to all reduce when GPU count = 1; diff --git a/paddle/fluid/framework/details/reduce_and_gather.h b/paddle/fluid/framework/details/reduce_and_gather.h new file mode 100644 index 0000000000000000000000000000000000000000..7957fba8a449f7dc05588fad335df0b45a34b575 --- /dev/null +++ b/paddle/fluid/framework/details/reduce_and_gather.h @@ -0,0 +1,94 @@ +// Copyright (c) 2018 PaddlePaddle Authors. 
All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once +#include +#include +#include +#include "paddle/fluid/framework/details/reduce_and_gather.h" +#include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/framework/selected_rows.h" +namespace paddle { +namespace framework { +namespace details { + +struct ReduceLoDTensor { + const std::vector &src_tensors_; + LoDTensor &dst_tensor_; + + ReduceLoDTensor(const std::vector &src, LoDTensor *dst) + : src_tensors_(src), dst_tensor_(*dst) {} + + template + void operator()() const { + PADDLE_ENFORCE(!src_tensors_.empty()); + auto &t0 = src_tensors_[0]; + PADDLE_ENFORCE_NE(t0.numel(), 0); + dst_tensor_.Resize(t0.dims()); + T *dst = dst_tensor_.mutable_data(platform::CPUPlace()); + std::copy(t0.data(), t0.data() + t0.numel(), dst); + + for (size_t i = 1; i < src_tensors_.size(); ++i) { + auto &t = src_tensors_[i]; + PADDLE_ENFORCE_EQ(t.dims(), t0.dims()); + PADDLE_ENFORCE_EQ(t.type(), t0.type()); + std::transform(t.data(), t.data() + t.numel(), dst, dst, + [](T a, T b) -> T { return a + b; }); + } + } +}; + +inline void GatherSelectedRows( + const std::vector &src_selecte_rows_, + const std::vector &in_places, + const std::unordered_map &dev_ctxes, + const platform::Place &out_place, SelectedRows *dst_selecte_rows) { + PADDLE_ENFORCE(!src_selecte_rows_.empty()); + + std::vector in_tensors; + std::vector out_rows; + + for (auto in_sr_ptr : src_selecte_rows_) { + auto &in_sr = *in_sr_ptr; + in_tensors.emplace_back(in_sr.value()); + out_rows.insert(out_rows.end(), in_sr.rows().begin(), in_sr.rows().end()); + } + + auto &pre_in = src_selecte_rows_[0]; + + auto &dst_tensor = *dst_selecte_rows; + dst_tensor.set_height(pre_in->height()); + dst_tensor.set_rows(out_rows); + size_t rows = out_rows.size(); + DDim out_dim = pre_in->GetCompleteDims(); + out_dim[0] = static_cast(rows); + dst_tensor.mutable_value()->Resize(out_dim); + dst_tensor.mutable_value()->mutable_data(out_place, pre_in->value().type()); + Tensor *out_tensor = dst_tensor.mutable_value(); + + // copy + int s = 0, e = 0; + for (size_t j = 0; j < in_tensors.size(); ++j) { + e += in_tensors[j].dims()[0]; + auto sub_out = out_tensor->Slice(s, e); + paddle::framework::TensorCopy(in_tensors[j], out_place, + *(dev_ctxes.at(in_places[j])), &sub_out); + s = e; + } +} + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/reduce_op_handle.cc b/paddle/fluid/framework/details/reduce_op_handle.cc new file mode 100644 index 0000000000000000000000000000000000000000..c951de5dd5a7b66ce03c705e9bdcbe3f5c3e565d --- /dev/null +++ b/paddle/fluid/framework/details/reduce_op_handle.cc @@ -0,0 +1,161 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/framework/details/reduce_op_handle.h" +#include "paddle/fluid/framework/details/reduce_and_gather.h" + +namespace paddle { +namespace framework { +namespace details { + +void ReduceOpHandle::RunImpl() { + // the input and output may have dummy var. + std::vector in_var_handles = GetValidVarHandles(inputs_); + std::vector out_var_handles = GetValidVarHandles(outputs_); + + PADDLE_ENFORCE_EQ( + in_var_handles.size(), places_.size(), + "The number of output should equal to the number of places."); + PADDLE_ENFORCE_EQ(out_var_handles.size(), 1, + "The number of output should be one."); + + // Wait input done, this Wait is asynchronous operation + WaitEvents(in_var_handles); + + // check in the same place + auto in_0_handle = in_var_handles[0]; + auto pre_place = in_0_handle->place_; + + std::vector in_places; + for (auto *in_handle : in_var_handles) { + auto in_p = in_handle->place_; + PADDLE_ENFORCE_EQ(in_p.which(), pre_place.which(), + "Places must be all on CPU or all on CUDA."); + in_places.emplace_back(in_p); + } + + auto out_var = local_scopes_[out_var_handles[0]->scope_idx_]->FindVar( + out_var_handles[0]->name_); + + auto pre_in_var = + local_scopes_[in_0_handle->scope_idx_]->FindVar(in_0_handle->name_); + + if (pre_in_var->IsType()) { + auto &pre_in = pre_in_var->Get(); + std::vector in_selected_rows; + + for (auto *in_handle : in_var_handles) { + auto in_var = + local_scopes_.at(in_handle->scope_idx_)->FindVar(in_handle->name_); + auto &in_sr = in_var->Get(); + + PADDLE_ENFORCE_EQ(in_sr.value().type(), pre_in.value().type(), + "The type of input is not consistent."); + + in_selected_rows.emplace_back(&in_sr); + } + auto trg = out_var->GetMutable(); + GatherSelectedRows(in_selected_rows, in_places, dev_ctxes_, + out_var_handles[0]->place_, trg); + } else { + auto pre_in = pre_in_var->Get(); + std::vector lod_tensors; + + // can be refined + for (auto *in_handle : in_var_handles) { + auto in_var = + local_scopes_.at(in_handle->scope_idx_)->FindVar(in_handle->name_); + auto &in_sr = in_var->Get(); + + PADDLE_ENFORCE_EQ(in_sr.type(), pre_in.type(), + "The type of input is not consistent."); + + lod_tensors.emplace_back(in_sr); + } + + auto trg = out_var->GetMutable(); + trg->Resize(pre_in.dims()); + trg->mutable_data(out_var_handles[0]->place_, pre_in.type()); + + if (paddle::platform::is_cpu_place(pre_place)) { + ReduceLoDTensor func(lod_tensors, trg); + VisitDataType(ToDataType(lod_tensors[0].type()), func); + } else if (paddle::platform::is_gpu_place(pre_place)) { +#ifdef PADDLE_WITH_CUDA + auto out_p = out_var_handles[0]->place_; + int root = boost::get(out_p).device; + + std::vector> all_reduce_calls; + for (size_t i = 0; i < local_scopes_.size(); ++i) { + auto &p = in_places[i]; + auto &lod_tensor = lod_tensors[i]; + + int dev_id = boost::get(p).device; + auto &nccl_ctx = nccl_ctxs_->at(dev_id); + auto stream = nccl_ctx.stream(); + auto comm = nccl_ctx.comm_; + + void *buffer = const_cast(lod_tensor.data()); + void *recvbuffer = nullptr; + if (root == dev_id) { + recvbuffer = trg->mutable_data(out_var_handles[0]->place_); + } + + 
all_reduce_calls.emplace_back([=] { + PADDLE_ENFORCE(platform::dynload::ncclReduce( + buffer, recvbuffer, static_cast(lod_tensor.numel()), + platform::ToNCCLDataType(lod_tensor.type()), ncclSum, root, comm, + stream)); + }); + } + + this->RunAndRecordEvent([&] { + platform::NCCLGroupGuard guard; + for (auto &call : all_reduce_calls) { + call(); + } + }); +#else + PADDLE_THROW("CUDA is not support."); +#endif + } else { + PADDLE_THROW("Place should be CPUPlace or CUDAPlace."); + } + } +} + +void ReduceOpHandle::WaitEvents( + const std::vector &in_var_handles) { + if (in_var_handles[0]->generated_op_) { + for (auto *in : in_var_handles) { + in_var_handles[0]->generated_op_->Wait(dev_ctxes_[in->place_]); + } + } +} + +std::vector ReduceOpHandle::GetValidVarHandles( + const std::vector &inputs) { + std::vector in_var_handles; + for (auto *in : inputs) { + auto *in_handle = dynamic_cast(in); + if (in_handle) { + in_var_handles.push_back(in_handle); + } + } + return in_var_handles; +} +std::string ReduceOpHandle::Name() const { return "reduce"; } +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/reduce_op_handle.h b/paddle/fluid/framework/details/reduce_op_handle.h new file mode 100644 index 0000000000000000000000000000000000000000..7b36ce4a7bceaeb93ceb03730b2d54d0f36fed3d --- /dev/null +++ b/paddle/fluid/framework/details/reduce_op_handle.h @@ -0,0 +1,70 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include + +#include "paddle/fluid/framework/details/op_handle_base.h" +#include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/framework/scope.h" +#include "paddle/fluid/framework/selected_rows.h" +#include "paddle/fluid/platform/device_context.h" +#ifdef PADDLE_WITH_CUDA +#include "paddle/fluid/platform/nccl_helper.h" +#endif + +namespace paddle { +namespace framework { +namespace details { + +struct ReduceOpHandle : public OpHandleBase { + const std::vector &local_scopes_; + const std::vector &places_; + +#ifdef PADDLE_WITH_CUDA + const platform::NCCLContextMap *nccl_ctxs_; + ReduceOpHandle(const std::vector &local_scopes, + const std::vector &places, + const platform::NCCLContextMap *nccl_ctxs) + : local_scopes_(local_scopes), places_(places), nccl_ctxs_(nccl_ctxs) { + if (nccl_ctxs_) { + for (auto &p_ctx : nccl_ctxs_->contexts_) { + dev_ctxes_[platform::CUDAPlace(p_ctx.first)] = p_ctx.second.ctx_.get(); + } + } + } +#else + ReduceOpHandle(const std::vector &local_scopes, + const std::vector &places) + : local_scopes_(local_scopes), places_(places) {} +#endif + + std::string Name() const override; + + bool IsMultiDeviceTransfer() override { return false; }; + + protected: + void RunImpl() override; + std::vector GetValidVarHandles( + const std::vector &inputs); + + void WaitEvents(const std::vector &in_var_handles); +}; + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/reduce_op_handle_test.cc b/paddle/fluid/framework/details/reduce_op_handle_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..a57c7882ed77ae8cfbf7e284058d94935975828b --- /dev/null +++ b/paddle/fluid/framework/details/reduce_op_handle_test.cc @@ -0,0 +1,275 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "paddle/fluid/framework/details/reduce_op_handle.h" +#include "gtest/gtest.h" + +#include "paddle/fluid/platform/device_context.h" + +namespace paddle { +namespace framework { +namespace details { +namespace f = paddle::framework; +namespace p = paddle::platform; + +// test data amount +const f::DDim kDims = {20, 20}; + +struct TestReduceOpHandle { + bool use_gpu_; + Scope g_scope_; + std::vector local_scopes_; + std::unique_ptr op_handle_; + std::vector> vars_; + std::vector gpu_list_; + std::vector> ctxs_; + +#ifdef PADDLE_WITH_CUDA + std::unique_ptr nccl_ctxs_; +#endif + + void WaitAll() { + for (size_t j = 0; j < ctxs_.size(); ++j) { + ctxs_[j]->Wait(); + } +#ifdef PADDLE_WITH_CUDA + if (nccl_ctxs_) { + nccl_ctxs_->WaitAll(); + } +#endif + } + + void InitCtxOnGpu(bool use_gpu) { + use_gpu_ = use_gpu; + if (use_gpu) { +#ifdef PADDLE_WITH_CUDA + int count = p::GetCUDADeviceCount(); + if (count <= 1) { + LOG(WARNING) << "Cannot test multi-gpu Broadcast, because the CUDA " + "device count is " + << count; + exit(0); + } + for (int i = 0; i < count; ++i) { + auto p = p::CUDAPlace(i); + gpu_list_.push_back(p); + ctxs_.emplace_back(new p::CUDADeviceContext(p)); + } + nccl_ctxs_.reset(new platform::NCCLContextMap(gpu_list_)); +#else + PADDLE_THROW("CUDA is not support."); +#endif + } else { + int count = 8; + for (int i = 0; i < count; ++i) { + auto p = p::CPUPlace(); + gpu_list_.push_back(p); + ctxs_.emplace_back(new p::CPUDeviceContext(p)); + } +#ifdef PADDLE_WITH_CUDA + nccl_ctxs_.reset(nullptr); +#endif + } + } + + void InitReduceOp(size_t input_scope_idx) { + for (size_t j = 0; j < gpu_list_.size(); ++j) { + local_scopes_.push_back(&(g_scope_.NewScope())); + local_scopes_[j]->Var("out"); + } + local_scopes_[input_scope_idx]->Var("input"); + + if (use_gpu_) { +#ifdef PADDLE_WITH_CUDA + op_handle_.reset( + new ReduceOpHandle(local_scopes_, gpu_list_, nccl_ctxs_.get())); +#else + PADDLE_THROW("CUDA is not support."); +#endif + } else { +#ifdef PADDLE_WITH_CUDA + op_handle_.reset( + new ReduceOpHandle(local_scopes_, gpu_list_, nccl_ctxs_.get())); +#else + op_handle_.reset(new ReduceOpHandle(local_scopes_, gpu_list_)); +#endif + } + + // add input + for (size_t j = 0; j < gpu_list_.size(); ++j) { + if (!use_gpu_) { + op_handle_->SetDeviceContext(gpu_list_[j], ctxs_[j].get()); + } + auto *in_var_handle = new VarHandle(1, j, "input", gpu_list_[j]); + in_var_handle->generated_op_ = nullptr; + vars_.emplace_back(in_var_handle); + op_handle_->AddInput(in_var_handle); + } + + // add dummy var + vars_.emplace_back(new DummyVarHandle()); + DummyVarHandle *in_dummy_var_handle = + static_cast(vars_.back().get()); + in_dummy_var_handle->generated_op_ = nullptr; + op_handle_->AddInput(in_dummy_var_handle); + + // add output + auto *out_var_handle = + new VarHandle(2, input_scope_idx, "out", gpu_list_[input_scope_idx]); + vars_.emplace_back(out_var_handle); + op_handle_->AddOutput(out_var_handle); + + // add dummy var + vars_.emplace_back(new DummyVarHandle()); + DummyVarHandle *dummy_var_handle = + static_cast(vars_.back().get()); + op_handle_->AddOutput(dummy_var_handle); + } + + void TestReduceSelectedRows(size_t output_scope_idx) { + int height = kDims[0] * 2; + std::vector rows{0, 1, 2, 3, 3, 0, 14, 7, 3, 1, + 2, 4, 6, 3, 1, 1, 1, 1, 3, 7}; + std::vector send_vector(f::product(kDims)); + for (size_t k = 0; k < send_vector.size(); ++k) { + send_vector[k] = k; + } + + for (size_t input_scope_idx = 0; input_scope_idx < gpu_list_.size(); + ++input_scope_idx) { + auto in_var = 
local_scopes_[input_scope_idx]->Var("input"); + auto in_selected_rows = in_var->GetMutable(); + auto value = in_selected_rows->mutable_value(); + value->mutable_data(kDims, gpu_list_[input_scope_idx]); + + in_selected_rows->set_height(height); + in_selected_rows->set_rows(rows); + + paddle::framework::TensorFromVector( + send_vector, *(ctxs_[input_scope_idx]), value); + value->Resize(kDims); + } + + auto out_var = local_scopes_[output_scope_idx]->Var("out"); + auto out_selected_rows = out_var->GetMutable(); + + auto in_var = local_scopes_[output_scope_idx]->Var("input"); + auto in_selected_rows = in_var->GetMutable(); + + out_selected_rows->mutable_value()->ShareDataWith( + in_selected_rows->value()); + + op_handle_->Run(false); + + WaitAll(); + + p::CPUPlace cpu_place; + + auto &out_select_rows = out_var->Get(); + auto rt = out_select_rows.value(); + + PADDLE_ENFORCE_EQ(out_select_rows.height(), height, "height is not equal."); + for (size_t k = 0; k < out_select_rows.rows().size(); ++k) { + PADDLE_ENFORCE_EQ(out_select_rows.rows()[k], rows[k % rows.size()]); + } + + f::Tensor result_tensor; + f::TensorCopy(rt, cpu_place, *(ctxs_[output_scope_idx]), &result_tensor); + float *ct = result_tensor.data(); + + for (int64_t j = 0; j < f::product(result_tensor.dims()); ++j) { + ASSERT_NEAR(ct[j], send_vector[j % send_vector.size()], 1e-5); + } + } + + void TestReduceLodTensors(size_t output_scope_idx) { + std::vector send_vector(static_cast(f::product(kDims))); + for (size_t k = 0; k < send_vector.size(); ++k) { + send_vector[k] = k; + } + f::LoD lod{{0, 10, 20}}; + + for (size_t input_scope_idx = 0; input_scope_idx < gpu_list_.size(); + ++input_scope_idx) { + auto in_var = local_scopes_[input_scope_idx]->Var("input"); + auto in_lod_tensor = in_var->GetMutable(); + in_lod_tensor->mutable_data(kDims, gpu_list_[input_scope_idx]); + in_lod_tensor->set_lod(lod); + + paddle::framework::TensorFromVector( + send_vector, *(ctxs_[input_scope_idx]), in_lod_tensor); + } + + auto out_var = local_scopes_[output_scope_idx]->Var("out"); + auto out_lodtensor = out_var->GetMutable(); + + auto in_var = local_scopes_[output_scope_idx]->Var("input"); + auto in_lodtensor = in_var->Get(); + + out_lodtensor->ShareDataWith(in_lodtensor); + + op_handle_->Run(false); + + WaitAll(); + + p::CPUPlace cpu_place; + + auto &rt = out_var->Get(); + + f::Tensor result_tensor; + f::TensorCopy(rt, cpu_place, *(ctxs_[output_scope_idx]), &result_tensor); + float *ct = result_tensor.data(); + + for (int64_t j = 0; j < f::product(result_tensor.dims()); ++j) { + ASSERT_NEAR(ct[j], send_vector[j] * gpu_list_.size(), 1e-5); + } + } +}; + +TEST(ReduceTester, TestCPUReduceTestSelectedRows) { + TestReduceOpHandle test_op; + size_t input_scope_idx = 0; + test_op.InitCtxOnGpu(false); + test_op.InitReduceOp(input_scope_idx); + test_op.TestReduceSelectedRows(input_scope_idx); +} +TEST(ReduceTester, TestCPUReduceTestLodTensor) { + TestReduceOpHandle test_op; + size_t input_scope_idx = 0; + test_op.InitCtxOnGpu(false); + test_op.InitReduceOp(input_scope_idx); + test_op.TestReduceLodTensors(input_scope_idx); +} +#ifdef PADDLE_WITH_CUDA + +TEST(ReduceTester, TestGPUReduceTestSelectedRows) { + TestReduceOpHandle test_op; + size_t input_scope_idx = 0; + test_op.InitCtxOnGpu(true); + test_op.InitReduceOp(input_scope_idx); + test_op.TestReduceSelectedRows(input_scope_idx); +} + +TEST(ReduceTester, TestGPUReduceTestLodTensor) { + TestReduceOpHandle test_op; + size_t input_scope_idx = 0; + test_op.InitCtxOnGpu(true); + 
test_op.InitReduceOp(input_scope_idx); + test_op.TestReduceLodTensors(input_scope_idx); +} +#endif + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/var_handle.h b/paddle/fluid/framework/details/var_handle.h index 2b887c67e6fc6ea78e42fbb9fd170f740db27d97..9f7fd69e64fe9d7ef0bf3037bea7f686cb2eee0b 100644 --- a/paddle/fluid/framework/details/var_handle.h +++ b/paddle/fluid/framework/details/var_handle.h @@ -61,6 +61,11 @@ struct VarHandle : public VarHandleBase { size_t scope_idx_; std::string name_; platform::Place place_; + + bool operator==(const VarHandle& o) const { + return o.generated_op_ == generated_op_ && o.name_ == name_ && + o.scope_idx_ == scope_idx_; + } }; // Dummy Variable. It is used to represent dependencies between operators diff --git a/paddle/fluid/framework/details/variable_visitor.cc b/paddle/fluid/framework/details/variable_visitor.cc new file mode 100644 index 0000000000000000000000000000000000000000..10bac0fae9504215fab11dd8cca7c278feaa4bda --- /dev/null +++ b/paddle/fluid/framework/details/variable_visitor.cc @@ -0,0 +1,93 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/framework/details/variable_visitor.h" +#include "paddle/fluid/framework/selected_rows.h" +namespace paddle { +namespace framework { +namespace details { +template +static void VisitVariable(Variable* var, Func* func) { + if (var->IsType()) { + (*func)(var->GetMutable()); + } else if (var->IsType()) { + (*func)(var->GetMutable()); + } else { + PADDLE_THROW("Not supported type %s", var->Type().name()); + } +} + +template +static void VisitVariable(const Variable& var, Func* func) { + if (var.IsType()) { + (*func)(var.Get()); + } else if (var.IsType()) { + (*func)(var.Get()); + } else { + PADDLE_THROW("Not supported type %s", var.Type().name()); + } +} + +struct TensorVisitor { + Tensor* result_{nullptr}; + + void operator()(LoDTensor* tensor) { result_ = tensor; } + + void operator()(SelectedRows* selected_rows) { + result_ = selected_rows->mutable_value(); + } + + template + void operator()() { + PADDLE_THROW("Not Support to get LoDTensor from %s", typeid(T).name()); + } +}; + +Tensor& VariableVisitor::GetMutableTensor(Variable* var) { + TensorVisitor vistor; + VisitVariable(var, &vistor); + return *vistor.result_; +} + +struct ShareDimsAndLoDVisitor { + Variable* trg_; + void operator()(const LoDTensor& val) { + auto* tensor = trg_->GetMutable(); + tensor->set_layout(val.layout()); + tensor->set_lod(val.lod()); + tensor->Resize(val.dims()); + } + + void operator()(const SelectedRows& val) { + auto* selected_rows = trg_->GetMutable(); + selected_rows->set_rows(val.rows()); + selected_rows->set_height(val.height()); + selected_rows->mutable_value()->Resize(val.value().dims()); + } + + template + void operator()(const T&) { + PADDLE_ENFORCE("ShareDimsAndLoD is not supported by type %s", + typeid(T).name()); + } +}; + +void 
VariableVisitor::ShareDimsAndLoD(const Variable& src, Variable* trg) { + ShareDimsAndLoDVisitor visitor{trg}; + VisitVariable(src, &visitor); +} + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/variable_visitor.h b/paddle/fluid/framework/details/variable_visitor.h new file mode 100644 index 0000000000000000000000000000000000000000..67baa1895e4513738fa73d49c46660da92279b9d --- /dev/null +++ b/paddle/fluid/framework/details/variable_visitor.h @@ -0,0 +1,33 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/framework/variable.h" + +namespace paddle { +namespace framework { +namespace details { + +class VariableVisitor { + public: + static Tensor &GetMutableTensor(Variable *var); + + static void ShareDimsAndLoD(const Variable &src, Variable *trg); +}; + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/scope.cc b/paddle/fluid/framework/scope.cc index f9866411417ece784aab860c6f707b1a1fcd8528..9091713158c8071d5386f14250e3c546284e7fd0 100644 --- a/paddle/fluid/framework/scope.cc +++ b/paddle/fluid/framework/scope.cc @@ -26,6 +26,11 @@ DEFINE_bool(benchmark, false, "Default cuda is asynchronous device, set to True will" "force op run in synchronous mode."); +DEFINE_bool( + eager_delete_scope, true, + "Delete local scope eagerly. It will reduce GPU memory usage but " + "slow down the destruction of variables.(around 1% performance harm)"); + namespace paddle { namespace framework { @@ -97,7 +102,7 @@ void Scope::DeleteScope(Scope* scope) const { PADDLE_ENFORCE(it != this->kids_.end(), "Cannot find %p as kid scope", scope); this->kids_.erase(it); // When making memory benchmark on Fluid, we have to delete scope sync. 
- if (FLAGS_benchmark) { + if (FLAGS_benchmark || FLAGS_eager_delete_scope) { delete scope; } else { Async([scope] { delete scope; }); diff --git a/paddle/fluid/operators/detail/grpc_server.h b/paddle/fluid/operators/detail/grpc_server.h index b6110f92ed4f38a156e0c99ecfb399f3f47a169e..452ff5e967c086340e065a1b6a4b8672c75a4a3d 100644 --- a/paddle/fluid/operators/detail/grpc_server.h +++ b/paddle/fluid/operators/detail/grpc_server.h @@ -59,15 +59,13 @@ class AsyncGRPCServer final { void SetProgram(framework::ProgramDesc *program) { program_ = program; } - void SetPrefetchBlkdId(int blkid) { prefetch_blk_id_ = blkid; } - void SetExecutor(framework::Executor *executor) { executor_ = executor; } void SetPrefetchPreparedCtx(framework::ExecutorPrepareContext *prepared) { prefetch_ctx_ = prepared; } - int GetSelectedPort() { return selected_port_; } + int GetSelectedPort() const { return selected_port_; } const ReceivedMessage Get() { return this->var_recv_queue_.Pop(); } @@ -114,7 +112,6 @@ class AsyncGRPCServer final { std::unique_ptr t_get_; std::unique_ptr t_prefetch_; - int prefetch_blk_id_; framework::ExecutorPrepareContext *prefetch_ctx_; framework::ProgramDesc *program_; framework::Executor *executor_; diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index a4c925b538ef916e88ec06cea6de57f31eaf069b..49e191b7bb9e97c91ae2e536550aebd6e91c6fce 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -27,20 +27,6 @@ void RunServer(std::shared_ptr service) { VLOG(4) << "RunServer thread end"; } -static void CreateTensorFromMessageType(framework::Variable *var, - sendrecv::VarType var_type) { - if (var_type == sendrecv::VarType::LOD_TENSOR) { - var->GetMutable(); - } else if (var_type == sendrecv::VarType::SELECTED_ROWS) { - var->GetMutable(); - } else { - PADDLE_THROW( - "VariableMessage type %d is not in " - "[LoDTensor, SelectedRows]", - var_type); - } -} - static void ParallelExecuteBlocks( const std::vector ¶llel_blkids, framework::Executor *executor, const std::vector> @@ -62,6 +48,13 @@ static void ParallelExecuteBlocks( for (size_t i = 0; i < fs.size(); ++i) fs[i].wait(); } +static void SavePort(std::shared_ptr rpc_service) { + std::ofstream port_file; + port_file.open("/tmp/paddle.selected_port"); + port_file << rpc_service->GetSelectedPort(); + port_file.close(); +} + ListenAndServOp::ListenAndServOp(const std::string &type, const framework::VariableNameMap &inputs, const framework::VariableNameMap &outputs, @@ -77,59 +70,25 @@ void ListenAndServOp::Stop() { server_thread_->join(); } -void ListenAndServOp::RunImpl(const framework::Scope &scope, - const platform::Place &dev_place) const { - platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); - auto &dev_ctx = *pool.Get(dev_place); - framework::Scope &recv_scope = scope.NewScope(); - - if (!rpc_service_) { - std::string endpoint = Attr("endpoint"); - rpc_service_.reset(new detail::AsyncGRPCServer(endpoint)); - } - - auto ins = Inputs("X"); +void ListenAndServOp::RunSyncUpdate( + framework::Executor *executor, framework::ProgramDesc *program, + framework::Scope *recv_scope, framework::BlockDesc *prefetch_block) const { auto fan_in = Attr("Fanin"); - auto *optimize_block = Attr(kOptimizeBlock); - auto *prefetch_block = Attr(kPrefetchBlock); - auto *program = optimize_block->Program(); + size_t num_blocks = program->Size(); PADDLE_ENFORCE_GE(num_blocks, 2, "server program should have at least 2 blocks"); - 
framework::Executor executor(dev_place); std::vector block_list; for (size_t blkid = 1; blkid < num_blocks; ++blkid) { - if (blkid != static_cast(prefetch_block->ID())) { - block_list.push_back(blkid); - } + block_list.push_back(blkid); } - auto optimize_prepared = executor.Prepare(*program, block_list); + auto optimize_prepared = executor->Prepare(*program, block_list); // Insert placeholder for block0 which holds current op itself. optimize_prepared.insert( optimize_prepared.begin(), std::shared_ptr(nullptr)); - rpc_service_->SetScope(&recv_scope); - rpc_service_->SetDevCtx(&dev_ctx); - // TODO(qiao) set proper fields for table lookup and update - rpc_service_->SetExecutor(&executor); - VLOG(3) << "prefetch block id is " << prefetch_block->ID(); - auto prefetch_prepared = executor.Prepare(*program, prefetch_block->ID()); - rpc_service_->SetPrefetchBlkdId(prefetch_block->ID()); - rpc_service_->SetPrefetchPreparedCtx(prefetch_prepared.get()); - prefetch_prepared.release(); - rpc_service_->SetProgram(program); - // start the server listening after all member initialized. - server_thread_.reset(new std::thread(RunServer, rpc_service_)); - VLOG(3) << "wait server thread to become ready..."; - sleep(5); - // Write to a file of server selected port for python use. - std::ofstream port_file; - port_file.open("/tmp/paddle.selected_port"); - port_file << rpc_service_->GetSelectedPort(); - port_file.close(); - bool exit_flag = false; // Record received sparse variables, so that // we could reset those after execute optimize program @@ -170,7 +129,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, break; } - // NOTE: if is_gpu_place, CUDA kernels are laugched by multiple threads + // NOTE: if is_gpu_place, CUDA kernels are launched by multiple threads // and this will still work. 
// The optimize blocks which have the same parent ID would run parallel @@ -182,16 +141,16 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, for (size_t blkid = 2; blkid < num_blocks; ++blkid) { if (blkid != static_cast(prefetch_block->ID())) { if (program->Block(blkid).Parent() != last_parent_blkid) { - ParallelExecuteBlocks(parallel_blkids, &executor, optimize_prepared, - program, &recv_scope); + ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared, + program, recv_scope); parallel_blkids.clear(); last_parent_blkid = program->Block(blkid).Parent(); } parallel_blkids.push_back(blkid); } } - ParallelExecuteBlocks(parallel_blkids, &executor, optimize_prepared, - program, &recv_scope); + ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared, program, + recv_scope); VLOG(2) << "run all blocks spent " << detail::GetTimestamp() - ts << "(ms)"; // Reset the received sparse variables, the sum operator would not @@ -209,6 +168,42 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, } // while(true) } +void ListenAndServOp::RunImpl(const framework::Scope &scope, + const platform::Place &dev_place) const { + platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); + auto &dev_ctx = *pool.Get(dev_place); + framework::Scope &recv_scope = scope.NewScope(); + + PADDLE_ENFORCE(!rpc_service_); + std::string endpoint = Attr("endpoint"); + rpc_service_.reset(new detail::AsyncGRPCServer(endpoint)); + + auto *optimize_block = Attr(kOptimizeBlock); + auto *prefetch_block = Attr(kPrefetchBlock); + auto *program = optimize_block->Program(); + framework::Executor executor(dev_place); + + // prepare rpc_service + rpc_service_->SetScope(&recv_scope); + rpc_service_->SetDevCtx(&dev_ctx); + rpc_service_->SetProgram(program); + rpc_service_->SetExecutor(&executor); + + // prepare for prefetch + VLOG(3) << "prefetch block id is " << prefetch_block->ID(); + auto prefetch_prepared = executor.Prepare(*program, prefetch_block->ID()); + rpc_service_->SetPrefetchPreparedCtx(prefetch_prepared.get()); + prefetch_prepared.release(); + + // start the server listening after all member initialized. + server_thread_.reset(new std::thread(RunServer, rpc_service_)); + VLOG(3) << "wait server thread to become ready..."; + sleep(5); + // Write to a file of server selected port for python use. 
+ SavePort(rpc_service_); + RunSyncUpdate(&executor, program, &recv_scope, prefetch_block); +} + class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker { public: ListenAndServOpMaker(OpProto *proto, OpAttrChecker *op_checker) diff --git a/paddle/fluid/operators/listen_and_serv_op.h b/paddle/fluid/operators/listen_and_serv_op.h index 9744921cef7c0f13c94b7fe729561de8e181650c..68c8da4d382221f662932789e67ce2aacf18e1a3 100644 --- a/paddle/fluid/operators/listen_and_serv_op.h +++ b/paddle/fluid/operators/listen_and_serv_op.h @@ -34,17 +34,22 @@ void RunServer(std::shared_ptr service); class ListenAndServOp : public framework::OperatorBase { public: - ListenAndServOp(const std::string &type, - const framework::VariableNameMap &inputs, - const framework::VariableNameMap &outputs, - const framework::AttributeMap &attrs); + ListenAndServOp(const std::string& type, + const framework::VariableNameMap& inputs, + const framework::VariableNameMap& outputs, + const framework::AttributeMap& attrs); int GetSelectedPort() const; + void RunSyncUpdate(framework::Executor* executor, + framework::ProgramDesc* program, + framework::Scope* recv_scope, + framework::BlockDesc* prefetch_block) const; + void Stop() override; - void RunImpl(const framework::Scope &scope, - const platform::Place &dev_place) const override; + void RunImpl(const framework::Scope& scope, + const platform::Place& dev_place) const override; protected: mutable std::shared_ptr rpc_service_; diff --git a/paddle/fluid/operators/send_recv_op_test.cc b/paddle/fluid/operators/send_recv_op_test.cc index a342874f97460cf624ff0047915d33ba4161f19b..81350fee38df058d1b63eb5a8cd0b770e0626ae4 100644 --- a/paddle/fluid/operators/send_recv_op_test.cc +++ b/paddle/fluid/operators/send_recv_op_test.cc @@ -127,7 +127,7 @@ void StartServerNet(bool is_sparse) { const auto &root_block = program.Block(0); auto *optimize_block = program.AppendBlock(root_block); auto *prefetch_block = program.AppendBlock(root_block); - // X for server side tensors, RX for received tensers, must be of same shape. + // X for server side tensors, RX for received tensors, must be of same shape. AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block); f::AttributeMap attrs; diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index e9ca0d45f98bd27692a15060310d4e8cd1e8b181..e2502990d5b78eb0db7bdfd0c8ef9fb6688016df 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -107,7 +107,8 @@ def __bootstrap__(): os.environ['OMP_NUM_THREADS'] = str(num_threads) read_env_flags = [ - 'use_pinned_memory', 'check_nan_inf', 'benchmark', 'warpctc_dir' + 'use_pinned_memory', 'check_nan_inf', 'benchmark', 'warpctc_dir', + 'eager_delete_scope' ] if core.is_compiled_with_cuda(): read_env_flags += ['fraction_of_gpu_memory_to_use']
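A note on the pattern this patch repeats: the broadcast, gather, and reduce op handles all begin RunImpl() by filtering dummy dependency edges out of inputs_/outputs_ with the new DynamicCast<> helper from container_cast.h, and then touch the underlying storage through VariableVisitor so LoDTensor and SelectedRows are handled uniformly. The following standalone sketch illustrates only the DynamicCast filtering idea: the VarHandle classes here are simplified stand-ins for the real ones, and the static_assert is adapted via remove_pointer so the file compiles on its own.

#include <iostream>
#include <string>
#include <type_traits>
#include <vector>

struct VarHandleBase {
  virtual ~VarHandleBase() = default;
};

struct VarHandle : VarHandleBase {
  explicit VarHandle(std::string name) : name_(std::move(name)) {}
  std::string name_;
};

struct DummyVarHandle : VarHandleBase {};  // dependency-only edge, carries no data

// Same shape as the helper added in container_cast.h: keep only the elements
// that really are of the requested derived type, drop everything else.
template <typename ResultType, typename ElemType>
std::vector<ResultType> DynamicCast(const std::vector<ElemType>& container) {
  static_assert(
      std::is_base_of<typename std::remove_pointer<ElemType>::type,
                      typename std::remove_pointer<ResultType>::type>::value,
      "ElemType must be a base class of ResultType");
  std::vector<ResultType> res;
  for (auto* ptr : container) {
    if (auto* derived = dynamic_cast<ResultType>(ptr)) {
      res.emplace_back(derived);
    }
  }
  return res;
}

int main() {
  VarHandle a("x@GRAD"), b("y@GRAD");
  DummyVarHandle dep;
  // An op handle's inputs_ mixes real variables with dummy dependency edges;
  // RunImpl() only wants the real ones.
  std::vector<VarHandleBase*> inputs{&a, &dep, &b};
  auto in_var_handles = DynamicCast<VarHandle*>(inputs);
  for (auto* v : in_var_handles) {
    std::cout << v->name_ << "\n";  // prints x@GRAD then y@GRAD
  }
  return 0;
}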
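GatherSelectedRows in reduce_and_gather.h (and GatherOpHandle before it) builds its output by appending every input's rows list and copying each input's value tensor into the output slice [s, e). Below is a minimal stand-in sketch of that offset arithmetic using plain vectors instead of SelectedRows/Tensor; all names in it are hypothetical, not Paddle APIs.

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <vector>

// Toy stand-in for SelectedRows: a list of global row indices plus a flat
// value buffer holding rows.size() * row_width floats.
struct Chunk {
  std::vector<long> rows;
  std::vector<float> value;
};

int main() {
  const std::size_t row_width = 3;
  std::vector<Chunk> inputs = {
      {{0, 2}, {0, 1, 2, 6, 7, 8}},
      {{5}, {15, 16, 17}},
  };

  std::vector<long> out_rows;
  std::vector<float> out_value;

  // Same pattern as the patch: append every input's rows, then copy each
  // input's value tensor into the output slice [s, e).
  std::size_t s = 0, e = 0;
  for (const auto& in : inputs) {
    out_rows.insert(out_rows.end(), in.rows.begin(), in.rows.end());
    e += in.rows.size();
    out_value.resize(e * row_width);
    std::copy(in.value.begin(), in.value.end(),
              out_value.begin() + s * row_width);
    s = e;
  }

  for (std::size_t r = 0; r < out_rows.size(); ++r) {
    std::printf("row %ld:", out_rows[r]);
    for (std::size_t c = 0; c < row_width; ++c) {
      std::printf(" %g", out_value[r * row_width + c]);
    }
    std::printf("\n");
  }
  return 0;
}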