Commit 36083018, authored by Q qiaolongfei

Merge branch 'refine-listen-and-serve-op' of ssh://github.com/jacquesqiao/Paddle into add-async-listen-and-serv-op
@@ -2,8 +2,6 @@ cc_library(var_handle SRCS var_handle.cc DEPS place)
 cc_library(op_handle_base SRCS op_handle_base.cc DEPS var_handle device_context lod_tensor)
 cc_library(scale_loss_grad_op_handle SRCS scale_loss_grad_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory)
 cc_library(fetch_op_handle SRCS fetch_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory)
-nv_library(nccl_all_reduce_op_handle SRCS nccl_all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory
-    dynload_cuda)
 cc_library(computation_op_handle SRCS computation_op_handle.cc DEPS framework_proto scope place operator op_registry)
 cc_library(send_op_handle SRCS send_op_handle.cc DEPS framework_proto scope place operator op_registry)
@@ -11,20 +9,29 @@ cc_library(ssa_graph SRCS ssa_graph.cc DEPS var_handle op_handle_base)
 cc_library(ssa_graph_builder SRCS ssa_graph_builder.cc DEPS ssa_graph)

 if(WITH_GPU)
+  nv_library(nccl_all_reduce_op_handle SRCS nccl_all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory
+      dynload_cuda)
   set(multi_devices_graph_builder_deps nccl_all_reduce_op_handle)
+  nv_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base scope ddim dynload_cuda)
 else()
   set(multi_devices_graph_builder_deps)
+  cc_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base scope ddim)
 endif()

 cc_library(multi_devices_graph_builder SRCS multi_devices_graph_builder.cc DEPS ssa_graph_builder computation_op_handle
     scale_loss_grad_op_handle send_op_handle ${multi_devices_graph_builder_deps})
 cc_library(ssa_graph_executor SRCS ssa_graph_executor.cc DEPS ssa_graph framework_proto)
 cc_library(threaded_ssa_graph_executor SRCS threaded_ssa_graph_executor.cc DEPS fetch_op_handle ssa_graph_executor scope
     simple_threadpool device_context)
-cc_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base scope ddim memory)
-cc_library(gather_op_handle SRCS gather_op_handle.cc DEPS op_handle_base scope ddim memory)
+cc_library(variable_visitor SRCS variable_visitor.cc DEPS lod_tensor selected_rows)
+cc_library(broadcast_op_handle SRCS broadcast_op_handle.cc DEPS op_handle_base variable_visitor scope ddim memory)
+cc_library(gather_op_handle SRCS gather_op_handle.cc DEPS op_handle_base scope variable_visitor ddim memory)

 cc_test(broadcast_op_test SRCS broadcast_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
     device_context broadcast_op_handle)
 cc_test(gather_op_test SRCS gather_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
     device_context gather_op_handle)
+cc_test(reduce_op_handle_test SRCS reduce_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
+    device_context reduce_op_handle )
@@ -13,95 +13,72 @@
 // limitations under the License.

 #include "paddle/fluid/framework/details/broadcast_op_handle.h"
+#include "paddle/fluid/framework/details/container_cast.h"
+#include "paddle/fluid/framework/details/variable_visitor.h"

 namespace paddle {
 namespace framework {
 namespace details {

-Tensor *GetTensorFromVar(Variable *in_var) {
-  if (in_var->IsType<LoDTensor>()) {
-    return in_var->GetMutable<LoDTensor>();
-  } else if (in_var->IsType<SelectedRows>()) {
-    return in_var->GetMutable<SelectedRows>()->mutable_value();
-  } else {
-    PADDLE_THROW("Var should be LoDTensor or SelectedRows");
-  }
-  return nullptr;
-}
-
 BroadcastOpHandle::BroadcastOpHandle(const std::vector<Scope *> &local_scopes,
                                      const std::vector<platform::Place> &places)
     : local_scopes_(local_scopes), places_(places) {}

 void BroadcastOpHandle::RunImpl() {
-  // the input may have dummy var.
-  std::vector<VarHandle *> in_var_handle;
-  for (auto *in : inputs_) {
-    auto *out_handle = dynamic_cast<VarHandle *>(in);
-    if (out_handle) {
-      in_var_handle.push_back(out_handle);
-    }
-  }
-  PADDLE_ENFORCE_EQ(in_var_handle.size(), 1,
-                    "The number of input should be one.");
-
-  // the output may have dummy var.
-  std::vector<VarHandle *> out_var_handles;
-  for (auto *out : outputs_) {
-    auto *out_handle = dynamic_cast<VarHandle *>(out);
-    if (out_handle) {
-      out_var_handles.push_back(out_handle);
-    }
-  }
+  // the input and output may have dummy var.
+  VarHandle *in_var_handle;
+  {
+    auto in_var_handles = DynamicCast<VarHandle>(inputs_);
+    PADDLE_ENFORCE_EQ(in_var_handles.size(), 1,
+                      "The number of input should be one.");
+    in_var_handle = in_var_handles[0];
+  }
+
+  auto out_var_handles = DynamicCast<VarHandle>(outputs_);

   PADDLE_ENFORCE_EQ(
       out_var_handles.size(), places_.size(),
       "The number of output should equal to the number of places.");

-  // Wait input done, this Wait is asynchronous operation
-  auto &in_place = in_var_handle[0]->place_;
-  if (in_var_handle[0]->generated_op_) {
-    for (auto *out : out_var_handles) {
-      auto &out_p = out->place_;
-      in_var_handle[0]->generated_op_->Wait(dev_ctxes_[out_p]);
-    }
-  }
-
-  auto in_scope_idx = in_var_handle[0]->scope_idx_;
-  auto in_var =
-      local_scopes_.at(in_scope_idx)->FindVar(in_var_handle[0]->name_);
-  Tensor *in_tensor = GetTensorFromVar(in_var);
+  // Wait input done, this Wait is asynchronous operation platform::Place
+  // &in_place;
+  WaitInputVarGenerated(*in_var_handle);
+
+  auto *in_var = local_scopes_.at(in_var_handle->scope_idx_)
+                     ->FindVar(in_var_handle->name_);
+  PADDLE_ENFORCE_NOT_NULL(in_var);
+  Tensor &in_tensor = VariableVisitor::GetMutableTensor(in_var);

   for (auto *out : out_var_handles) {
+    if (*out == *in_var_handle) {
+      continue;
+    }
+
     auto &out_p = out->place_;
-    auto out_var = local_scopes_.at(out->scope_idx_)->FindVar(out->name_);
+    auto *out_var = local_scopes_.at(out->scope_idx_)->FindVar(out->name_);

-    PADDLE_ENFORCE_EQ(out_p.which(), in_place.which(),
+    PADDLE_ENFORCE_EQ(out_p.which(), in_var_handle->place_.which(),
                       "Places must be all on CPU or all on CUDA.");

-    if (in_var->IsType<framework::SelectedRows>()) {
-      auto &in_sr = in_var->Get<framework::SelectedRows>();
-      auto out_sr = out_var->GetMutable<framework::SelectedRows>();
-      if (&in_sr == out_sr) continue;
-      out_sr->set_height(in_sr.height());
-      out_sr->set_rows(in_sr.rows());
-      out_sr->mutable_value()->Resize(in_sr.value().dims());
-      out_sr->mutable_value()->mutable_data(out_p, in_sr.value().type());
-    } else if (in_var->IsType<framework::LoDTensor>()) {
-      auto in_lod = in_var->Get<framework::LoDTensor>();
-      auto out_lod = out_var->GetMutable<framework::LoDTensor>();
-      if (&in_lod == out_lod) continue;
-      out_lod->set_lod(in_lod.lod());
-      out_lod->Resize(in_lod.dims());
-      out_lod->mutable_data(out_p, in_lod.type());
-    } else {
-      PADDLE_THROW("Var should be LoDTensor or SelectedRows.");
-    }
-
-    Tensor *out_tensor = GetTensorFromVar(out_var);
-    paddle::framework::TensorCopy(*in_tensor, out_p, *(dev_ctxes_[in_place]),
-                                  out_tensor);
+    VariableVisitor::ShareDimsAndLoD(*in_var, out_var);
+    VariableVisitor::GetMutableTensor(out_var)
+        .Resize(in_tensor.dims())
+        .mutable_data(out_p, in_tensor.type());
+
+    auto dev_ctx = dev_ctxes_[out_p];
+    RunAndRecordEvent(out_p, [in_tensor, out_var, dev_ctx, out_p] {
+      paddle::framework::TensorCopy(
+          in_tensor, out_p, *(dev_ctx),
+          &VariableVisitor::GetMutableTensor(out_var));
+    });
   }
 }
+
+void BroadcastOpHandle::WaitInputVarGenerated(const VarHandle &in_var) {
+  if (in_var.generated_op_) {
+    for (auto &pair : dev_ctxes_) {
+      in_var.generated_op_->Wait(pair.second);
+    }
+  }
+}
......
@@ -39,12 +39,12 @@ struct BroadcastOpHandle : public OpHandleBase {
 protected:
   void RunImpl() override;
+  void WaitInputVarGenerated(const VarHandle &in_var);

 private:
   const std::vector<Scope *> &local_scopes_;
   const std::vector<platform::Place> &places_;
 };

 }  // namespace details
 }  // namespace framework
 }  // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <type_traits>
#include <vector>
namespace paddle {
namespace framework {
namespace details {
template <typename ResultType, typename ElemType>
std::vector<ResultType*> DynamicCast(const std::vector<ElemType*>& container) {
static_assert(std::is_base_of<ElemType, ResultType>::value,
"ElementType must be a base class of ResultType");
std::vector<ResultType*> res;
for (auto* ptr : container) {
auto* derived = dynamic_cast<ResultType*>(ptr);
if (derived) {
res.emplace_back(derived);
}
}
return res;
}
} // namespace details
} // namespace framework
} // namespace paddle
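Side note (illustration, not part of this commit): DynamicCast is what lets the op handles drop dummy dependency handles from their inputs_/outputs_ in one line. Below is a minimal, self-contained C++ sketch of that usage; VarHandleBase, VarHandle and DummyVarHandle here are simplified stand-ins for the real classes in var_handle.h.

// Simplified stand-ins for the classes declared in var_handle.h.
#include <iostream>
#include <string>
#include <type_traits>
#include <vector>

struct VarHandleBase {
  virtual ~VarHandleBase() {}
};
struct VarHandle : public VarHandleBase {
  std::string name_;
  explicit VarHandle(const std::string& name) : name_(name) {}
};
struct DummyVarHandle : public VarHandleBase {};

// Same template as container_cast.h above.
template <typename ResultType, typename ElemType>
std::vector<ResultType*> DynamicCast(const std::vector<ElemType*>& container) {
  static_assert(std::is_base_of<ElemType, ResultType>::value,
                "ElementType must be a base class of ResultType");
  std::vector<ResultType*> res;
  for (auto* ptr : container) {
    auto* derived = dynamic_cast<ResultType*>(ptr);
    if (derived) {
      res.emplace_back(derived);
    }
  }
  return res;
}

int main() {
  VarHandle x("x"), y("y");
  DummyVarHandle dep;  // pure dependency edge, carries no data
  std::vector<VarHandleBase*> inputs{&x, &dep, &y};

  // What BroadcastOpHandle/GatherOpHandle::RunImpl do with inputs_/outputs_.
  auto var_inputs = DynamicCast<VarHandle>(inputs);
  std::cout << var_inputs.size() << std::endl;  // prints 2
  return 0;
}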
@@ -13,6 +13,8 @@
 // limitations under the License.

 #include "paddle/fluid/framework/details/gather_op_handle.h"
+#include "paddle/fluid/framework/details/container_cast.h"
+#include "paddle/fluid/framework/details/variable_visitor.h"

 namespace paddle {
 namespace framework {
@@ -23,30 +25,23 @@ GatherOpHandle::GatherOpHandle(const std::vector<Scope *> &local_scopes,
     : local_scopes_(local_scopes), places_(places) {}

 void GatherOpHandle::RunImpl() {
-  // the input may have dummy var.
-  std::vector<VarHandle *> in_var_handles;
-  for (auto *in : inputs_) {
-    auto *in_handle = dynamic_cast<VarHandle *>(in);
-    if (in_handle) {
-      in_var_handles.push_back(in_handle);
-    }
-  }
+  // the input and output may have dummy var.
+  auto in_var_handles = DynamicCast<VarHandle>(inputs_);
   PADDLE_ENFORCE_EQ(
       in_var_handles.size(), places_.size(),
       "The number of output should equal to the number of places.");

-  // the output may have dummy var.
-  std::vector<VarHandle *> out_var_handles;
-  for (auto *out : outputs_) {
-    auto *out_handle = dynamic_cast<VarHandle *>(out);
-    if (out_handle) {
-      out_var_handles.push_back(out_handle);
-    }
-  }
-  PADDLE_ENFORCE_EQ(out_var_handles.size(), 1,
-                    "The number of output should be one.");
+  VarHandle *out_var_handle;
+  {
+    auto out_var_handles = DynamicCast<VarHandle>(outputs_);
+    PADDLE_ENFORCE_EQ(out_var_handles.size(), 1,
+                      "The number of output should be one.");
+    out_var_handle = out_var_handles.front();
+  }

-  auto in_0_handle = static_cast<VarHandle *>(in_var_handles[0]);
+  auto in_0_handle = in_var_handles[0];
   auto pre_in_var =
       local_scopes_[in_0_handle->scope_idx_]->FindVar(in_0_handle->name_);
   auto pre_place = in_0_handle->place_;
@@ -54,15 +49,11 @@ void GatherOpHandle::RunImpl() {
   PADDLE_ENFORCE(pre_in_var->IsType<framework::SelectedRows>(),
                  "Currently, gather_op only can gather SelectedRows.");

-  PADDLE_ENFORCE_EQ(out_var_handles[0]->place_.which(), pre_place.which(),
+  PADDLE_ENFORCE_EQ(out_var_handle->place_.which(), pre_place.which(),
                     "The place of input and output should be the same.");

   // Wait input done, this Wait is asynchronous operation
-  for (auto *in : in_var_handles) {
-    if (in->generated_op_) {
-      in->generated_op_->Wait(dev_ctxes_[in->place_]);
-    }
-  }
+  WaitInputVarGenerated(in_var_handles);

   std::vector<int64_t> out_rows;
   std::vector<Tensor> in_tensors;
@@ -70,13 +61,12 @@ void GatherOpHandle::RunImpl() {
   auto &pre_in = pre_in_var->Get<framework::SelectedRows>();

   // gather the inputs
-  for (auto *in : in_var_handles) {
-    auto in_handle = static_cast<VarHandle *>(in);
+  for (auto *in_handle : in_var_handles) {
     auto in_p = in_handle->place_;
     in_places.push_back(in_p);
     PADDLE_ENFORCE_EQ(in_p.which(), pre_place.which(),
                       "Places must be all on CPU or all on CUDA.");
-    auto in_var =
+    auto *in_var =
         local_scopes_.at(in_handle->scope_idx_)->FindVar(in_handle->name_);
     auto &in_sr = in_var->Get<framework::SelectedRows>();
@@ -84,20 +74,19 @@ void GatherOpHandle::RunImpl() {
                       "The type of input is not consistent.");
     PADDLE_ENFORCE_EQ(pre_in.height(), in_sr.height(),
                       "The height of inputs is not consistent.");
-    PADDLE_ENFORCE_EQ(pre_in.GetCompleteDims(), in_sr.GetCompleteDims(), ,
+    PADDLE_ENFORCE_EQ(pre_in.GetCompleteDims(), in_sr.GetCompleteDims(),
                       "The dims of inputs is not consistent.");

-    auto in_sr_rows = in_sr.rows();
+    auto &in_sr_rows = in_sr.rows();
     out_rows.insert(out_rows.end(), in_sr_rows.begin(), in_sr_rows.end());

     in_tensors.emplace_back(in_sr.value());
   }

   // write the output
-  auto &out_place = out_var_handles[0]->place_;
-  auto out_scope_idx = out_var_handles[0]->scope_idx_;
-  auto out_var =
-      local_scopes_[out_scope_idx]->FindVar(out_var_handles[0]->name_);
+  auto &out_place = out_var_handle->place_;
+  auto out_scope_idx = out_var_handle->scope_idx_;
+  auto out_var = local_scopes_[out_scope_idx]->FindVar(out_var_handle->name_);

   auto out = out_var->GetMutable<framework::SelectedRows>();
   out->set_height(pre_in.height());
@@ -110,14 +99,28 @@ void GatherOpHandle::RunImpl() {
   Tensor *out_tensor = out->mutable_value();

   // copy
-  int s = 0, e = 0;
-  for (size_t j = 0; j < in_tensors.size(); ++j) {
-    e += in_tensors[j].dims()[0];
-    auto sub_out = out_tensor->Slice(s, e);
-    paddle::framework::TensorCopy(in_tensors[j], out_place,
-                                  *(dev_ctxes_[in_places[j]]), &sub_out);
-    s = e;
-  }
+  auto dev_ctx = dev_ctxes_[out_place];
+  RunAndRecordEvent(out_place, [in_tensors, out_tensor, dev_ctx, out_place] {
+    int s = 0, e = 0;
+    for (size_t j = 0; j < in_tensors.size(); ++j) {
+      e += in_tensors[j].dims()[0];
+      auto sub_out = out_tensor->Slice(s, e);
+      paddle::framework::TensorCopy(in_tensors[j], out_place, *(dev_ctx),
+                                    &sub_out);
+      s = e;
+    }
+  });
+}
+
+void GatherOpHandle::WaitInputVarGenerated(
+    const std::vector<VarHandle *> &in_var_handles) {
+  for (auto *in : in_var_handles) {
+    if (in->generated_op_) {
+      for (auto pair : dev_ctxes_) {
+        in->generated_op_->Wait(pair.second);
+      }
+    }
+  }
 }

 std::string GatherOpHandle::Name() const { return "gather"; }
......
@@ -39,6 +39,7 @@ struct GatherOpHandle : public OpHandleBase {
 protected:
   void RunImpl() override;
+  void WaitInputVarGenerated(const std::vector<VarHandle *> &in_var_handles);

 private:
   const std::vector<Scope *> &local_scopes_;
......
@@ -13,8 +13,8 @@
 // limitations under the License.

 #include "paddle/fluid/framework/details/nccl_all_reduce_op_handle.h"
 #include <algorithm>
+#include "paddle/fluid/framework/details/reduce_and_gather.h"

 namespace paddle {
 namespace framework {
@@ -29,32 +29,6 @@ NCCLAllReduceOpHandle::NCCLAllReduceOpHandle(
   }
 }

-struct ReduceLoDTensor {
-  const std::vector<LoDTensor> &src_tensors_;
-  LoDTensor &dst_tensor_;
-
-  ReduceLoDTensor(const std::vector<LoDTensor> &src, LoDTensor *dst)
-      : src_tensors_(src), dst_tensor_(*dst) {}
-
-  template <typename T>
-  void operator()() const {
-    PADDLE_ENFORCE(!src_tensors_.empty());
-    auto &t0 = src_tensors_[0];
-    PADDLE_ENFORCE_NE(t0.numel(), 0);
-    dst_tensor_.Resize(t0.dims());
-    T *dst = dst_tensor_.mutable_data<T>(platform::CPUPlace());
-    std::copy(t0.data<T>(), t0.data<T>() + t0.numel(), dst);
-
-    for (size_t i = 1; i < src_tensors_.size(); ++i) {
-      auto &t = src_tensors_[i];
-      PADDLE_ENFORCE_EQ(t.dims(), t0.dims());
-      PADDLE_ENFORCE_EQ(t.type(), t0.type());
-      std::transform(t.data<T>(), t.data<T>() + t.numel(), dst, dst,
-                     [](T a, T b) -> T { return a + b; });
-    }
-  }
-};
-
 void NCCLAllReduceOpHandle::RunImpl() {
   if (inputs_.size() == 1) {
     return;  // No need to all reduce when GPU count = 1;
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <algorithm>
#include <map>
#include <vector>
#include "paddle/fluid/framework/details/reduce_and_gather.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/selected_rows.h"
namespace paddle {
namespace framework {
namespace details {
struct ReduceLoDTensor {
const std::vector<LoDTensor> &src_tensors_;
LoDTensor &dst_tensor_;
ReduceLoDTensor(const std::vector<LoDTensor> &src, LoDTensor *dst)
: src_tensors_(src), dst_tensor_(*dst) {}
template <typename T>
void operator()() const {
PADDLE_ENFORCE(!src_tensors_.empty());
auto &t0 = src_tensors_[0];
PADDLE_ENFORCE_NE(t0.numel(), 0);
dst_tensor_.Resize(t0.dims());
T *dst = dst_tensor_.mutable_data<T>(platform::CPUPlace());
std::copy(t0.data<T>(), t0.data<T>() + t0.numel(), dst);
for (size_t i = 1; i < src_tensors_.size(); ++i) {
auto &t = src_tensors_[i];
PADDLE_ENFORCE_EQ(t.dims(), t0.dims());
PADDLE_ENFORCE_EQ(t.type(), t0.type());
std::transform(t.data<T>(), t.data<T>() + t.numel(), dst, dst,
[](T a, T b) -> T { return a + b; });
}
}
};
inline void GatherSelectedRows(
const std::vector<const SelectedRows *> &src_selecte_rows_,
const std::vector<platform::Place> &in_places,
const std::unordered_map<platform::Place, platform::DeviceContext *,
platform::PlaceHash> &dev_ctxes,
const platform::Place &out_place, SelectedRows *dst_selecte_rows) {
PADDLE_ENFORCE(!src_selecte_rows_.empty());
std::vector<Tensor> in_tensors;
std::vector<int64_t> out_rows;
for (auto in_sr_ptr : src_selecte_rows_) {
auto &in_sr = *in_sr_ptr;
in_tensors.emplace_back(in_sr.value());
out_rows.insert(out_rows.end(), in_sr.rows().begin(), in_sr.rows().end());
}
auto &pre_in = src_selecte_rows_[0];
auto &dst_tensor = *dst_selecte_rows;
dst_tensor.set_height(pre_in->height());
dst_tensor.set_rows(out_rows);
size_t rows = out_rows.size();
DDim out_dim = pre_in->GetCompleteDims();
out_dim[0] = static_cast<int64_t>(rows);
dst_tensor.mutable_value()->Resize(out_dim);
dst_tensor.mutable_value()->mutable_data(out_place, pre_in->value().type());
Tensor *out_tensor = dst_tensor.mutable_value();
// copy
int s = 0, e = 0;
for (size_t j = 0; j < in_tensors.size(); ++j) {
e += in_tensors[j].dims()[0];
auto sub_out = out_tensor->Slice(s, e);
paddle::framework::TensorCopy(in_tensors[j], out_place,
*(dev_ctxes.at(in_places[j])), &sub_out);
s = e;
}
}
} // namespace details
} // namespace framework
} // namespace paddle
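Side note (illustration, not part of this commit): ReduceLoDTensor is a type-dispatched functor; reduce_op_handle.cc below hands it to framework::VisitDataType, which selects the operator()<T>() instantiation that matches the tensors' element type. A standalone sketch of that dispatch pattern follows, with a simplified DataType enum and VisitDataType standing in for the framework's versions.

#include <cstdint>
#include <iostream>
#include <typeinfo>

// Simplified stand-ins; the real enum and VisitDataType live in the framework.
enum class DataType { FP32, INT64 };

// Analogue of ReduceLoDTensor: the functor itself is not a template, but its
// call operator is, so the visitor can choose T at runtime.
struct PrintElementType {
  template <typename T>
  void operator()() const {
    std::cout << "reduce kernel instantiated for " << typeid(T).name() << "\n";
  }
};

template <typename Visitor>
void VisitDataType(DataType type, const Visitor& visitor) {
  switch (type) {
    case DataType::FP32:
      visitor.template operator()<float>();
      break;
    case DataType::INT64:
      visitor.template operator()<int64_t>();
      break;
  }
}

int main() {
  // reduce_op_handle.cc does the equivalent of:
  //   ReduceLoDTensor func(lod_tensors, trg);
  //   VisitDataType(ToDataType(lod_tensors[0].type()), func);
  VisitDataType(DataType::FP32, PrintElementType{});
  return 0;
}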
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/details/reduce_op_handle.h"
#include "paddle/fluid/framework/details/reduce_and_gather.h"
namespace paddle {
namespace framework {
namespace details {
void ReduceOpHandle::RunImpl() {
// the input and output may have dummy var.
std::vector<VarHandle *> in_var_handles = GetValidVarHandles(inputs_);
std::vector<VarHandle *> out_var_handles = GetValidVarHandles(outputs_);
PADDLE_ENFORCE_EQ(
in_var_handles.size(), places_.size(),
"The number of output should equal to the number of places.");
PADDLE_ENFORCE_EQ(out_var_handles.size(), 1,
"The number of output should be one.");
// Wait input done, this Wait is asynchronous operation
WaitEvents(in_var_handles);
// check in the same place
auto in_0_handle = in_var_handles[0];
auto pre_place = in_0_handle->place_;
std::vector<platform::Place> in_places;
for (auto *in_handle : in_var_handles) {
auto in_p = in_handle->place_;
PADDLE_ENFORCE_EQ(in_p.which(), pre_place.which(),
"Places must be all on CPU or all on CUDA.");
in_places.emplace_back(in_p);
}
auto out_var = local_scopes_[out_var_handles[0]->scope_idx_]->FindVar(
out_var_handles[0]->name_);
auto pre_in_var =
local_scopes_[in_0_handle->scope_idx_]->FindVar(in_0_handle->name_);
if (pre_in_var->IsType<framework::SelectedRows>()) {
auto &pre_in = pre_in_var->Get<framework::SelectedRows>();
std::vector<const SelectedRows *> in_selected_rows;
for (auto *in_handle : in_var_handles) {
auto in_var =
local_scopes_.at(in_handle->scope_idx_)->FindVar(in_handle->name_);
auto &in_sr = in_var->Get<framework::SelectedRows>();
PADDLE_ENFORCE_EQ(in_sr.value().type(), pre_in.value().type(),
"The type of input is not consistent.");
in_selected_rows.emplace_back(&in_sr);
}
auto trg = out_var->GetMutable<framework::SelectedRows>();
GatherSelectedRows(in_selected_rows, in_places, dev_ctxes_,
out_var_handles[0]->place_, trg);
} else {
auto pre_in = pre_in_var->Get<framework::LoDTensor>();
std::vector<LoDTensor> lod_tensors;
// can be refined
for (auto *in_handle : in_var_handles) {
auto in_var =
local_scopes_.at(in_handle->scope_idx_)->FindVar(in_handle->name_);
auto &in_sr = in_var->Get<framework::LoDTensor>();
PADDLE_ENFORCE_EQ(in_sr.type(), pre_in.type(),
"The type of input is not consistent.");
lod_tensors.emplace_back(in_sr);
}
auto trg = out_var->GetMutable<framework::LoDTensor>();
trg->Resize(pre_in.dims());
trg->mutable_data(out_var_handles[0]->place_, pre_in.type());
if (paddle::platform::is_cpu_place(pre_place)) {
ReduceLoDTensor func(lod_tensors, trg);
VisitDataType(ToDataType(lod_tensors[0].type()), func);
} else if (paddle::platform::is_gpu_place(pre_place)) {
#ifdef PADDLE_WITH_CUDA
auto out_p = out_var_handles[0]->place_;
int root = boost::get<platform::CUDAPlace>(out_p).device;
std::vector<std::function<void()>> all_reduce_calls;
for (size_t i = 0; i < local_scopes_.size(); ++i) {
auto &p = in_places[i];
auto &lod_tensor = lod_tensors[i];
int dev_id = boost::get<platform::CUDAPlace>(p).device;
auto &nccl_ctx = nccl_ctxs_->at(dev_id);
auto stream = nccl_ctx.stream();
auto comm = nccl_ctx.comm_;
void *buffer = const_cast<void *>(lod_tensor.data<void>());
void *recvbuffer = nullptr;
if (root == dev_id) {
recvbuffer = trg->mutable_data(out_var_handles[0]->place_);
}
all_reduce_calls.emplace_back([=] {
PADDLE_ENFORCE(platform::dynload::ncclReduce(
buffer, recvbuffer, static_cast<size_t>(lod_tensor.numel()),
platform::ToNCCLDataType(lod_tensor.type()), ncclSum, root, comm,
stream));
});
}
this->RunAndRecordEvent([&] {
platform::NCCLGroupGuard guard;
for (auto &call : all_reduce_calls) {
call();
}
});
#else
PADDLE_THROW("CUDA is not support.");
#endif
} else {
PADDLE_THROW("Place should be CPUPlace or CUDAPlace.");
}
}
}
void ReduceOpHandle::WaitEvents(
const std::vector<VarHandle *> &in_var_handles) {
if (in_var_handles[0]->generated_op_) {
for (auto *in : in_var_handles) {
in_var_handles[0]->generated_op_->Wait(dev_ctxes_[in->place_]);
}
}
}
std::vector<VarHandle *> ReduceOpHandle::GetValidVarHandles(
const std::vector<VarHandleBase *> &inputs) {
std::vector<VarHandle *> in_var_handles;
for (auto *in : inputs) {
auto *in_handle = dynamic_cast<VarHandle *>(in);
if (in_handle) {
in_var_handles.push_back(in_handle);
}
}
return in_var_handles;
}
std::string ReduceOpHandle::Name() const { return "reduce"; }
} // namespace details
} // namespace framework
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <map>
#include <string>
#include <vector>
#include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/platform/device_context.h"
#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/platform/nccl_helper.h"
#endif
namespace paddle {
namespace framework {
namespace details {
struct ReduceOpHandle : public OpHandleBase {
const std::vector<Scope *> &local_scopes_;
const std::vector<platform::Place> &places_;
#ifdef PADDLE_WITH_CUDA
const platform::NCCLContextMap *nccl_ctxs_;
ReduceOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const platform::NCCLContextMap *nccl_ctxs)
: local_scopes_(local_scopes), places_(places), nccl_ctxs_(nccl_ctxs) {
if (nccl_ctxs_) {
for (auto &p_ctx : nccl_ctxs_->contexts_) {
dev_ctxes_[platform::CUDAPlace(p_ctx.first)] = p_ctx.second.ctx_.get();
}
}
}
#else
ReduceOpHandle(const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places)
: local_scopes_(local_scopes), places_(places) {}
#endif
std::string Name() const override;
bool IsMultiDeviceTransfer() override { return false; };
protected:
void RunImpl() override;
std::vector<VarHandle *> GetValidVarHandles(
const std::vector<VarHandleBase *> &inputs);
void WaitEvents(const std::vector<VarHandle *> &in_var_handles);
};
} // namespace details
} // namespace framework
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/details/reduce_op_handle.h"
#include "gtest/gtest.h"
#include "paddle/fluid/platform/device_context.h"
namespace paddle {
namespace framework {
namespace details {
namespace f = paddle::framework;
namespace p = paddle::platform;
// test data amount
const f::DDim kDims = {20, 20};
struct TestReduceOpHandle {
bool use_gpu_;
Scope g_scope_;
std::vector<Scope *> local_scopes_;
std::unique_ptr<OpHandleBase> op_handle_;
std::vector<std::unique_ptr<VarHandleBase>> vars_;
std::vector<p::Place> gpu_list_;
std::vector<std::unique_ptr<p::DeviceContext>> ctxs_;
#ifdef PADDLE_WITH_CUDA
std::unique_ptr<platform::NCCLContextMap> nccl_ctxs_;
#endif
void WaitAll() {
for (size_t j = 0; j < ctxs_.size(); ++j) {
ctxs_[j]->Wait();
}
#ifdef PADDLE_WITH_CUDA
if (nccl_ctxs_) {
nccl_ctxs_->WaitAll();
}
#endif
}
void InitCtxOnGpu(bool use_gpu) {
use_gpu_ = use_gpu;
if (use_gpu) {
#ifdef PADDLE_WITH_CUDA
int count = p::GetCUDADeviceCount();
if (count <= 1) {
LOG(WARNING) << "Cannot test multi-gpu Broadcast, because the CUDA "
"device count is "
<< count;
exit(0);
}
for (int i = 0; i < count; ++i) {
auto p = p::CUDAPlace(i);
gpu_list_.push_back(p);
ctxs_.emplace_back(new p::CUDADeviceContext(p));
}
nccl_ctxs_.reset(new platform::NCCLContextMap(gpu_list_));
#else
PADDLE_THROW("CUDA is not support.");
#endif
} else {
int count = 8;
for (int i = 0; i < count; ++i) {
auto p = p::CPUPlace();
gpu_list_.push_back(p);
ctxs_.emplace_back(new p::CPUDeviceContext(p));
}
#ifdef PADDLE_WITH_CUDA
nccl_ctxs_.reset(nullptr);
#endif
}
}
void InitReduceOp(size_t input_scope_idx) {
for (size_t j = 0; j < gpu_list_.size(); ++j) {
local_scopes_.push_back(&(g_scope_.NewScope()));
local_scopes_[j]->Var("out");
}
local_scopes_[input_scope_idx]->Var("input");
if (use_gpu_) {
#ifdef PADDLE_WITH_CUDA
op_handle_.reset(
new ReduceOpHandle(local_scopes_, gpu_list_, nccl_ctxs_.get()));
#else
PADDLE_THROW("CUDA is not support.");
#endif
} else {
#ifdef PADDLE_WITH_CUDA
op_handle_.reset(
new ReduceOpHandle(local_scopes_, gpu_list_, nccl_ctxs_.get()));
#else
op_handle_.reset(new ReduceOpHandle(local_scopes_, gpu_list_));
#endif
}
// add input
for (size_t j = 0; j < gpu_list_.size(); ++j) {
if (!use_gpu_) {
op_handle_->SetDeviceContext(gpu_list_[j], ctxs_[j].get());
}
auto *in_var_handle = new VarHandle(1, j, "input", gpu_list_[j]);
in_var_handle->generated_op_ = nullptr;
vars_.emplace_back(in_var_handle);
op_handle_->AddInput(in_var_handle);
}
// add dummy var
vars_.emplace_back(new DummyVarHandle());
DummyVarHandle *in_dummy_var_handle =
static_cast<DummyVarHandle *>(vars_.back().get());
in_dummy_var_handle->generated_op_ = nullptr;
op_handle_->AddInput(in_dummy_var_handle);
// add output
auto *out_var_handle =
new VarHandle(2, input_scope_idx, "out", gpu_list_[input_scope_idx]);
vars_.emplace_back(out_var_handle);
op_handle_->AddOutput(out_var_handle);
// add dummy var
vars_.emplace_back(new DummyVarHandle());
DummyVarHandle *dummy_var_handle =
static_cast<DummyVarHandle *>(vars_.back().get());
op_handle_->AddOutput(dummy_var_handle);
}
void TestReduceSelectedRows(size_t output_scope_idx) {
int height = kDims[0] * 2;
std::vector<int64_t> rows{0, 1, 2, 3, 3, 0, 14, 7, 3, 1,
2, 4, 6, 3, 1, 1, 1, 1, 3, 7};
std::vector<float> send_vector(f::product(kDims));
for (size_t k = 0; k < send_vector.size(); ++k) {
send_vector[k] = k;
}
for (size_t input_scope_idx = 0; input_scope_idx < gpu_list_.size();
++input_scope_idx) {
auto in_var = local_scopes_[input_scope_idx]->Var("input");
auto in_selected_rows = in_var->GetMutable<f::SelectedRows>();
auto value = in_selected_rows->mutable_value();
value->mutable_data<float>(kDims, gpu_list_[input_scope_idx]);
in_selected_rows->set_height(height);
in_selected_rows->set_rows(rows);
paddle::framework::TensorFromVector<float>(
send_vector, *(ctxs_[input_scope_idx]), value);
value->Resize(kDims);
}
auto out_var = local_scopes_[output_scope_idx]->Var("out");
auto out_selected_rows = out_var->GetMutable<f::SelectedRows>();
auto in_var = local_scopes_[output_scope_idx]->Var("input");
auto in_selected_rows = in_var->GetMutable<f::SelectedRows>();
out_selected_rows->mutable_value()->ShareDataWith(
in_selected_rows->value());
op_handle_->Run(false);
WaitAll();
p::CPUPlace cpu_place;
auto &out_select_rows = out_var->Get<f::SelectedRows>();
auto rt = out_select_rows.value();
PADDLE_ENFORCE_EQ(out_select_rows.height(), height, "height is not equal.");
for (size_t k = 0; k < out_select_rows.rows().size(); ++k) {
PADDLE_ENFORCE_EQ(out_select_rows.rows()[k], rows[k % rows.size()]);
}
f::Tensor result_tensor;
f::TensorCopy(rt, cpu_place, *(ctxs_[output_scope_idx]), &result_tensor);
float *ct = result_tensor.data<float>();
for (int64_t j = 0; j < f::product(result_tensor.dims()); ++j) {
ASSERT_NEAR(ct[j], send_vector[j % send_vector.size()], 1e-5);
}
}
void TestReduceLodTensors(size_t output_scope_idx) {
std::vector<float> send_vector(static_cast<size_t>(f::product(kDims)));
for (size_t k = 0; k < send_vector.size(); ++k) {
send_vector[k] = k;
}
f::LoD lod{{0, 10, 20}};
for (size_t input_scope_idx = 0; input_scope_idx < gpu_list_.size();
++input_scope_idx) {
auto in_var = local_scopes_[input_scope_idx]->Var("input");
auto in_lod_tensor = in_var->GetMutable<f::LoDTensor>();
in_lod_tensor->mutable_data<float>(kDims, gpu_list_[input_scope_idx]);
in_lod_tensor->set_lod(lod);
paddle::framework::TensorFromVector<float>(
send_vector, *(ctxs_[input_scope_idx]), in_lod_tensor);
}
auto out_var = local_scopes_[output_scope_idx]->Var("out");
auto out_lodtensor = out_var->GetMutable<f::LoDTensor>();
auto in_var = local_scopes_[output_scope_idx]->Var("input");
auto in_lodtensor = in_var->Get<f::LoDTensor>();
out_lodtensor->ShareDataWith(in_lodtensor);
op_handle_->Run(false);
WaitAll();
p::CPUPlace cpu_place;
auto &rt = out_var->Get<f::LoDTensor>();
f::Tensor result_tensor;
f::TensorCopy(rt, cpu_place, *(ctxs_[output_scope_idx]), &result_tensor);
float *ct = result_tensor.data<float>();
for (int64_t j = 0; j < f::product(result_tensor.dims()); ++j) {
ASSERT_NEAR(ct[j], send_vector[j] * gpu_list_.size(), 1e-5);
}
}
};
TEST(ReduceTester, TestCPUReduceTestSelectedRows) {
TestReduceOpHandle test_op;
size_t input_scope_idx = 0;
test_op.InitCtxOnGpu(false);
test_op.InitReduceOp(input_scope_idx);
test_op.TestReduceSelectedRows(input_scope_idx);
}
TEST(ReduceTester, TestCPUReduceTestLodTensor) {
TestReduceOpHandle test_op;
size_t input_scope_idx = 0;
test_op.InitCtxOnGpu(false);
test_op.InitReduceOp(input_scope_idx);
test_op.TestReduceLodTensors(input_scope_idx);
}
#ifdef PADDLE_WITH_CUDA
TEST(ReduceTester, TestGPUReduceTestSelectedRows) {
TestReduceOpHandle test_op;
size_t input_scope_idx = 0;
test_op.InitCtxOnGpu(true);
test_op.InitReduceOp(input_scope_idx);
test_op.TestReduceSelectedRows(input_scope_idx);
}
TEST(ReduceTester, TestGPUReduceTestLodTensor) {
TestReduceOpHandle test_op;
size_t input_scope_idx = 0;
test_op.InitCtxOnGpu(true);
test_op.InitReduceOp(input_scope_idx);
test_op.TestReduceLodTensors(input_scope_idx);
}
#endif
} // namespace details
} // namespace framework
} // namespace paddle
@@ -61,6 +61,11 @@ struct VarHandle : public VarHandleBase {
   size_t scope_idx_;
   std::string name_;
   platform::Place place_;
+
+  bool operator==(const VarHandle& o) const {
+    return o.generated_op_ == generated_op_ && o.name_ == name_ &&
+           o.scope_idx_ == scope_idx_;
+  }
 };

 // Dummy Variable. It is used to represent dependencies between operators
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/details/variable_visitor.h"
#include "paddle/fluid/framework/selected_rows.h"
namespace paddle {
namespace framework {
namespace details {
template <typename Func>
static void VisitVariable(Variable* var, Func* func) {
if (var->IsType<LoDTensor>()) {
(*func)(var->GetMutable<LoDTensor>());
} else if (var->IsType<SelectedRows>()) {
(*func)(var->GetMutable<SelectedRows>());
} else {
PADDLE_THROW("Not supported type %s", var->Type().name());
}
}
template <typename Func>
static void VisitVariable(const Variable& var, Func* func) {
if (var.IsType<LoDTensor>()) {
(*func)(var.Get<LoDTensor>());
} else if (var.IsType<SelectedRows>()) {
(*func)(var.Get<SelectedRows>());
} else {
PADDLE_THROW("Not supported type %s", var.Type().name());
}
}
struct TensorVisitor {
Tensor* result_{nullptr};
void operator()(LoDTensor* tensor) { result_ = tensor; }
void operator()(SelectedRows* selected_rows) {
result_ = selected_rows->mutable_value();
}
template <typename T>
void operator()() {
PADDLE_THROW("Not Support to get LoDTensor from %s", typeid(T).name());
}
};
Tensor& VariableVisitor::GetMutableTensor(Variable* var) {
TensorVisitor vistor;
VisitVariable(var, &vistor);
return *vistor.result_;
}
struct ShareDimsAndLoDVisitor {
Variable* trg_;
void operator()(const LoDTensor& val) {
auto* tensor = trg_->GetMutable<LoDTensor>();
tensor->set_layout(val.layout());
tensor->set_lod(val.lod());
tensor->Resize(val.dims());
}
void operator()(const SelectedRows& val) {
auto* selected_rows = trg_->GetMutable<SelectedRows>();
selected_rows->set_rows(val.rows());
selected_rows->set_height(val.height());
selected_rows->mutable_value()->Resize(val.value().dims());
}
template <typename T>
void operator()(const T&) {
PADDLE_ENFORCE("ShareDimsAndLoD is not supported by type %s",
typeid(T).name());
}
};
void VariableVisitor::ShareDimsAndLoD(const Variable& src, Variable* trg) {
ShareDimsAndLoDVisitor visitor{trg};
VisitVariable(src, &visitor);
}
} // namespace details
} // namespace framework
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/variable.h"
namespace paddle {
namespace framework {
namespace details {
class VariableVisitor {
public:
static Tensor &GetMutableTensor(Variable *var);
static void ShareDimsAndLoD(const Variable &src, Variable *trg);
};
} // namespace details
} // namespace framework
} // namespace paddle
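Side note (illustration, not part of this commit): the point of VariableVisitor is that broadcast/gather no longer branch on the variable type at every use site; GetMutableTensor yields the underlying Tensor for both LoDTensor and SelectedRows, and ShareDimsAndLoD copies the matching metadata. A rough standalone analogue of the GetMutableTensor idea, using std::variant and toy Tensor/LoDTensor/SelectedRows types instead of framework::Variable:

#include <cstdint>
#include <iostream>
#include <type_traits>
#include <variant>
#include <vector>

struct Tensor { std::vector<float> data; };
struct LoDTensor { Tensor tensor; std::vector<size_t> lod; };
struct SelectedRows { Tensor value; std::vector<int64_t> rows; };

// Toy counterpart of framework::Variable holding one of the two types.
using Variable = std::variant<LoDTensor, SelectedRows>;

// Counterpart of VariableVisitor::GetMutableTensor: both variable kinds
// expose a single underlying Tensor the op handles can resize and copy into.
Tensor& GetMutableTensor(Variable& var) {
  return std::visit(
      [](auto& v) -> Tensor& {
        using T = std::decay_t<decltype(v)>;
        if constexpr (std::is_same_v<T, LoDTensor>) {
          return v.tensor;
        } else {
          return v.value;
        }
      },
      var);
}

int main() {
  Variable a = LoDTensor{{{1, 2, 3}}, {0, 3}};
  Variable b = SelectedRows{{{4, 5}}, {0, 7}};
  std::cout << GetMutableTensor(a).data.size() << " "
            << GetMutableTensor(b).data.size() << std::endl;  // prints 3 2
  return 0;
}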
@@ -26,6 +26,11 @@ DEFINE_bool(benchmark, false,
             "Default cuda is asynchronous device, set to True will"
             "force op run in synchronous mode.");

+DEFINE_bool(
+    eager_delete_scope, true,
+    "Delete local scope eagerly. It will reduce GPU memory usage but "
+    "slow down the destruction of variables.(around 1% performance harm)");
+
 namespace paddle {
 namespace framework {
@@ -97,7 +102,7 @@ void Scope::DeleteScope(Scope* scope) const {
   PADDLE_ENFORCE(it != this->kids_.end(), "Cannot find %p as kid scope", scope);
   this->kids_.erase(it);
   // When making memory benchmark on Fluid, we have to delete scope sync.
-  if (FLAGS_benchmark) {
+  if (FLAGS_benchmark || FLAGS_eager_delete_scope) {
     delete scope;
   } else {
     Async([scope] { delete scope; });
......
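Side note (illustration, not part of this commit): the new flag simply switches Scope::DeleteScope between deleting the child scope synchronously and deferring the delete to a worker thread. A standalone sketch of that trade-off, where plain bools stand in for the gflags definitions and std::async stands in for the framework's Async helper:

#include <future>
#include <iostream>

// Stand-ins for the DEFINE_bool flags in scope.cc.
static bool FLAGS_benchmark = false;
static bool FLAGS_eager_delete_scope = true;  // the new default

struct Scope {
  ~Scope() { std::cout << "scope destroyed" << std::endl; }
};

void DeleteScope(Scope* scope) {
  if (FLAGS_benchmark || FLAGS_eager_delete_scope) {
    // Eager: variables (and any GPU memory they hold) are freed immediately.
    delete scope;
  } else {
    // Deferred: destruction runs off the caller's critical path, so teardown
    // is faster but the memory is released later.
    std::async(std::launch::async, [scope] { delete scope; }).wait();
  }
}

int main() {
  DeleteScope(new Scope());
  return 0;
}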
@@ -59,15 +59,13 @@ class AsyncGRPCServer final {

   void SetProgram(framework::ProgramDesc *program) { program_ = program; }

-  void SetPrefetchBlkdId(int blkid) { prefetch_blk_id_ = blkid; }
-
   void SetExecutor(framework::Executor *executor) { executor_ = executor; }

   void SetPrefetchPreparedCtx(framework::ExecutorPrepareContext *prepared) {
     prefetch_ctx_ = prepared;
   }

-  int GetSelectedPort() { return selected_port_; }
+  int GetSelectedPort() const { return selected_port_; }

   const ReceivedMessage Get() { return this->var_recv_queue_.Pop(); }
@@ -114,7 +112,6 @@ class AsyncGRPCServer final {
   std::unique_ptr<std::thread> t_get_;
   std::unique_ptr<std::thread> t_prefetch_;

-  int prefetch_blk_id_;
   framework::ExecutorPrepareContext *prefetch_ctx_;
   framework::ProgramDesc *program_;
   framework::Executor *executor_;
......
@@ -27,20 +27,6 @@ void RunServer(std::shared_ptr<detail::AsyncGRPCServer> service) {
   VLOG(4) << "RunServer thread end";
 }

-static void CreateTensorFromMessageType(framework::Variable *var,
-                                        sendrecv::VarType var_type) {
-  if (var_type == sendrecv::VarType::LOD_TENSOR) {
-    var->GetMutable<framework::LoDTensor>();
-  } else if (var_type == sendrecv::VarType::SELECTED_ROWS) {
-    var->GetMutable<framework::SelectedRows>();
-  } else {
-    PADDLE_THROW(
-        "VariableMessage type %d is not in "
-        "[LoDTensor, SelectedRows]",
-        var_type);
-  }
-}
-
 static void ParallelExecuteBlocks(
     const std::vector<size_t> &parallel_blkids, framework::Executor *executor,
     const std::vector<std::shared_ptr<framework::ExecutorPrepareContext>>
@@ -62,6 +48,13 @@ static void ParallelExecuteBlocks(
   for (size_t i = 0; i < fs.size(); ++i) fs[i].wait();
 }

+static void SavePort(std::shared_ptr<detail::AsyncGRPCServer> rpc_service) {
+  std::ofstream port_file;
+  port_file.open("/tmp/paddle.selected_port");
+  port_file << rpc_service->GetSelectedPort();
+  port_file.close();
+}
+
 ListenAndServOp::ListenAndServOp(const std::string &type,
                                  const framework::VariableNameMap &inputs,
                                  const framework::VariableNameMap &outputs,
@@ -77,59 +70,25 @@ void ListenAndServOp::Stop() {
   server_thread_->join();
 }

-void ListenAndServOp::RunImpl(const framework::Scope &scope,
-                              const platform::Place &dev_place) const {
-  platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
-  auto &dev_ctx = *pool.Get(dev_place);
-  framework::Scope &recv_scope = scope.NewScope();
-
-  if (!rpc_service_) {
-    std::string endpoint = Attr<std::string>("endpoint");
-    rpc_service_.reset(new detail::AsyncGRPCServer(endpoint));
-  }
-
-  auto ins = Inputs("X");
+void ListenAndServOp::RunSyncUpdate(
+    framework::Executor *executor, framework::ProgramDesc *program,
+    framework::Scope *recv_scope, framework::BlockDesc *prefetch_block) const {
   auto fan_in = Attr<int>("Fanin");
-  auto *optimize_block = Attr<framework::BlockDesc *>(kOptimizeBlock);
-  auto *prefetch_block = Attr<framework::BlockDesc *>(kPrefetchBlock);
-  auto *program = optimize_block->Program();
+
   size_t num_blocks = program->Size();
   PADDLE_ENFORCE_GE(num_blocks, 2,
                     "server program should have at least 2 blocks");

-  framework::Executor executor(dev_place);
   std::vector<int> block_list;
   for (size_t blkid = 1; blkid < num_blocks; ++blkid) {
-    if (blkid != static_cast<size_t>(prefetch_block->ID())) {
-      block_list.push_back(blkid);
-    }
+    block_list.push_back(blkid);
   }
-  auto optimize_prepared = executor.Prepare(*program, block_list);
+  auto optimize_prepared = executor->Prepare(*program, block_list);
   // Insert placeholder for block0 which holds current op itself.
   optimize_prepared.insert(
       optimize_prepared.begin(),
       std::shared_ptr<framework::ExecutorPrepareContext>(nullptr));

-  rpc_service_->SetScope(&recv_scope);
-  rpc_service_->SetDevCtx(&dev_ctx);
-  // TODO(qiao) set proper fields for table lookup and update
-  rpc_service_->SetExecutor(&executor);
-  VLOG(3) << "prefetch block id is " << prefetch_block->ID();
-  auto prefetch_prepared = executor.Prepare(*program, prefetch_block->ID());
-  rpc_service_->SetPrefetchBlkdId(prefetch_block->ID());
-  rpc_service_->SetPrefetchPreparedCtx(prefetch_prepared.get());
-  prefetch_prepared.release();
-  rpc_service_->SetProgram(program);
-  // start the server listening after all member initialized.
-  server_thread_.reset(new std::thread(RunServer, rpc_service_));
-  VLOG(3) << "wait server thread to become ready...";
-  sleep(5);
-  // Write to a file of server selected port for python use.
-  std::ofstream port_file;
-  port_file.open("/tmp/paddle.selected_port");
-  port_file << rpc_service_->GetSelectedPort();
-  port_file.close();
-
   bool exit_flag = false;
   // Record received sparse variables, so that
   // we could reset those after execute optimize program
@@ -170,7 +129,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
       break;
     }

-    // NOTE: if is_gpu_place, CUDA kernels are laugched by multiple threads
+    // NOTE: if is_gpu_place, CUDA kernels are launched by multiple threads
     // and this will still work.

     // The optimize blocks which have the same parent ID would run parallel
@@ -182,16 +141,16 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
     for (size_t blkid = 2; blkid < num_blocks; ++blkid) {
       if (blkid != static_cast<size_t>(prefetch_block->ID())) {
         if (program->Block(blkid).Parent() != last_parent_blkid) {
-          ParallelExecuteBlocks(parallel_blkids, &executor, optimize_prepared,
-                                program, &recv_scope);
+          ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared,
+                                program, recv_scope);
           parallel_blkids.clear();
           last_parent_blkid = program->Block(blkid).Parent();
         }
         parallel_blkids.push_back(blkid);
       }
     }
-    ParallelExecuteBlocks(parallel_blkids, &executor, optimize_prepared,
-                          program, &recv_scope);
+    ParallelExecuteBlocks(parallel_blkids, executor, optimize_prepared, program,
+                          recv_scope);
     VLOG(2) << "run all blocks spent " << detail::GetTimestamp() - ts << "(ms)";

     // Reset the received sparse variables, the sum operator would not
@@ -209,6 +168,42 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
   }  // while(true)
 }

+void ListenAndServOp::RunImpl(const framework::Scope &scope,
+                              const platform::Place &dev_place) const {
+  platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
+  auto &dev_ctx = *pool.Get(dev_place);
+  framework::Scope &recv_scope = scope.NewScope();
+
+  PADDLE_ENFORCE(!rpc_service_);
+  std::string endpoint = Attr<std::string>("endpoint");
+  rpc_service_.reset(new detail::AsyncGRPCServer(endpoint));
+
+  auto *optimize_block = Attr<framework::BlockDesc *>(kOptimizeBlock);
+  auto *prefetch_block = Attr<framework::BlockDesc *>(kPrefetchBlock);
+  auto *program = optimize_block->Program();
+  framework::Executor executor(dev_place);
+
+  // prepare rpc_service
+  rpc_service_->SetScope(&recv_scope);
+  rpc_service_->SetDevCtx(&dev_ctx);
+  rpc_service_->SetProgram(program);
+  rpc_service_->SetExecutor(&executor);
+
+  // prepare for prefetch
+  VLOG(3) << "prefetch block id is " << prefetch_block->ID();
+  auto prefetch_prepared = executor.Prepare(*program, prefetch_block->ID());
+  rpc_service_->SetPrefetchPreparedCtx(prefetch_prepared.get());
+  prefetch_prepared.release();
+
+  // start the server listening after all member initialized.
+  server_thread_.reset(new std::thread(RunServer, rpc_service_));
+  VLOG(3) << "wait server thread to become ready...";
+  sleep(5);
+  // Write to a file of server selected port for python use.
+  SavePort(rpc_service_);
+  RunSyncUpdate(&executor, program, &recv_scope, prefetch_block);
+}
+
 class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker {
  public:
   ListenAndServOpMaker(OpProto *proto, OpAttrChecker *op_checker)
......
@@ -34,17 +34,22 @@ void RunServer(std::shared_ptr<detail::AsyncGRPCServer> service);

 class ListenAndServOp : public framework::OperatorBase {
  public:
-  ListenAndServOp(const std::string &type,
-                  const framework::VariableNameMap &inputs,
-                  const framework::VariableNameMap &outputs,
-                  const framework::AttributeMap &attrs);
+  ListenAndServOp(const std::string& type,
+                  const framework::VariableNameMap& inputs,
+                  const framework::VariableNameMap& outputs,
+                  const framework::AttributeMap& attrs);

   int GetSelectedPort() const;

+  void RunSyncUpdate(framework::Executor* executor,
+                     framework::ProgramDesc* program,
+                     framework::Scope* recv_scope,
+                     framework::BlockDesc* prefetch_block) const;
+
   void Stop() override;

-  void RunImpl(const framework::Scope &scope,
-               const platform::Place &dev_place) const override;
+  void RunImpl(const framework::Scope& scope,
+               const platform::Place& dev_place) const override;

 protected:
   mutable std::shared_ptr<detail::AsyncGRPCServer> rpc_service_;
......
@@ -127,7 +127,7 @@ void StartServerNet(bool is_sparse) {
   const auto &root_block = program.Block(0);
   auto *optimize_block = program.AppendBlock(root_block);
   auto *prefetch_block = program.AppendBlock(root_block);
-  // X for server side tensors, RX for received tensers, must be of same shape.
+  // X for server side tensors, RX for received tensors, must be of same shape.
   AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, optimize_block);
   f::AttributeMap attrs;
......
@@ -107,7 +107,8 @@ def __bootstrap__():
     os.environ['OMP_NUM_THREADS'] = str(num_threads)

     read_env_flags = [
-        'use_pinned_memory', 'check_nan_inf', 'benchmark', 'warpctc_dir'
+        'use_pinned_memory', 'check_nan_inf', 'benchmark', 'warpctc_dir',
+        'eager_delete_scope'
     ]
     if core.is_compiled_with_cuda():
         read_env_flags += ['fraction_of_gpu_memory_to_use']
......