Unverified commit 1b564bc4, authored by Zeng Jinle, committed by GitHub

Merge pull request #14670 from sneaxiy/refactor_eager_deletion

Rewrite eager deletion
...@@ -72,6 +72,8 @@ cc_library(lod_tensor SRCS lod_tensor.cc DEPS ddim place tensor framework_proto
cc_test(lod_tensor_test SRCS lod_tensor_test.cc DEPS lod_tensor memory)
nv_test(lod_tensor_gpu_test SRCS lod_tensor_test.cu DEPS lod_tensor)
cc_library(garbage_collector SRCS garbage_collector.cc DEPS device_context memory)
cc_library(reader SRCS reader.cc DEPS lod_tensor ddim)
cc_test(reader_test SRCS reader_test.cc DEPS reader)
...@@ -183,6 +185,8 @@ else()
cc_test(test_naive_executor SRCS naive_executor_test.cc DEPS naive_executor elementwise_add_op)
endif()
target_link_libraries(executor garbage_collector)
cc_library(parallel_executor SRCS parallel_executor.cc DEPS
threaded_ssa_graph_executor scope_buffered_ssa_graph_executor
graph build_strategy
......
...@@ -45,10 +45,10 @@ cc_library(fuse_vars_op_handle SRCS fuse_vars_op_handle.cc DEPS op_handle_base s ...@@ -45,10 +45,10 @@ cc_library(fuse_vars_op_handle SRCS fuse_vars_op_handle.cc DEPS op_handle_base s
cc_library(modify_op_lock_and_record_event_pass SRCS modify_op_lock_and_record_event_pass.cc DEPS computation_op_handle op_graph_view multi_devices_helper) cc_library(modify_op_lock_and_record_event_pass SRCS modify_op_lock_and_record_event_pass.cc DEPS computation_op_handle op_graph_view multi_devices_helper)
if (WITH_GPU) cc_library(reference_count_pass_helper SRCS reference_count_pass_helper.cc DEPS garbage_collector computation_op_handle)
cc_library(reference_count_pass SRCS reference_count_pass.cc DEPS computation_op_handle scale_loss_grad_op_handle rpc_op_handle cc_library(eager_deletion_op_handle SRCS eager_deletion_op_handle.cc DEPS lod_tensor selected_rows reference_count_pass_helper)
all_reduce_op_handle reduce_op_handle broadcast_op_handle data_balance_op_handle graph graph_helper pass) cc_library(eager_deletion_pass SRCS eager_deletion_pass.cc DEPS computation_op_handle eager_deletion_op_handle graph graph_helper pass)
endif() cc_library(reference_count_pass SRCS reference_count_pass.cc DEPS computation_op_handle graph graph_helper pass op_graph_view reference_count_pass_helper)
cc_library(sequential_execution_pass SRCS sequential_execution_pass.cc DEPS graph graph_helper pass) cc_library(sequential_execution_pass SRCS sequential_execution_pass.cc DEPS graph graph_helper pass)
cc_library(all_reduce_deps_pass SRCS all_reduce_deps_pass.cc DEPS graph graph_helper pass) cc_library(all_reduce_deps_pass SRCS all_reduce_deps_pass.cc DEPS graph graph_helper pass)
...@@ -56,10 +56,7 @@ cc_library(all_reduce_deps_pass SRCS all_reduce_deps_pass.cc DEPS graph graph_he ...@@ -56,10 +56,7 @@ cc_library(all_reduce_deps_pass SRCS all_reduce_deps_pass.cc DEPS graph graph_he
cc_library(multi_devices_graph_pass SRCS multi_devices_graph_pass.cc DEPS multi_devices_helper computation_op_handle cc_library(multi_devices_graph_pass SRCS multi_devices_graph_pass.cc DEPS multi_devices_helper computation_op_handle
scale_loss_grad_op_handle rpc_op_handle all_reduce_op_handle reduce_op_handle broadcast_op_handle data_balance_op_handle fused_broadcast_op_handle) scale_loss_grad_op_handle rpc_op_handle all_reduce_op_handle reduce_op_handle broadcast_op_handle data_balance_op_handle fused_broadcast_op_handle)
set(SSA_GRAPH_EXECUTOR_DEPS graph framework_proto sequential_execution_pass modify_op_lock_and_record_event_pass all_reduce_deps_pass) set(SSA_GRAPH_EXECUTOR_DEPS graph framework_proto sequential_execution_pass modify_op_lock_and_record_event_pass all_reduce_deps_pass reference_count_pass eager_deletion_pass)
if (WITH_GPU)
list(APPEND SSA_GRAPH_EXECUTOR_DEPS reference_count_pass)
endif()
cc_library(ssa_graph_executor SRCS ssa_graph_executor.cc DEPS ${SSA_GRAPH_EXECUTOR_DEPS}) cc_library(ssa_graph_executor SRCS ssa_graph_executor.cc DEPS ${SSA_GRAPH_EXECUTOR_DEPS})
......
...@@ -20,11 +20,13 @@ namespace paddle {
namespace framework {
namespace details {
ComputationOpHandle::ComputationOpHandle(ir::Node *node, Scope *scope,
platform::Place place)
platform::Place place,
size_t scope_idx)
: OpHandleBase(node),
op_(framework::OpRegistry::CreateOp(*node->Op())),
scope_(scope),
place_(place) {}
place_(place),
scope_idx_(scope_idx) {}
void ComputationOpHandle::RunImpl() {
WaitInputVarGenerated(place_);
......
...@@ -28,7 +28,8 @@ namespace framework {
namespace details {
struct ComputationOpHandle : public OpHandleBase {
public:
ComputationOpHandle(ir::Node *node, Scope *scope, platform::Place place);
ComputationOpHandle(ir::Node *node, Scope *scope, platform::Place place,
size_t scope_idx);
std::string Name() const override;
...@@ -38,6 +39,8 @@ struct ComputationOpHandle : public OpHandleBase {
void SetLockAndRecordEventFree(bool b) { is_lock_and_record_event_free_ = b; }
size_t GetScopeIdx() const { return scope_idx_; }
protected:
void RunImpl() override;
...@@ -47,6 +50,7 @@ struct ComputationOpHandle : public OpHandleBase {
std::unique_ptr<OperatorBase> op_;
Scope *scope_;
platform::Place place_;
size_t scope_idx_;
bool is_lock_and_record_event_free_{false};
};
} // namespace details
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/details/eager_deletion_op_handle.h"
#include "paddle/fluid/framework/lod_tensor_array.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/selected_rows.h"
#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/platform/cuda_device_guard.h"
#endif
namespace paddle {
namespace framework {
namespace details {
EagerDeletionOpHandle::EagerDeletionOpHandle(
ir::Node *node, const Scope *scope, const platform::Place &place,
const std::unordered_set<std::string> &var_names, GarbageCollector *gc,
AtomicReferenceCountMap *ref_cnts)
: OpHandleBase(node),
scope_(scope),
var_names_(var_names),
gc_(gc),
ref_cnts_(ref_cnts) {
#ifdef PADDLE_WITH_CUDA
if (platform::is_gpu_place(place)) {
dev_ctx_ = reinterpret_cast<platform::CUDADeviceContext *>(
platform::DeviceContextPool::Instance().Get(place));
if (dynamic_cast<StreamGarbageCollector *>(gc_)) {
platform::CUDADeviceGuard guard(
boost::get<platform::CUDAPlace>(place).device);
PADDLE_ENFORCE(cudaEventCreateWithFlags(&event_, cudaEventDisableTiming));
PADDLE_ENFORCE_NOT_NULL(event_);
}
}
#endif
}
EagerDeletionOpHandle::~EagerDeletionOpHandle() {
#ifdef PADDLE_WITH_CUDA
if (event_) {
auto gpu_place = boost::get<platform::CUDAPlace>(dev_ctx_->GetPlace());
platform::CUDADeviceGuard guard(gpu_place.device);
PADDLE_ENFORCE(cudaEventDestroy(event_));
}
#endif
}
std::string EagerDeletionOpHandle::Name() const { return "eager_deletion"; }
void EagerDeletionOpHandle::RunImpl() {
auto *exec_scope = scope_->FindVar(kLocalExecScopeName)->Get<Scope *>();
std::deque<std::shared_ptr<memory::Allocation>> garbages;
for (auto &name : var_names_) {
auto it = ref_cnts_->find(name);
// Var not found, or the reference count has not decreased to 0 yet
if (it == ref_cnts_->end() || it->second.fetch_sub(1) != 1) {
continue;
}
auto *var = exec_scope->FindVar(name);
if (var == nullptr) {
continue;
}
VLOG(2) << "Erase variable " << name;
if (var->IsType<LoDTensor>()) {
garbages.emplace_back(var->GetMutable<LoDTensor>()->MoveMemoryHolder());
} else if (var->IsType<SelectedRows>()) {
garbages.emplace_back(
var->GetMutable<SelectedRows>()->mutable_value()->MoveMemoryHolder());
} else if (var->IsType<LoDTensorArray>()) {
auto *tensor_arr = var->GetMutable<LoDTensorArray>();
for (auto &t : *tensor_arr) {
garbages.emplace_back(t.MoveMemoryHolder());
}
} else {
PADDLE_THROW("Type %s of %s is not supported eager deletion",
var->Type().name(), name);
}
}
if (!garbages.empty()) {
ClearGarbages(&garbages);
}
}
void EagerDeletionOpHandle::ClearGarbages(
std::deque<std::shared_ptr<memory::Allocation>> *garbages) {
#ifdef PADDLE_WITH_CUDA
if (event_) {
auto compute_stream = dev_ctx_->stream();
auto callback_stream =
reinterpret_cast<StreamGarbageCollector *>(gc_)->stream();
auto callback_func = [=]() {
PADDLE_ENFORCE(cudaEventRecord(event_, compute_stream));
PADDLE_ENFORCE(cudaStreamWaitEvent(callback_stream, event_, 0));
};
gc_->Add(std::move(*garbages), callback_func);
} else {
#endif
gc_->Add(std::move(*garbages));
#ifdef PADDLE_WITH_CUDA
}
#endif
}
} // namespace details
} // namespace framework
} // namespace paddle
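A note on the countdown in RunImpl above: it relies on std::atomic::fetch_sub returning the value held before the decrement, so exactly one op handle observes the transition from 1 to 0 and frees the variable, even when several deletion ops race. The following standalone sketch (plain STL only; the map layout mirrors AtomicReferenceCountMap but is not Paddle code) shows the same pattern in isolation:

#include <atomic>
#include <iostream>
#include <string>
#include <unordered_map>

int main() {
  // "x" is kept alive by two last-live ops, so it starts with a count of 2.
  std::unordered_map<std::string, std::atomic<size_t>> ref_cnts;
  ref_cnts["x"].store(2);

  auto try_delete = [&](const std::string &name) -> bool {
    auto it = ref_cnts.find(name);
    // fetch_sub returns the previous value: only the caller that sees 1
    // performed the final decrement and may free the memory.
    if (it == ref_cnts.end() || it->second.fetch_sub(1) != 1) {
      return false;
    }
    std::cout << "erase " << name << std::endl;
    return true;
  };

  bool first = try_delete("x");   // 2 -> 1, nothing freed
  bool second = try_delete("x");  // 1 -> 0, freed here
  std::cout << first << " " << second << std::endl;  // prints "0 1"
  return 0;
}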
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <deque>
#include <string>
#include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/details/reference_count_pass_helper.h"
namespace paddle {
namespace framework {
class Scope;
namespace details {
class EagerDeletionOpHandle : public OpHandleBase {
public:
EagerDeletionOpHandle(ir::Node *node, const Scope *scope,
const platform::Place &place,
const std::unordered_set<std::string> &var_names,
GarbageCollector *gc,
AtomicReferenceCountMap *ref_cnts);
~EagerDeletionOpHandle();
std::string Name() const override;
protected:
void RunImpl() override;
private:
void ClearGarbages(std::deque<std::shared_ptr<memory::Allocation>> *garbages);
const Scope *scope_;
std::unordered_set<std::string> var_names_;
GarbageCollector *gc_; // not owned
AtomicReferenceCountMap *ref_cnts_; // not owned
#ifdef PADDLE_WITH_CUDA
platform::CUDADeviceContext *dev_ctx_{nullptr};
cudaEvent_t event_{nullptr};
#endif
};
} // namespace details
} // namespace framework
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <queue>
#include <string>
#include <vector>
#include "paddle/fluid/framework/details/computation_op_handle.h"
#include "paddle/fluid/framework/details/eager_deletion_op_handle.h"
#include "paddle/fluid/framework/details/eager_deletion_pass.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/ir/graph_helper.h"
namespace paddle {
namespace framework {
namespace details {
std::unique_ptr<ir::Graph> EagerDeletionPass::ApplyImpl(
std::unique_ptr<ir::Graph> graph) const {
auto &ref_cnts =
Get<std::vector<AtomicReferenceCountMap>>(kRuntimeReferenceCount);
PADDLE_ENFORCE(ref_cnts.empty(),
"kRuntimeReferenceCount should be initialized here!");
const auto &vars = graph->Get<GraphVars>(kGraphVars);
ref_cnts.resize(vars.size());
const auto &last_live_ops =
Get<std::vector<LastLiveOpsOfVars>>(kLastLiveOpsOfVars);
const auto &gcs = Get<GarbageCollectorMap>(kGarbageCollector);
const auto &places = Get<std::vector<platform::Place>>(kAllPlaces);
// a reverse map of last_live_ops
// i.e., last op --> variable names which can be deleted.
std::unordered_map<ComputationOpHandle *, std::unordered_set<std::string>>
op_vars_map;
for (auto &var_ops_map : last_live_ops) {
for (auto &var_ops_pair : var_ops_map) {
const std::string &var_name = var_ops_pair.first;
for (auto *op : var_ops_pair.second) {
op_vars_map[op].insert(var_name);
}
}
}
for (auto &pair : op_vars_map) {
auto *op = pair.first;
auto &var_names = pair.second;
auto *eager_deletion_node =
graph->CreateEmptyNode("eager_deletion", ir::Node::Type::kOperation);
auto *eager_deletion_op = new EagerDeletionOpHandle(
eager_deletion_node, op->GetScope(), op->GetPlace(), var_names,
gcs.at(places[op->GetScopeIdx()]).get(),
&(ref_cnts[op->GetScopeIdx()]));
auto it = std::find_if(
op->Outputs().begin(), op->Outputs().end(), [](VarHandleBase *var) {
return dynamic_cast<DummyVarHandle *>(var) != nullptr;
});
if (it != op->Outputs().end()) {
eager_deletion_op->AddInput(*it);
} else {
auto *dep_var = new DummyVarHandle(graph->CreateControlDepVar());
graph->Get<GraphDepVars>(kGraphDepVars).emplace(dep_var);
op->AddOutput(dep_var);
eager_deletion_op->AddInput(dep_var);
}
auto *dummy_leaf = new DummyVarHandle(graph->CreateControlDepVar());
graph->Get<GraphDepVars>(kGraphDepVars).emplace(dummy_leaf);
eager_deletion_op->AddOutput(dummy_leaf);
}
VLOG(10) << "Create " << op_vars_map.size() << " EagerDeletionOpHandle(s)";
return graph;
}
} // namespace details
} // namespace framework
} // namespace paddle
REGISTER_PASS(eager_deletion_pass,
paddle::framework::details::EagerDeletionPass)
.RequirePassAttr(paddle::framework::details::kRuntimeReferenceCount)
.RequirePassAttr(paddle::framework::details::kLastLiveOpsOfVars)
.RequirePassAttr(paddle::framework::details::kAllPlaces)
.RequirePassAttr(paddle::framework::details::kGarbageCollector);
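The pass above does two things: it inverts last_live_ops (variable -> ops after which the variable is dead) into op_vars_map (op -> variables it may delete), and it chains each new EagerDeletionOpHandle behind its compute op through a dummy control-dependency variable so deletion can only run after the op finishes. A minimal standalone sketch of the inversion step, using plain STL types and a placeholder Op struct instead of ComputationOpHandle (illustrative only, not the Paddle graph API):

#include <string>
#include <unordered_map>
#include <unordered_set>

struct Op {};  // stand-in for ComputationOpHandle*

using LastLiveOps =
    std::unordered_map<std::string, std::unordered_set<Op *>>;
using OpToVars =
    std::unordered_map<Op *, std::unordered_set<std::string>>;

// Invert "var -> last live ops" into "op -> vars deletable after this op",
// which is the grouping the pass needs to emit one deletion op per compute op.
OpToVars InvertLastLiveOps(const LastLiveOps &last_live_ops) {
  OpToVars op_vars_map;
  for (const auto &var_ops : last_live_ops) {
    for (Op *op : var_ops.second) {
      op_vars_map[op].insert(var_ops.first);
    }
  }
  return op_vars_map;
}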
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "paddle/fluid/framework/ir/graph.h"
#include "paddle/fluid/framework/ir/pass.h"
namespace paddle {
namespace framework {
namespace details {
class EagerDeletionPass : public ir::Pass {
protected:
std::unique_ptr<ir::Graph> ApplyImpl(
std::unique_ptr<ir::Graph> graph) const override;
};
} // namespace details
} // namespace framework
} // namespace paddle
...@@ -565,7 +565,7 @@ void MultiDevSSAGraphBuilder::CreateComputationalOp(ir::Graph *result,
int dev_id) const {
result->Get<GraphOps>(kGraphOps).emplace_back(
new ComputationOpHandle(result->CreateOpNode(node->Op()),
local_scopes_[dev_id], places_[dev_id]));
local_scopes_[dev_id], places_[dev_id], dev_id));
CreateOpHandleIOs(result, node, dev_id);
}
...@@ -688,8 +688,8 @@ void MultiDevSSAGraphBuilder::CreateComputationalOps(ir::Graph *result,
for (size_t scope_idx = 0; scope_idx < num_places; ++scope_idx) {
auto p = places_[scope_idx];
auto s = local_scopes_[scope_idx];
result->Get<GraphOps>(kGraphOps).emplace_back(
new ComputationOpHandle(result->CreateOpNode(node->Op()), s, p));
result->Get<GraphOps>(kGraphOps).emplace_back(new ComputationOpHandle(
result->CreateOpNode(node->Op()), s, p, scope_idx));
CreateOpHandleIOs(result, node, scope_idx);
}
}
......
...@@ -23,6 +23,8 @@ namespace details {
OpGraphView::OpGraphView(const std::vector<OpHandleBase *> &ops) { Build(ops); }
void OpGraphView::Build(const std::vector<OpHandleBase *> &ops) {
preceding_ops_.clear();
pending_ops_.clear();
for (auto &op : ops) {
preceding_ops_[op];
pending_ops_[op];
...@@ -40,6 +42,7 @@ void OpGraphView::Build(const std::vector<OpHandleBase *> &ops) {
std::unordered_set<OpHandleBase *> OpGraphView::AllOps() const {
std::unordered_set<OpHandleBase *> ret;
ret.reserve(preceding_ops_.size());
for (auto &pair : preceding_ops_) {
ret.insert(pair.first);
}
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
#pragma once #pragma once
#include <memory> #include <queue>
#include <unordered_map> #include <unordered_map>
#include <unordered_set> #include <unordered_set>
#include <vector> #include <vector>
...@@ -34,6 +34,11 @@ class OpGraphView { ...@@ -34,6 +34,11 @@ class OpGraphView {
bool HasOp(OpHandleBase *op) const; bool HasOp(OpHandleBase *op) const;
// Use a visitor to visit all pending ops of op
// Stop when callback returns false
template <typename Callback>
bool VisitAllPendingOps(OpHandleBase *op, Callback &&callback) const;
private: private:
void Build(const std::vector<OpHandleBase *> &ops); void Build(const std::vector<OpHandleBase *> &ops);
void EnforceHasOp(OpHandleBase *op) const; void EnforceHasOp(OpHandleBase *op) const;
...@@ -44,6 +49,28 @@ class OpGraphView { ...@@ -44,6 +49,28 @@ class OpGraphView {
pending_ops_; pending_ops_;
}; };
template <typename Callback>
bool OpGraphView::VisitAllPendingOps(OpHandleBase *op,
Callback &&callback) const {
EnforceHasOp(op);
std::unordered_set<OpHandleBase *> visited;
std::queue<OpHandleBase *> q;
q.push(op);
do {
op = q.front();
q.pop();
for (auto &pending_op : pending_ops_.at(op)) {
if (visited.count(pending_op) == 0) {
visited.insert(pending_op);
if (!callback(pending_op)) {
return false;
}
// Enqueue so that pending ops are visited transitively, not only the
// direct successors of the starting op.
q.push(pending_op);
}
}
} while (!q.empty());
return true;
}
} // namespace details
} // namespace framework
} // namespace paddle
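VisitAllPendingOps above is a breadth-first walk over the downstream (pending) ops with an early-exit callback; ShrinkDepsOpFunctor in reference_count_pass.cc later uses it to decide which candidate ops are reachable from which. A small self-contained sketch of the same traversal over a plain adjacency map (hypothetical integer node ids instead of OpHandleBase*):

#include <queue>
#include <unordered_map>
#include <unordered_set>
#include <vector>

using Graph = std::unordered_map<int, std::vector<int>>;  // node -> successors

// Visit every node reachable from `start` (excluding `start` itself) in BFS
// order, stopping early as soon as the callback returns false.
template <typename Callback>
bool VisitAllSuccessors(const Graph &g, int start, Callback &&callback) {
  std::unordered_set<int> visited;
  std::queue<int> q;
  q.push(start);
  while (!q.empty()) {
    int cur = q.front();
    q.pop();
    auto it = g.find(cur);
    if (it == g.end()) continue;
    for (int next : it->second) {
      if (visited.insert(next).second) {
        if (!callback(next)) return false;
        q.push(next);  // keep walking so transitive successors are covered
      }
    }
  }
  return true;
}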
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <atomic>
#include <string>
#include <unordered_map>
#include <vector>
#include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/garbage_collector.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/tensor.h"
namespace paddle {
namespace framework {
namespace details {
using ReferenceCountMap = std::unordered_map<std::string, int>;
using AtomicReferenceCountMap =
std::unordered_map<std::string, std::atomic<int>>;
using DeviceReferenceCountMap =
std::unordered_map<int, std::unique_ptr<ReferenceCountMap>>;
using AtomicDeviceReferenceCountMap =
std::unordered_map<int, std::unique_ptr<AtomicReferenceCountMap>>;
using DeviceGarbageCollectorMap =
std::unordered_map<int,
std::unique_ptr<GarbageCollector<framework::Tensor>>>;
class ReferenceCountOpHandle : public OpHandleBase {
public:
ReferenceCountOpHandle(ir::Node *node, const Scope *scope,
const platform::CUDAPlace &place,
const std::vector<std::string> &var_names,
GarbageCollector<Tensor> *gc,
AtomicReferenceCountMap *ref_cnts)
: OpHandleBase(node), scope_(scope), gc_(gc), ref_cnts_(ref_cnts) {
dev_ctx_ = static_cast<platform::CUDADeviceContext *>(
platform::DeviceContextPool::Instance().Get(place));
if (IsStreamGarabageCollector()) {
platform::SetDeviceId(place.device);
PADDLE_ENFORCE(cudaEventCreateWithFlags(&event_, cudaEventDisableTiming));
}
for (auto &name : var_names) AddVar(name);
}
~ReferenceCountOpHandle() {
if (IsStreamGarabageCollector()) {
auto gpu_place = boost::get<platform::CUDAPlace>(dev_ctx_->GetPlace());
platform::SetDeviceId(gpu_place.device);
PADDLE_ENFORCE(cudaEventDestroy(event_));
}
}
std::string Name() const override { return "reference_count"; }
void AddVar(const std::string &name) {
auto it = var_names_.find(name);
if (it != var_names_.end())
++(it->second);
else
var_names_[name] = 1;
}
protected:
void RunImpl() override {
auto *exec_scope = scope_->FindVar(kLocalExecScopeName)->Get<Scope *>();
std::vector<Tensor *> tensors;
for (auto &pair : var_names_) {
auto &name = pair.first;
auto it = ref_cnts_->find(name);
if (it == ref_cnts_->end()) continue;
auto *var = exec_scope->FindVar(name);
if (var == nullptr) continue;
if (var->IsType<LoDTensor>()) {
if (it->second.fetch_sub(pair.second) <= pair.second) {
tensors.emplace_back(var->GetMutable<LoDTensor>());
}
} else if (var->IsType<SelectedRows>()) {
if (it->second.fetch_sub(pair.second) <= pair.second) {
tensors.emplace_back(
var->GetMutable<SelectedRows>()->mutable_value());
}
}
}
if (!tensors.empty()) {
ClearTensors(tensors);
}
}
private:
void ClearTensors(const std::vector<Tensor *> &tensors) {
auto *gc = dynamic_cast<StreamGarbageCollector<Tensor> *>(gc_);
if (gc != nullptr) {
auto compute_stream = dev_ctx_->stream();
auto callback_stream = gc->stream();
auto callback_func = [=]() {
PADDLE_ENFORCE(cudaEventRecord(event_, compute_stream));
PADDLE_ENFORCE(cudaStreamWaitEvent(callback_stream, event_, 0));
};
gc_->Add(tensors, callback_func);
} else {
gc_->Add(tensors);
}
}
bool IsStreamGarabageCollector() const {
return dynamic_cast<const StreamGarbageCollector<Tensor> *>(gc_) != nullptr;
}
const Scope *scope_;
platform::CUDADeviceContext *dev_ctx_;
std::unordered_map<std::string, int> var_names_;
GarbageCollector<Tensor> *gc_; // not own
AtomicReferenceCountMap *ref_cnts_; // not own
cudaEvent_t event_;
};
} // namespace details
} // namespace framework
} // namespace paddle
...@@ -14,187 +14,240 @@
#include <queue>
#include <string>
#include <type_traits>
#include <vector>
#include "paddle/fluid/framework/details/computation_op_handle.h"
#include "paddle/fluid/framework/details/eager_deletion_op_handle.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/details/op_graph_view.h"
#include "paddle/fluid/framework/details/reference_count_pass.h"
#include "paddle/fluid/framework/details/reference_count_pass_helper.h"
#include "paddle/fluid/framework/ir/graph_helper.h"
namespace paddle {
namespace framework {
namespace details {
static ComputationOpHandle *FindNextComputationOpHandle(VarHandle *var_in) { // A functor to shrink/remove operators who depend on other operators in a set
std::queue<VarHandleBase *> queue; class ShrinkDepsOpFunctor {
queue.push(var_in); private:
do { enum RelationShip { kSame = 0, kNoDeps = 1, kBefore = 2, kAfter = 3 };
auto *var = queue.front();
queue.pop(); public:
for (auto *op : var->PendingOps()) { explicit ShrinkDepsOpFunctor(const std::vector<OpHandleBase *> &all_ops)
auto *compute_op = dynamic_cast<ComputationOpHandle *>(op); : graph_(all_ops) {}
if (compute_op != nullptr && compute_op->GetPlace() == var_in->place_) {
return compute_op; template <typename OpSet>
OpSet operator()(const OpSet &op_set) const {
using KeyType = typename OpSet::key_type;
static_assert(
std::is_base_of<OpHandleBase,
typename std::remove_pointer<KeyType>::type>::value,
"Key type of OpSet must be OpHandleBase, or derived of OpHandleBase");
if (op_set.size() <= 1) return op_set;
std::vector<OpHandleBase *> ops(op_set.begin(), op_set.end());
OpSet ret;
auto rels = GetRelations(ops);
auto not_before = [](RelationShip r) { return r != kBefore; };
for (size_t i = 0; i < rels.size(); ++i) {
if (std::all_of(rels[i].begin(), rels[i].end(), not_before)) {
ret.emplace(static_cast<KeyType>(ops[i]));
} }
for (auto *out_var : op->Outputs()) { }
queue.push(out_var); return ret;
}
private:
std::vector<std::vector<RelationShip>> GetRelations(
const std::vector<OpHandleBase *> &ops) const {
std::unordered_map<OpHandleBase *, size_t> op_to_idx;
for (size_t i = 0; i < ops.size(); ++i) {
PADDLE_ENFORCE(graph_.HasOp(ops[i]), "Op does not exist in graph");
op_to_idx[ops[i]] = i;
}
PADDLE_ENFORCE(op_to_idx.size() == ops.size(), "Duplicate ops");
std::vector<std::vector<RelationShip>> ret(ops.size());
for (auto &e : ret) {
e.assign(ops.size(), kSame);
}
size_t found_num = ops.size();
size_t total_num = ops.size() * ops.size();
auto visitor = [&](OpHandleBase *op, size_t i) {
auto it = op_to_idx.find(op);
if (it != op_to_idx.end()) {
size_t j = it->second;
if (i != j && ret[i][j] == kSame) {
ret[i][j] = kBefore;
ret[j][i] = kAfter;
found_num += 2;
if (found_num == total_num) {
return false;
}
}
}
return true;
};
for (size_t i = 0; i < ops.size(); ++i) {
auto sub_visitor = [&, i](OpHandleBase *op) { return visitor(op, i); };
if (!graph_.VisitAllPendingOps(ops[i], sub_visitor)) {
break;
}
}
for (size_t i = 0; i < ops.size(); ++i) {
for (size_t j = i + 1; j < ops.size(); ++j) {
if (ret[i][j] != kSame) continue;
ret[i][j] = kNoDeps;
ret[j][i] = kNoDeps;
}
}
return ret;
}
const OpGraphView graph_;
};
/**
* Find the nearest downstream computation op handle. If the op is a
* computation op, just return itself.
*/
static ComputationOpHandle *FindNextComputationOpHandleOrReturnItself(
OpHandleBase *op, size_t scope_idx) {
std::queue<OpHandleBase *> q;
std::unordered_set<OpHandleBase *> visited;
q.push(op);
do {
auto *op = q.front();
q.pop();
auto *compute_op = dynamic_cast<ComputationOpHandle *>(op);
if (compute_op != nullptr && compute_op->GetScopeIdx() == scope_idx) {
return compute_op;
}
for (auto *out_var : op->Outputs()) {
for (auto *pending_op : out_var->PendingOps()) {
if (visited.count(pending_op)) continue;
visited.insert(pending_op);
} }
} }
} while (!queue.empty()); } while (!q.empty());
return nullptr; return nullptr;
} }
static void AddDependencyBetween(OpHandleBase *in, OpHandleBase *out, static std::unordered_set<ComputationOpHandle *>
ir::Graph *graph) { ExtractComputationOpFromLastLivedVar(VarHandle *var, size_t scope_idx,
auto it = std::find_if( const ShrinkDepsOpFunctor &shrink_func,
in->Outputs().begin(), in->Outputs().end(), [](VarHandleBase *var) { bool *ok) {
return dynamic_cast<DummyVarHandle *>(var) != nullptr; // stage one. Get last op for variable.
}); std::unordered_set<OpHandleBase *> candidates;
{
if (it != in->Outputs().end()) { if (var->PendingOps().empty() && var->GeneratedOp()) {
out->AddInput(*it); // No operator depends on this variable. So the last operator is the op
} else { // who generates this variable.
auto *dep_var = new DummyVarHandle(graph->CreateControlDepVar()); candidates.emplace(var->GeneratedOp());
graph->Get<GraphDepVars>(kGraphDepVars).emplace(dep_var); } else {
in->AddOutput(dep_var); candidates = var->PendingOps();
out->AddInput(dep_var); }
// No pending ops or generated op is nullptr
if (candidates.empty()) {
*ok = false;
return {};
}
}
// stage two. Try to cast them to computation op.
// return (*ok=false) when failed.
//
// The reason why we cannot make any types of op handle to be the last lived
// op is:
// some op handle may operate on many DeviceContext, however, our garbage
// collector can only wait one DeviceContext for now. So currently, we wait
// the nearest compute op.
std::unordered_set<ComputationOpHandle *> computation_op;
{
for (auto *op : candidates) {
auto *compute_op =
FindNextComputationOpHandleOrReturnItself(op, scope_idx);
if (compute_op == nullptr) {
*ok = false;
return {};
}
computation_op.emplace(compute_op);
}
} }
// stage three. Try to shrink computation op if they depend on each other.
// Get the smallest set of the most ops.
*ok = true;
return shrink_func(computation_op);
}
static VarDesc *TryGetLatestVarDesc(const std::vector<VarHandle *> &vars) {
VarDesc *var_desc = nullptr;
std::find_if(vars.rbegin(), vars.rend(), [&](VarHandle *var_handle) -> bool {
var_desc = var_handle->Node()->Var();
return var_desc != nullptr;
});
return var_desc;
} }
std::unique_ptr<ir::Graph> ReferenceCountPass::ApplyImpl( std::unique_ptr<ir::Graph> ReferenceCountPass::ApplyImpl(
std::unique_ptr<ir::Graph> graph) const { std::unique_ptr<ir::Graph> graph) const {
auto &ref_cnts = Get<DeviceReferenceCountMap>(kGlobalReferenceCount); auto &ref_cnts = Get<std::vector<ReferenceCountMap>>(kGlobalReferenceCount);
auto &cur_ref_cnts = Get<AtomicDeviceReferenceCountMap>(kCurReferenceCount); auto &last_live_ops_of_vars =
auto &gcs = Get<DeviceGarbageCollectorMap>(kGarbageCollector); Get<std::vector<LastLiveOpsOfVars>>(kLastLiveOpsOfVars);
// It is not easy to find the right reference counts of varaibles in graph PADDLE_ENFORCE(last_live_ops_of_vars.empty() && ref_cnts.empty(),
// Step 1: Find all variables in computation ops "Last Live Ops and Reference Counts of vars should be "
// Step 2: Find all variables in non-computation ops which refers to variables "initialized at here.");
// in computation ops
std::unordered_set<std::string> names;
std::unordered_map<OpHandleBase *, ReferenceCountOpHandle *>
compute_ref_cnt_map;
auto get_ref_cnts_from_compute_op = [&](
OpHandleBase *op, const std::vector<VarHandleBase *> &vars) {
std::vector<std::string> var_names_in_op;
auto *compute_op = dynamic_cast<ComputationOpHandle *>(op);
if (compute_op == nullptr ||
!platform::is_gpu_place(compute_op->GetPlace()))
return var_names_in_op;
auto place = boost::get<platform::CUDAPlace>(compute_op->GetPlace());
for (VarHandleBase *var_handle_base : vars) {
auto *var_handle = dynamic_cast<VarHandle *>(var_handle_base);
if (var_handle == nullptr || !var_handle->Node()->IsVar()) continue;
if (!platform::is_gpu_place(var_handle->place_) ||
boost::get<platform::CUDAPlace>(var_handle->place_) != place)
continue;
VarDesc *var_desc = var_handle->Node()->Var(); const auto &vars = graph->Get<GraphVars>(kGraphVars);
auto var_name = var_handle->Node()->Name();
// This is weird but there is really some variables without var_desc last_live_ops_of_vars.resize(vars.size());
// in computation_op ref_cnts.resize(vars.size());
if (var_desc == nullptr) {
var_desc = compute_op->Node()->Op()->Block()->FindVar(var_name); ShrinkDepsOpFunctor shrink_func(
if (var_desc == nullptr) continue; ir::FilterByNodeWrapper<OpHandleBase>(*graph));
for (size_t i = 0; i < vars.size(); ++i) {
for (auto &name_var_pair : vars[i]) {
// Whether this variable can be reused or deleted? If not, we do not
// compute reference counts and dependencies.
VarDesc *var_desc = TryGetLatestVarDesc(name_var_pair.second);
if (var_desc == nullptr || var_desc->Persistable()) {
continue;
} }
if (var_desc->Persistable()) continue;
auto var_type = var_desc->Proto()->type().type(); auto var_type = var_desc->Proto()->type().type();
if (var_type != proto::VarType::LOD_TENSOR && if (var_type != proto::VarType::LOD_TENSOR &&
var_type != proto::VarType::SELECTED_ROWS) { var_type != proto::VarType::SELECTED_ROWS &&
var_type != proto::VarType::LOD_TENSOR_ARRAY) {
// Var type cannot be deleted
continue; continue;
} }
// compute op only runs in one device bool ok;
if (ref_cnts[place.device]->count(var_name)) auto result = ExtractComputationOpFromLastLivedVar(
++(*ref_cnts[place.device])[var_name]; name_var_pair.second.back(), i, shrink_func, &ok);
else
(*ref_cnts[place.device])[var_name] = 1;
names.insert(var_name); if (ok) {
var_names_in_op.push_back(var_name); auto &var_name = name_var_pair.first;
} PADDLE_ENFORCE(!result.empty(), "Last living ops of %s cannot be empty",
return var_names_in_op; var_name);
}; ref_cnts[i].emplace(var_name, result.size());
last_live_ops_of_vars[i].emplace(var_name, std::move(result));
auto update_ref_cnts_from_non_compute_op = [&](
OpHandleBase *op, const std::vector<VarHandleBase *> &vars) {
if (dynamic_cast<ComputationOpHandle *>(op) != nullptr) return;
for (VarHandleBase *var_handle_base : vars) {
auto *var_handle = dynamic_cast<VarHandle *>(var_handle_base);
if (var_handle == nullptr || !var_handle->Node()->IsVar()) continue;
auto var_name = var_handle->Node()->Name();
auto var_place = var_handle->place_;
if (!platform::is_gpu_place(var_place)) continue;
auto place = boost::get<platform::CUDAPlace>(var_place);
if (names.count(var_name) == 0) continue;
if (ref_cnts.count(place.device) &&
ref_cnts[place.device]->count(var_name)) {
++(*ref_cnts[place.device])[var_name];
auto *next_compute_op = FindNextComputationOpHandle(var_handle);
if (next_compute_op != nullptr) {
if (compute_ref_cnt_map.count(next_compute_op)) {
compute_ref_cnt_map[next_compute_op]->AddVar(var_name);
VLOG(5) << "Add reference count of " << var_name << " to Operator "
<< next_compute_op->Name();
} else {
// Create new reference_count_op_handle
ir::Node *ref_cnt_node = graph->CreateEmptyNode(
"reference_count", ir::Node::Type::kOperation);
auto *ref_cnt_handle = new ReferenceCountOpHandle(
ref_cnt_node, next_compute_op->GetScope(), place, {var_name},
gcs[place.device].get(), cur_ref_cnts[place.device].get());
AddDependencyBetween(next_compute_op, ref_cnt_handle, graph.get());
compute_ref_cnt_map[next_compute_op] = ref_cnt_handle;
}
}
} }
} }
};
auto all_ops = ir::FilterByNodeWrapper<OpHandleBase>(*graph);
for (auto &op : all_ops) {
auto in_var_names = get_ref_cnts_from_compute_op(op, op->Inputs());
auto out_var_names = get_ref_cnts_from_compute_op(op, op->Outputs());
if (in_var_names.empty() && out_var_names.empty()) continue;
in_var_names.insert(in_var_names.end(), out_var_names.begin(),
out_var_names.end());
auto *compute_op = dynamic_cast<ComputationOpHandle *>(op);
auto place = boost::get<platform::CUDAPlace>(compute_op->GetPlace());
ir::Node *ref_cnt_node =
graph->CreateEmptyNode("reference_count", ir::Node::Type::kOperation);
auto *ref_cnt_handle = new ReferenceCountOpHandle(
ref_cnt_node, compute_op->GetScope(), place, in_var_names,
gcs[place.device].get(), cur_ref_cnts[place.device].get());
AddDependencyBetween(compute_op, ref_cnt_handle, graph.get());
compute_ref_cnt_map[compute_op] = ref_cnt_handle;
}
for (auto &op : all_ops) {
update_ref_cnts_from_non_compute_op(op, op->Inputs());
update_ref_cnts_from_non_compute_op(op, op->Outputs());
}
std::vector<OpHandleBase *> new_all_ops;
new_all_ops.reserve(compute_ref_cnt_map.size() + all_ops.size());
for (auto &op : all_ops) {
new_all_ops.emplace_back(std::move(op));
auto it = compute_ref_cnt_map.find(new_all_ops.back());
if (it != compute_ref_cnt_map.end()) {
// Add LeafNode to ReferenceCountOpHandle
auto *dummy_leaf = new DummyVarHandle(graph->CreateControlDepVar());
graph->Get<GraphDepVars>(kGraphDepVars).emplace(dummy_leaf);
it->second->AddOutput(dummy_leaf);
new_all_ops.emplace_back(std::move(it->second));
}
} }
all_ops.swap(new_all_ops);
return graph; return graph;
} }
...@@ -205,5 +258,4 @@ std::unique_ptr<ir::Graph> ReferenceCountPass::ApplyImpl(
REGISTER_PASS(reference_count_pass,
paddle::framework::details::ReferenceCountPass)
.RequirePassAttr(paddle::framework::details::kGlobalReferenceCount)
.RequirePassAttr(paddle::framework::details::kCurReferenceCount)
.RequirePassAttr(paddle::framework::details::kGarbageCollector);
.RequirePassAttr(paddle::framework::details::kLastLiveOpsOfVars);
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
#pragma once #pragma once
#include "paddle/fluid/framework/details/reference_count_op_handle.h"
#include "paddle/fluid/framework/ir/graph.h" #include "paddle/fluid/framework/ir/graph.h"
#include "paddle/fluid/framework/ir/pass.h" #include "paddle/fluid/framework/ir/pass.h"
...@@ -22,10 +21,6 @@ namespace paddle { ...@@ -22,10 +21,6 @@ namespace paddle {
namespace framework { namespace framework {
namespace details { namespace details {
constexpr char kGlobalReferenceCount[] = "reference_count";
constexpr char kCurReferenceCount[] = "current_reference_count";
constexpr char kGarbageCollector[] = "garbage_collector";
class ReferenceCountPass : public ir::Pass { class ReferenceCountPass : public ir::Pass {
protected: protected:
std::unique_ptr<ir::Graph> ApplyImpl( std::unique_ptr<ir::Graph> ApplyImpl(
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/details/reference_count_pass_helper.h"
namespace paddle {
namespace framework {
namespace details {} // namespace details
} // namespace framework
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <atomic>
#include <map>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "paddle/fluid/framework/garbage_collector.h"
namespace paddle {
namespace framework {
namespace details {
class ComputationOpHandle;
using ReferenceCountMap = std::unordered_map<std::string, size_t>;
using AtomicReferenceCountMap =
std::unordered_map<std::string, std::atomic<size_t>>;
using GarbageCollectorMap =
std::map<platform::Place, std::unique_ptr<GarbageCollector>>;
const char kGlobalReferenceCount[] = "global_reference_count";
const char kRuntimeReferenceCount[] = "runtime_reference_count";
const char kGarbageCollector[] = "garbage_collector";
const char kAllPlaces[] = "all_places";
using LastLiveOpsOfVars =
std::unordered_map<std::string, std::unordered_set<ComputationOpHandle*>>;
const char kLastLiveOpsOfVars[] = "last_live_ops_of_var";
} // namespace details
} // namespace framework
} // namespace paddle
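A note on the two count types above: ReferenceCountMap holds the static counts that reference_count_pass derives from the graph, while AtomicReferenceCountMap is the per-run copy that EagerDeletionOpHandle decrements, possibly from several threads. Because std::atomic is neither copyable nor movable, the runtime map cannot simply be assigned from the static one; it has to be rebuilt entry by entry, roughly as in this standalone sketch (illustrative helper, not a Paddle function):

#include <atomic>
#include <string>
#include <unordered_map>

using ReferenceCountMap = std::unordered_map<std::string, size_t>;
using AtomicReferenceCountMap =
    std::unordered_map<std::string, std::atomic<size_t>>;

// Rebuild the runtime (atomic) counts from the static ones before a new run.
void ResetRuntimeCounts(const ReferenceCountMap &global,
                        AtomicReferenceCountMap *runtime) {
  runtime->clear();
  for (const auto &pair : global) {
    // operator[] default-constructs the atomic in place; store() then sets it.
    (*runtime)[pair.first].store(pair.second, std::memory_order_relaxed);
  }
}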
...@@ -18,9 +18,6 @@ ...@@ -18,9 +18,6 @@
#include <vector> #include <vector>
#include "paddle/fluid/framework/variable_helper.h" #include "paddle/fluid/framework/variable_helper.h"
#include "paddle/fluid/platform/profiler.h" #include "paddle/fluid/platform/profiler.h"
#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/framework/details/reference_count_op_handle.h"
#endif
namespace paddle { namespace paddle {
namespace framework { namespace framework {
...@@ -69,27 +66,12 @@ FeedFetchList ScopeBufferedSSAGraphExecutor::Run( ...@@ -69,27 +66,12 @@ FeedFetchList ScopeBufferedSSAGraphExecutor::Run(
platform::RecordEvent e("ScopeBufferedSSAGraphExecutorAfterRun", nullptr); platform::RecordEvent e("ScopeBufferedSSAGraphExecutorAfterRun", nullptr);
drop_scope_counter_ += 1; drop_scope_counter_ += 1;
#ifdef PADDLE_WITH_CUDA
const std::string gc_name = "garbage_collector";
DeviceGarbageCollectorMap *gc =
Graph().Has(gc_name) ? &(Graph().Get<DeviceGarbageCollectorMap>(gc_name))
: nullptr;
#endif
if (!fetch_tensors.empty() || if (!fetch_tensors.empty() ||
drop_scope_counter_ == strategy_.num_iteration_per_drop_scope_) { drop_scope_counter_ == strategy_.num_iteration_per_drop_scope_) {
drop_scope_counter_ = 0; drop_scope_counter_ = 0;
// Wait All computational streams // Wait All computational streams
for (auto p : places_) { for (auto p : places_) {
platform::DeviceContextPool::Instance().Get(p)->Wait(); platform::DeviceContextPool::Instance().Get(p)->Wait();
#ifdef PADDLE_WITH_CUDA
if (gc != nullptr && platform::is_gpu_place(p)) {
auto gpu_place = boost::get<platform::CUDAPlace>(p);
auto &gc_at_place = gc->at(gpu_place.device);
gc_at_place->Wait();
gc_at_place->Reset();
}
#endif
} }
for (auto &scope : local_scopes_) { for (auto &scope : local_scopes_) {
auto &local_scope = auto &local_scope =
......
...@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and ...@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/executor.h"
#include <deque>
#include "paddle/fluid/framework/feed_fetch_method.h" #include "paddle/fluid/framework/feed_fetch_method.h"
#include "paddle/fluid/framework/lod_rank_table.h" #include "paddle/fluid/framework/lod_rank_table.h"
...@@ -41,11 +42,43 @@ namespace { ...@@ -41,11 +42,43 @@ namespace {
int kProgramId = -1; int kProgramId = -1;
} // namespace } // namespace
static std::unordered_map<std::string, size_t> GetNonPersistableReferenceCounts(
const BlockDesc& block, const std::vector<std::string>& skip_var_list) {
std::unordered_map<std::string, size_t> ref_cnts;
std::unordered_set<std::string> skip_vars(skip_var_list.begin(),
skip_var_list.end());
auto update_ref_cnts = [&](OpDesc* op_desc, const VariableNameMap& name_map) {
for (auto& name_pair : name_map) {
for (auto& name : name_pair.second) {
if (skip_vars.count(name)) continue;
auto* var_desc = block.FindVar(name);
if (var_desc == nullptr || var_desc->Persistable()) continue;
auto type = var_desc->Proto()->type().type();
if (type != proto::VarType::LOD_TENSOR &&
type != proto::VarType::SELECTED_ROWS &&
type != proto::VarType::LOD_TENSOR_ARRAY) {
continue;
}
++ref_cnts[name];
}
}
};
for (auto op_desc : block.AllOps()) {
update_ref_cnts(op_desc, op_desc->Inputs());
update_ref_cnts(op_desc, op_desc->Outputs());
}
return ref_cnts;
}
ExecutorPrepareContext::ExecutorPrepareContext(
const framework::ProgramDesc& prog, size_t block_id)
const framework::ProgramDesc& prog, size_t block_id,
const std::vector<std::string>& skip_ref_cnt_vars)
: prog_(prog), block_id_(block_id) {
if (GetEagerDeletionThreshold() >= 0) {
ref_cnts_ = GetNonPersistableReferenceCount<int>(prog_, block_id_);
global_ref_cnts_ = GetNonPersistableReferenceCounts(prog.Block(block_id),
skip_ref_cnt_vars);
}
}
...@@ -53,28 +86,40 @@ ExecutorPrepareContext::~ExecutorPrepareContext() {
VLOG(5) << "destroy ExecutorPrepareContext";
}
template <typename RefCntMap>
static void DeleteUnusedTensors(const Scope& scope, const OperatorBase* op,
GarbageCollector<Tensor>* gc,
RefCntMap* ref_cnts) {
std::unordered_set<Tensor*> erase_tensors;
static void DeleteUnusedTensors(
const Scope& scope, const OperatorBase* op, GarbageCollector* gc,
std::unordered_map<std::string, size_t>* ref_cnts) {
std::deque<std::shared_ptr<memory::Allocation>> garbages;
auto handler = [&](const VariableNameMap& name_map) {
for (auto& name_pair : name_map) {
for (auto& name : name_pair.second) {
auto it = ref_cnts->find(name);
if (it == ref_cnts->end()) continue;
if ((it->second)-- == 1) {
auto* var = scope.FindVar(name);
if (var != nullptr) {
VLOG(10) << "Erase tensor \'" << name << "\'";
if (var->IsType<LoDTensor>()) {
erase_tensors.insert(var->GetMutable<LoDTensor>());
} else if (var->IsType<SelectedRows>()) {
erase_tensors.insert(
var->GetMutable<SelectedRows>()->mutable_value());
}
}
}
if (--(it->second) != 0) {
continue;
}
auto* var = scope.FindVar(name);
if (var == nullptr) {
continue;
}
VLOG(2) << "Erase variable " << name;
if (var->IsType<LoDTensor>()) {
garbages.emplace_back(
var->GetMutable<LoDTensor>()->MoveMemoryHolder());
} else if (var->IsType<SelectedRows>()) {
garbages.emplace_back(var->GetMutable<SelectedRows>()
->mutable_value()
->MoveMemoryHolder());
} else if (var->IsType<LoDTensorArray>()) {
auto* lod_tensor_arr = var->GetMutable<LoDTensorArray>();
for (auto& t : *lod_tensor_arr) {
garbages.emplace_back(t.MoveMemoryHolder());
}
} else {
PADDLE_THROW("Type %s of %s is not supported eager deletion",
var->Type().name(), name);
}
}
}
...@@ -83,8 +128,8 @@ static void DeleteUnusedTensors(const Scope& scope, const OperatorBase* op,
handler(op->Inputs());
handler(op->Outputs());
if (!erase_tensors.empty()) {
gc->Add(erase_tensors);
if (!garbages.empty()) {
gc->Add(std::move(garbages));
}
}
...@@ -325,9 +370,10 @@ void Executor::Run(const ProgramDesc& program, Scope* scope,
}
std::unique_ptr<ExecutorPrepareContext> Executor::Prepare(
const ProgramDesc& program, int block_id) {
const ProgramDesc& program, int block_id,
const std::vector<std::string>& skip_ref_cnt_vars) {
std::unique_ptr<ExecutorPrepareContext> ctx(
new ExecutorPrepareContext(program, block_id));
new ExecutorPrepareContext(program, block_id, skip_ref_cnt_vars));
PADDLE_ENFORCE_LT(static_cast<size_t>(block_id), program.Size());
auto& block = program.Block(block_id);
for (auto& op_desc : block.AllOps()) {
...@@ -338,16 +384,28 @@ std::unique_ptr<ExecutorPrepareContext> Executor::Prepare(
}
std::vector<std::shared_ptr<ExecutorPrepareContext>> Executor::Prepare(
const ProgramDesc& program, const std::vector<int>& block_ids) {
const ProgramDesc& program, const std::vector<int>& block_ids,
const std::vector<std::vector<std::string>>& skip_ref_cnt_vars) {
PADDLE_ENFORCE(
skip_ref_cnt_vars.empty() || skip_ref_cnt_vars.size() == block_ids.size(),
"skip_ref_cnt_vars should be either empty or equals to block number %d",
block_ids.size());
std::vector<std::shared_ptr<ExecutorPrepareContext>> result;
size_t idx = 0;
for (auto& bid : block_ids) {
auto* ctx = new ExecutorPrepareContext(program, bid);
ExecutorPrepareContext* ctx;
if (skip_ref_cnt_vars.empty()) {
ctx = new ExecutorPrepareContext(program, bid);
} else {
ctx = new ExecutorPrepareContext(program, bid, skip_ref_cnt_vars[idx]);
}
PADDLE_ENFORCE_LT(static_cast<size_t>(bid), program.Size());
auto& block = program.Block(bid);
for (auto& op_desc : block.AllOps()) {
ctx->ops_.push_back(OpRegistry::CreateOp(*op_desc));
}
result.push_back(std::shared_ptr<ExecutorPrepareContext>(ctx));
++idx;
}
return result;
}
...@@ -365,22 +423,23 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
}
int64_t max_memory_size = GetEagerDeletionThreshold();
std::unique_ptr<GarbageCollector<Tensor>> gc;
// WhileOp would set keep_kids to true,
// because WhileGradOp needs the scopes created in WhileOp.
// Perhaps, we should not perform eager deletion in WhileOp
// The scopes and variables created by WhileOp would be deleted
// in WhileGradOp.
std::unique_ptr<GarbageCollector> gc;
// skip while_op and while_grad_op temporarily
if (max_memory_size >= 0 && !keep_kids) {
ctx->ResetReferenceCount();
#ifdef PADDLE_WITH_CUDA
if (platform::is_gpu_place(place_)) {
gc.reset(new DefaultStreamGarbageCollector<Tensor>(
boost::get<platform::CUDAPlace>(place_), max_memory_size));
} else {
if (IsFastEagerDeletionModeEnabled()) {
gc.reset(new UnsafeFastGPUGarbageCollector(
boost::get<platform::CUDAPlace>(place_), max_memory_size));
} else {
gc.reset(new DefaultStreamGarbageCollector(
boost::get<platform::CUDAPlace>(place_), max_memory_size));
}
} else if (platform::is_cpu_place(place_)) {
#endif
gc.reset(new CPUGarbageCollector<Tensor>(
boost::get<platform::CPUPlace>(place_), max_memory_size));
gc.reset(new CPUGarbageCollector(boost::get<platform::CPUPlace>(place_),
max_memory_size));
#ifdef PADDLE_WITH_CUDA
}
#endif
...@@ -389,17 +448,13 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
for (auto& op : ctx->ops_) {
op->Run(*local_scope, place_);
if (gc != nullptr) {
if (gc) {
DeleteUnusedTensors(*local_scope, op.get(), gc.get(),
&(ctx->cur_ref_cnts_));
&(ctx->runtime_ref_cnts_));
}
}
if (gc != nullptr) {
gc->Wait();
} else {
platform::DeviceContextPool::Instance().Get(place_)->Wait();
}
platform::DeviceContextPool::Instance().Get(place_)->Wait();
if (local_scope != scope) {
scope->DeleteScope(local_scope);
......
...@@ -27,52 +27,21 @@ limitations under the License. */ ...@@ -27,52 +27,21 @@ limitations under the License. */
namespace paddle { namespace paddle {
namespace framework { namespace framework {
template <typename T>
std::unordered_map<std::string, T> GetNonPersistableReferenceCount(
const ProgramDesc& prog, size_t block_id) {
auto& block = prog.Block(block_id);
std::unordered_map<std::string, T> ref_cnts;
auto update_ref_cnts = [&](OpDesc* op_desc, const VariableNameMap& name_map) {
for (auto& name_pair : name_map) {
for (auto& name : name_pair.second) {
auto* var_desc = block.FindVar(name);
if (var_desc == nullptr || var_desc->Persistable()) continue;
auto type = var_desc->Proto()->type().type();
if (type != proto::VarType::LOD_TENSOR &&
type != proto::VarType::SELECTED_ROWS) {
continue;
}
auto it = ref_cnts.find(name);
if (it != ref_cnts.end()) {
++it->second;
} else {
ref_cnts[name] = 1;
}
}
}
};
for (auto op_desc : block.AllOps()) {
update_ref_cnts(op_desc, op_desc->Inputs());
update_ref_cnts(op_desc, op_desc->Outputs());
}
return ref_cnts;
}
struct ExecutorPrepareContext {
ExecutorPrepareContext(const framework::ProgramDesc& prog, size_t block_id);
ExecutorPrepareContext(const framework::ProgramDesc& prog, size_t block_id,
const std::vector<std::string>& skip_ref_cnt_vars =
std::vector<std::string>());
~ExecutorPrepareContext();
void ResetReferenceCount() { cur_ref_cnts_ = ref_cnts_; }
void ResetReferenceCount() { runtime_ref_cnts_ = global_ref_cnts_; }
const framework::ProgramDesc& prog_;
size_t block_id_;
std::vector<std::unique_ptr<OperatorBase>> ops_;
std::unordered_map<std::string, int> ref_cnts_;
std::unordered_map<std::string, int> cur_ref_cnts_;
std::unordered_map<std::string, size_t> global_ref_cnts_;
std::unordered_map<std::string, size_t> runtime_ref_cnts_;
};
class Executor {
...@@ -108,10 +77,14 @@ class Executor {
const std::string& fetch_holder_name = "fetch");
static std::unique_ptr<ExecutorPrepareContext> Prepare(
const ProgramDesc& program, int block_id);
const ProgramDesc& program, int block_id,
const std::vector<std::string>& skip_ref_cnt_vars =
std::vector<std::string>());
static std::vector<std::shared_ptr<ExecutorPrepareContext>> Prepare(
const ProgramDesc& program, const std::vector<int>& block_ids);
const ProgramDesc& program, const std::vector<int>& block_ids,
const std::vector<std::vector<std::string>>& skip_ref_cnt_vars =
std::vector<std::vector<std::string>>());
void CreateVariables(const ProgramDesc& pdesc, Scope* scope, int block_id);
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <algorithm>
#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/platform/cuda_device_guard.h"
#endif
#include "paddle/fluid/framework/garbage_collector.h"
namespace paddle {
namespace framework {
GarbageCollector::GarbageCollector(const platform::Place &place,
size_t max_memory_size)
: max_memory_size_((std::max)(max_memory_size, static_cast<size_t>(1))) {
garbages_.reset(new GarbageQueue());
dev_ctx_ = platform::DeviceContextPool::Instance().Get(place);
}
CPUGarbageCollector::CPUGarbageCollector(const platform::CPUPlace &place,
size_t max_memory_size)
: GarbageCollector(place, max_memory_size) {}
void CPUGarbageCollector::ClearCallback(const std::function<void()> &callback) {
callback();
}
#ifdef PADDLE_WITH_CUDA
UnsafeFastGPUGarbageCollector::UnsafeFastGPUGarbageCollector(
const platform::CUDAPlace &place, size_t max_memory_size)
: GarbageCollector(place, max_memory_size) {}
void UnsafeFastGPUGarbageCollector::ClearCallback(
const std::function<void()> &callback) {
callback();
}
DefaultStreamGarbageCollector::DefaultStreamGarbageCollector(
const platform::CUDAPlace &place, size_t max_memory_size)
: GarbageCollector(place, max_memory_size) {}
void DefaultStreamGarbageCollector::Wait() const {
static_cast<platform::CUDADeviceContext *>(this->dev_ctx_)
->WaitStreamCallback();
}
void DefaultStreamGarbageCollector::ClearCallback(
const std::function<void()> &callback) {
static_cast<platform::CUDADeviceContext *>(this->dev_ctx_)
->AddStreamCallback(callback);
}
StreamGarbageCollector::StreamGarbageCollector(const platform::CUDAPlace &place,
size_t max_memory_size)
: GarbageCollector(place, max_memory_size) {
platform::CUDADeviceGuard guard(place.device);
PADDLE_ENFORCE(cudaStreamCreate(&stream_));
callback_manager_.reset(new platform::StreamCallbackManager(stream_));
}
StreamGarbageCollector::~StreamGarbageCollector() {
auto place = boost::get<platform::CUDAPlace>(this->dev_ctx_->GetPlace());
platform::CUDADeviceGuard guard(place.device);
PADDLE_ENFORCE(cudaStreamSynchronize(stream_));
PADDLE_ENFORCE(cudaStreamDestroy(stream_));
}
cudaStream_t StreamGarbageCollector::stream() const { return stream_; }
void StreamGarbageCollector::Wait() const { callback_manager_->Wait(); }
void StreamGarbageCollector::ClearCallback(
const std::function<void()> &callback) {
callback_manager_->AddCallback(callback);
}
#endif
} // namespace framework
} // namespace paddle
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
#pragma once #pragma once
#include <algorithm>
#include <deque> #include <deque>
#include <functional> #include <functional>
#include <memory> #include <memory>
...@@ -24,134 +23,74 @@ ...@@ -24,134 +23,74 @@
namespace paddle { namespace paddle {
namespace framework { namespace framework {
// T should have memory_size() and clear() method
template <typename T>
class GarbageCollector { class GarbageCollector {
public: public:
GarbageCollector(const platform::Place &place, size_t max_memory_size) using GarbageQueue = std::deque<std::shared_ptr<memory::Allocation>>;
: max_memory_size_((std::max)(max_memory_size, static_cast<size_t>(1))) {
garbages_.reset(new std::deque<T *>());
dev_ctx_ = platform::DeviceContextPool::Instance().Get(place);
}
virtual ~GarbageCollector() {} GarbageCollector(const platform::Place &place, size_t max_memory_size);
void Reset() { virtual ~GarbageCollector() = default;
std::lock_guard<std::mutex> guard(mutex_);
garbages_.reset(new std::deque<T *>()); virtual void Wait() const {}
cur_memory_size_ = 0;
}
template <typename Container> template <typename Container>
void Add(const Container &objs) { void Add(Container &&objs);
Add(objs, []() {});
}
template <typename Container, typename Callback> template <typename Container, typename Callback>
void Add(const Container &objs, Callback &&callback) { void Add(Container &&objs, Callback &&callback);
std::shared_ptr<std::deque<T *>> clear_deque;
{
std::lock_guard<std::mutex> guard(mutex_);
for (auto *obj : objs) {
garbages_->push_back(obj);
cur_memory_size_ += obj->memory_size();
}
if (cur_memory_size_ >= max_memory_size_) {
cur_memory_size_ = 0;
clear_deque = garbages_;
garbages_.reset(new std::deque<T *>());
}
}
if (clear_deque != nullptr) {
callback();
ClearCallback([=]() {
for (auto *obj : *clear_deque) obj->clear();
});
}
}
virtual void Wait() const {}
protected: protected:
virtual void ClearCallback(const std::function<void()> &callback) = 0; virtual void ClearCallback(const std::function<void()> &callback) = 0;
platform::DeviceContext *dev_ctx_; platform::DeviceContext *dev_ctx_;
std::shared_ptr<std::deque<T *>> garbages_; std::unique_ptr<GarbageQueue> garbages_;
mutable std::mutex mutex_; mutable std::mutex mutex_;
const size_t max_memory_size_; const size_t max_memory_size_;
size_t cur_memory_size_ = 0; size_t cur_memory_size_{0};
}; };
template <typename T> class CPUGarbageCollector : public GarbageCollector {
class CPUGarbageCollector : public GarbageCollector<T> {
public: public:
CPUGarbageCollector(const platform::CPUPlace &place, size_t max_memory_size) CPUGarbageCollector(const platform::CPUPlace &place, size_t max_memory_size);
: GarbageCollector<T>(place, max_memory_size) {}
protected: protected:
void ClearCallback(const std::function<void()> &callback) override { void ClearCallback(const std::function<void()> &callback) override;
callback();
}
}; };
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
template <typename T> class UnsafeFastGPUGarbageCollector : public GarbageCollector {
class DefaultStreamGarbageCollector : public GarbageCollector<T> {
public: public:
DefaultStreamGarbageCollector(const platform::CUDAPlace &place, UnsafeFastGPUGarbageCollector(const platform::CUDAPlace &place,
size_t max_memory_size) size_t max_memory_size);
: GarbageCollector<T>(place, max_memory_size) {}
cudaStream_t stream() const { protected:
return static_cast<const platform::CUDADeviceContext *>(this->dev_ctx_) void ClearCallback(const std::function<void()> &callback) override;
->stream(); };
}
void Wait() const override { class DefaultStreamGarbageCollector : public GarbageCollector {
this->dev_ctx_->Wait(); public:
static_cast<const platform::CUDADeviceContext *>(this->dev_ctx_) DefaultStreamGarbageCollector(const platform::CUDAPlace &place,
->WaitStreamCallback(); size_t max_memory_size);
}
void Wait() const override;
protected: protected:
void ClearCallback(const std::function<void()> &callback) override { void ClearCallback(const std::function<void()> &callback) override;
static_cast<platform::CUDADeviceContext *>(this->dev_ctx_)
->AddStreamCallback(callback);
}
}; };
template <typename T> class StreamGarbageCollector : public GarbageCollector {
class StreamGarbageCollector : public GarbageCollector<T> {
public: public:
StreamGarbageCollector(const platform::CUDAPlace &place, StreamGarbageCollector(const platform::CUDAPlace &place,
size_t max_memory_size) size_t max_memory_size);
: GarbageCollector<T>(place, max_memory_size) {
PADDLE_ENFORCE(cudaSetDevice(place.device));
PADDLE_ENFORCE(cudaStreamCreate(&stream_));
callback_manager_.reset(new platform::StreamCallbackManager(stream_));
}
~StreamGarbageCollector() { ~StreamGarbageCollector();
auto place = boost::get<platform::CUDAPlace>(this->dev_ctx_->GetPlace());
PADDLE_ENFORCE(cudaSetDevice(place.device));
PADDLE_ENFORCE(cudaStreamSynchronize(stream_));
PADDLE_ENFORCE(cudaStreamDestroy(stream_));
}
void Wait() const override { void Wait() const override;
PADDLE_ENFORCE(cudaStreamSynchronize(stream_));
std::lock_guard<std::mutex> guard(this->mutex_);
callback_manager_->Wait();
}
cudaStream_t stream() const { return stream_; } cudaStream_t stream() const;
protected: protected:
void ClearCallback(const std::function<void()> &callback) override { void ClearCallback(const std::function<void()> &callback) override;
std::lock_guard<std::mutex> guard(this->mutex_);
callback_manager_->AddCallback(callback);
}
private: private:
cudaStream_t stream_; cudaStream_t stream_;
...@@ -159,5 +98,33 @@ class StreamGarbageCollector : public GarbageCollector<T> { ...@@ -159,5 +98,33 @@ class StreamGarbageCollector : public GarbageCollector<T> {
}; };
#endif #endif
template <typename Container>
void GarbageCollector::Add(Container &&objs) {
Add(std::forward<Container>(objs), []() {});
}
template <typename Container, typename Callback>
void GarbageCollector::Add(Container &&objs, Callback &&callback) {
GarbageQueue *garbage_queue = nullptr;
{
std::lock_guard<std::mutex> guard(mutex_);
for (auto &obj : objs) {
if (!obj) continue;
cur_memory_size_ += obj->size();
garbages_->push_back(std::move(obj));
}
if (cur_memory_size_ >= max_memory_size_) {
cur_memory_size_ = 0;
garbage_queue = garbages_.release();
garbages_.reset(new GarbageQueue());
}
}
if (garbage_queue) {
callback();
ClearCallback([garbage_queue]() { delete garbage_queue; });
}
}
} // namespace framework } // namespace framework
} // namespace paddle } // namespace paddle
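GarbageCollector::Add above accumulates moved-out allocations until cur_memory_size_ reaches max_memory_size_, then hands the whole queue to ClearCallback, which runs it immediately on CPU or defers it behind a CUDA stream. A minimal standalone sketch of that threshold-batching pattern, with a made-up Blob type instead of memory::Allocation and synchronous freeing instead of ClearCallback:

#include <algorithm>
#include <deque>
#include <iostream>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Made-up stand-in for memory::Allocation: anything that reports its size.
struct Blob {
  explicit Blob(size_t n) : bytes(n) {}
  size_t size() const { return bytes; }
  size_t bytes;
};

class SimpleCollector {
 public:
  explicit SimpleCollector(size_t max_memory_size)
      : max_memory_size_(std::max<size_t>(max_memory_size, 1)) {}

  // Move a batch of blobs into the queue; release the whole queue once the
  // accumulated size crosses the threshold.
  void Add(std::vector<std::shared_ptr<Blob>>&& objs) {
    std::deque<std::shared_ptr<Blob>>* to_free = nullptr;
    {
      std::lock_guard<std::mutex> guard(mutex_);
      for (auto& obj : objs) {
        if (!obj) continue;
        cur_memory_size_ += obj->size();
        garbages_.push_back(std::move(obj));
      }
      if (cur_memory_size_ >= max_memory_size_) {
        cur_memory_size_ = 0;
        to_free = new std::deque<std::shared_ptr<Blob>>(std::move(garbages_));
        garbages_.clear();
      }
    }
    if (to_free) {
      // The real collector defers this via ClearCallback (e.g. a CUDA stream
      // callback); here we simply free on the calling thread.
      std::cout << "freeing " << to_free->size() << " blobs\n";
      delete to_free;
    }
  }

 private:
  const size_t max_memory_size_;
  size_t cur_memory_size_ = 0;
  std::deque<std::shared_ptr<Blob>> garbages_;
  std::mutex mutex_;
};

int main() {
  SimpleCollector gc(1 << 20);  // 1 MB threshold
  std::vector<std::shared_ptr<Blob>> batch;
  batch.push_back(std::make_shared<Blob>(600 << 10));
  gc.Add(std::move(batch));  // below threshold: nothing freed yet
  batch.clear();
  batch.push_back(std::make_shared<Blob>(600 << 10));
  gc.Add(std::move(batch));  // crosses threshold: prints "freeing 2 blobs"
  return 0;
}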
...@@ -73,14 +73,21 @@ class Graph { ...@@ -73,14 +73,21 @@ class Graph {
} }
bool Has(const std::string &attr_name) const { bool Has(const std::string &attr_name) const {
return attrs_.find(attr_name) != attrs_.end(); return attrs_.count(attr_name) > 0;
} }
template <typename AttrType> template <typename AttrType>
AttrType &Get(const std::string &attr_name) const { AttrType &Get(const std::string &attr_name) const {
PADDLE_ENFORCE(Has(attr_name), "%s attr not registered for graph.", PADDLE_ENFORCE(Has(attr_name), "%s attr not registered for graph.",
attr_name); attr_name);
return *boost::any_cast<AttrType *>(attrs_.at(attr_name)); try {
return *boost::any_cast<AttrType *>(attrs_.at(attr_name));
} catch (boost::bad_any_cast &) {
PADDLE_THROW(
"Invalid attribute type of %s error, expected: %s, actual: %s",
attr_name, typeid(AttrType *).name(),
attrs_.at(attr_name).type().name());
}
} }
template <typename AttrType> template <typename AttrType>
......
...@@ -51,11 +51,18 @@ class Pass { ...@@ -51,11 +51,18 @@ class Pass {
AttrType &Get(const std::string &attr_name) const { AttrType &Get(const std::string &attr_name) const {
PADDLE_ENFORCE(attrs_.find(attr_name) != attrs_.end(), PADDLE_ENFORCE(attrs_.find(attr_name) != attrs_.end(),
"%s attr not registered for pass.", attr_name); "%s attr not registered for pass.", attr_name);
return *boost::any_cast<AttrType *>(attrs_.at(attr_name)); try {
return *boost::any_cast<AttrType *>(attrs_.at(attr_name));
} catch (boost::bad_any_cast &) {
PADDLE_THROW(
"Invalid attribute type of %s error, expected: %s, actual: %s",
attr_name, typeid(AttrType *).name(),
attrs_.at(attr_name).type().name());
}
} }
bool Has(const std::string &attr_name) const { bool Has(const std::string &attr_name) const {
return attrs_.find(attr_name) != attrs_.end(); return attrs_.count(attr_name) > 0;
} }
void Erase(const std::string &attr_name) { void Erase(const std::string &attr_name) {
......
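Both Graph::Get and Pass::Get now wrap boost::any_cast in a try/catch so that a type mismatch reports the attribute name plus the expected and actual types instead of surfacing a bare bad_any_cast. A standalone sketch of the same pattern using C++17's std::any; AttrMap here is a made-up class, not Paddle's:

#include <any>
#include <iostream>
#include <map>
#include <stdexcept>
#include <string>
#include <typeinfo>

// Simplified attribute map mirroring the getters above, with std::any in
// place of boost::any.
class AttrMap {
 public:
  template <typename T>
  void Set(const std::string& name, T* attr) {
    attrs_[name] = attr;
  }

  template <typename T>
  T& Get(const std::string& name) const {
    auto it = attrs_.find(name);
    if (it == attrs_.end()) {
      throw std::runtime_error(name + " attr not registered.");
    }
    try {
      return *std::any_cast<T*>(it->second);
    } catch (const std::bad_any_cast&) {
      // Report both the expected and the stored type, as the diff does.
      throw std::runtime_error("Invalid attribute type of " + name +
                               ", expected: " + typeid(T*).name() +
                               ", actual: " + std::string(it->second.type().name()));
    }
  }

 private:
  std::map<std::string, std::any> attrs_;
};

int main() {
  AttrMap attrs;
  int counter = 42;
  attrs.Set("counter", &counter);
  std::cout << attrs.Get<int>("counter") << "\n";  // OK: prints 42
  try {
    attrs.Get<double>("counter");                  // wrong type: throws
  } catch (const std::runtime_error& e) {
    std::cout << e.what() << "\n";
  }
  return 0;
}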
...@@ -879,6 +879,8 @@ proto::VarType::Type OperatorWithKernel::IndicateDataType( ...@@ -879,6 +879,8 @@ proto::VarType::Type OperatorWithKernel::IndicateDataType(
t = &(var->Get<SelectedRows>().value()); t = &(var->Get<SelectedRows>().value());
} }
if (t != nullptr) { if (t != nullptr) {
PADDLE_ENFORCE(t->IsInitialized(), "Input %s is not initialized: %s",
ipt_name, DebugString());
int tmp = static_cast<int>(ToDataType(t->type())); int tmp = static_cast<int>(ToDataType(t->type()));
PADDLE_ENFORCE( PADDLE_ENFORCE(
tmp == data_type || data_type == -1, tmp == data_type || data_type == -1,
......
...@@ -26,6 +26,7 @@ limitations under the License. */ ...@@ -26,6 +26,7 @@ limitations under the License. */
#include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h" #include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h" #include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/details/reference_count_pass_helper.h"
#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h" #include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h" #include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
#include "paddle/fluid/platform/profiler.h" #include "paddle/fluid/platform/profiler.h"
...@@ -72,6 +73,26 @@ class ParallelExecutorPrivate { ...@@ -72,6 +73,26 @@ class ParallelExecutorPrivate {
} }
} }
} }
std::unique_ptr<ir::Graph> PrepareGCAndRefCnts(
std::unique_ptr<ir::Graph> graph, size_t max_memory_size);
inline bool HasGarbageCollectors() const { return !gcs_.empty(); }
void ResetRuntimeReferenceCount(const std::vector<std::string> &fetch_tensors,
const std::string &fetched_var_name) {
for (size_t i = 0; i < runtime_ref_cnts_.size(); ++i) {
for (auto &pair : global_ref_cnts_[i]) {
runtime_ref_cnts_[i][pair.first] = pair.second;
}
for (auto &fetch_name : fetch_tensors) {
runtime_ref_cnts_[i].erase(fetch_name);
}
runtime_ref_cnts_[i].erase(fetched_var_name);
}
}
std::vector<platform::Place> places_; std::vector<platform::Place> places_;
std::vector<Scope *> local_scopes_; std::vector<Scope *> local_scopes_;
Scope *global_scope_; // not owned Scope *global_scope_; // not owned
...@@ -83,8 +104,76 @@ class ParallelExecutorPrivate { ...@@ -83,8 +104,76 @@ class ParallelExecutorPrivate {
bool own_local_scope_; bool own_local_scope_;
bool use_cuda_; bool use_cuda_;
bool use_all_reduce_; bool use_all_reduce_;
// global_ref_cnts_ is only initialized when ParallelExecutor constructs, and
// then keeps unchanged
// Before each iteration, runtime_ref_cnts_ is reset to global_ref_cnts_
std::vector<details::ReferenceCountMap> global_ref_cnts_;
std::vector<details::AtomicReferenceCountMap> runtime_ref_cnts_;
details::GarbageCollectorMap gcs_;
}; };
std::unique_ptr<ir::Graph> ParallelExecutorPrivate::PrepareGCAndRefCnts(
std::unique_ptr<ir::Graph> graph, size_t max_memory_size) {
for (size_t i = 0; i < places_.size(); ++i) {
auto &place = places_[i];
if (gcs_.count(place) > 0) {
continue;
}
std::unique_ptr<GarbageCollector> gc;
#ifdef PADDLE_WITH_CUDA
if (platform::is_gpu_place(place)) {
if (IsFastEagerDeletionModeEnabled()) {
gc.reset(new UnsafeFastGPUGarbageCollector(
boost::get<platform::CUDAPlace>(place), max_memory_size));
} else {
gc.reset(new StreamGarbageCollector(
boost::get<platform::CUDAPlace>(place), max_memory_size));
}
VLOG(10) << "Created " << i << "-th GarbageCollector at " << place;
} else {
#endif
if (platform::is_cpu_place(place)) {
gc.reset(new CPUGarbageCollector(boost::get<platform::CPUPlace>(place),
max_memory_size));
VLOG(10) << "Created GarbageCollector at " << place;
} else {
PADDLE_THROW("Unsupported place for garbage collection");
}
#ifdef PADDLE_WITH_CUDA
}
#endif
gcs_.emplace(place, std::move(gc));
}
if (!gcs_.empty()) {
std::vector<details::LastLiveOpsOfVars> last_live_ops_of_vars;
auto ref_cnt_pass =
ir::PassRegistry::Instance().Get("reference_count_pass");
ref_cnt_pass->SetNotOwned(details::kGlobalReferenceCount,
&global_ref_cnts_);
ref_cnt_pass->SetNotOwned(details::kLastLiveOpsOfVars,
&last_live_ops_of_vars);
graph = ref_cnt_pass->Apply(std::move(graph));
VLOG(10) << "ReferenceCountPass Applied";
auto eager_deletion_pass =
ir::PassRegistry::Instance().Get("eager_deletion_pass");
eager_deletion_pass->SetNotOwned(details::kRuntimeReferenceCount,
&runtime_ref_cnts_);
eager_deletion_pass->SetNotOwned(details::kGarbageCollector, &gcs_);
eager_deletion_pass->SetNotOwned(details::kLastLiveOpsOfVars,
&last_live_ops_of_vars);
eager_deletion_pass->SetNotOwned(details::kAllPlaces, &places_);
graph = eager_deletion_pass->Apply(std::move(graph));
VLOG(10) << "EagerDeletionPass Applied";
}
return graph;
}
std::vector<Scope *> &ParallelExecutor::GetLocalScopes() { std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
return member_->local_scopes_; return member_->local_scopes_;
} }
...@@ -151,36 +240,18 @@ ParallelExecutor::ParallelExecutor( ...@@ -151,36 +240,18 @@ ParallelExecutor::ParallelExecutor(
std::unique_ptr<ir::Graph> graph = build_strategy.Apply( std::unique_ptr<ir::Graph> graph = build_strategy.Apply(
main_program, member_->places_, loss_var_name, params, main_program, member_->places_, loss_var_name, params,
member_->local_scopes_, member_->use_cuda_, member_->nccl_ctxs_.get()); member_->local_scopes_, member_->use_cuda_, member_->nccl_ctxs_.get());
auto max_memory_size = GetEagerDeletionThreshold();
if (max_memory_size >= 0) {
for (auto &place : member_->places_) {
if (!platform::is_gpu_place(place)) continue;
auto gpu_place = boost::get<platform::CUDAPlace>(place);
if (gcs_[gpu_place.device] == nullptr) {
ref_cnts_[gpu_place.device].reset(new details::ReferenceCountMap());
cur_ref_cnts_[gpu_place.device].reset(
new details::AtomicReferenceCountMap());
gcs_[gpu_place.device].reset(
new StreamGarbageCollector<Tensor>(gpu_place, max_memory_size));
}
}
if (!gcs_.empty()) {
auto ref_cnt_pass =
ir::PassRegistry::Instance().Get("reference_count_pass");
ref_cnt_pass->SetNotOwned(details::kGlobalReferenceCount, &ref_cnts_);
ref_cnt_pass->SetNotOwned(details::kCurReferenceCount, &cur_ref_cnts_);
ref_cnt_pass->SetNotOwned(details::kGarbageCollector, &gcs_);
graph = ref_cnt_pass->Apply(std::move(graph));
graph->SetNotOwned("garbage_collector", &gcs_);
}
}
#else #else
std::unique_ptr<ir::Graph> graph = std::unique_ptr<ir::Graph> graph =
build_strategy.Apply(main_program, member_->places_, loss_var_name, build_strategy.Apply(main_program, member_->places_, loss_var_name,
params, member_->local_scopes_, member_->use_cuda_); params, member_->local_scopes_, member_->use_cuda_);
#endif #endif
auto max_memory_size = GetEagerDeletionThreshold();
if (max_memory_size >= 0) {
graph = member_->PrepareGCAndRefCnts(std::move(graph),
static_cast<size_t>(max_memory_size));
}
// Step 3. Create vars in each scope. Passes may also create new vars. // Step 3. Create vars in each scope. Passes may also create new vars.
// skip control vars and empty vars // skip control vars and empty vars
std::vector<details::VariableInfo> var_infos; std::vector<details::VariableInfo> var_infos;
...@@ -300,18 +371,9 @@ void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors, ...@@ -300,18 +371,9 @@ void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
#endif #endif
platform::RecordBlock b(0); platform::RecordBlock b(0);
#ifdef PADDLE_WITH_CUDA if (member_->HasGarbageCollectors()) {
if (!gcs_.empty()) { member_->ResetRuntimeReferenceCount(fetch_tensors, fetched_var_name);
ResetReferenceCount();
for (auto &pair : cur_ref_cnts_) {
auto &name_map = *(pair.second);
for (auto &fetch_name : fetch_tensors) {
name_map.erase(fetch_name);
}
name_map.erase(fetched_var_name);
}
} }
#endif
auto fetch_data = member_->executor_->Run(fetch_tensors); auto fetch_data = member_->executor_->Run(fetch_tensors);
*member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() = *member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
fetch_data; fetch_data;
...@@ -355,13 +417,11 @@ ParallelExecutor::~ParallelExecutor() { ...@@ -355,13 +417,11 @@ ParallelExecutor::~ParallelExecutor() {
for (auto &p : member_->places_) { for (auto &p : member_->places_) {
platform::DeviceContextPool::Instance().Get(p)->Wait(); platform::DeviceContextPool::Instance().Get(p)->Wait();
} }
// member_ must be destructed before gcs_ since the destructor of delete member_;
// ReferenceCountOpHandle use raw pointers of gcs_ inside.
member_.reset();
} }
} // namespace framework } // namespace framework
} // namespace paddle } // namespace paddle
#ifdef PADDLE_WITH_CUDA
USE_PASS(reference_count_pass); USE_PASS(reference_count_pass);
#endif USE_PASS(eager_deletion_pass);
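ResetRuntimeReferenceCount above copies the immutable global counts into the per-iteration runtime map and then erases the fetch targets, so fetched variables are never garbage-collected mid-run. A simplified single-device sketch of that reset, using plain maps instead of the details:: typedefs:

#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

using RefCntMap = std::unordered_map<std::string, size_t>;

// Copy the static counts into the runtime map, then drop fetched variables so
// eager deletion never frees them during the iteration.
void ResetRuntimeRefCnts(const RefCntMap& global_cnts,
                         const std::vector<std::string>& fetch_tensors,
                         const std::string& fetched_var_name,
                         RefCntMap* runtime_cnts) {
  for (const auto& pair : global_cnts) {
    (*runtime_cnts)[pair.first] = pair.second;
  }
  for (const auto& name : fetch_tensors) {
    runtime_cnts->erase(name);
  }
  runtime_cnts->erase(fetched_var_name);
}

int main() {
  const RefCntMap global_cnts = {{"x", 2}, {"loss", 1}, {"fetch", 1}};
  RefCntMap runtime_cnts;
  ResetRuntimeRefCnts(global_cnts, {"loss"}, "fetch", &runtime_cnts);
  for (const auto& pair : runtime_cnts) {
    std::cout << pair.first << " -> " << pair.second << "\n";  // only "x -> 2"
  }
  return 0;
}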
...@@ -14,7 +14,6 @@ limitations under the License. */ ...@@ -14,7 +14,6 @@ limitations under the License. */
#pragma once #pragma once
#include <atomic>
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include <unordered_set> #include <unordered_set>
...@@ -29,10 +28,6 @@ limitations under the License. */ ...@@ -29,10 +28,6 @@ limitations under the License. */
#include "paddle/fluid/framework/tensor.h" #include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/platform/device_context.h" #include "paddle/fluid/platform/device_context.h"
#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/framework/details/reference_count_pass.h"
#endif
namespace paddle { namespace paddle {
namespace framework { namespace framework {
...@@ -75,24 +70,7 @@ class ParallelExecutor { ...@@ -75,24 +70,7 @@ class ParallelExecutor {
private: private:
void BCastParamsToDevices(const std::unordered_set<std::string> &vars) const; void BCastParamsToDevices(const std::unordered_set<std::string> &vars) const;
std::unique_ptr<ParallelExecutorPrivate> member_; ParallelExecutorPrivate *member_;
#ifdef PADDLE_WITH_CUDA
// ref_cnts_ is only initialized when ParallelExecutor constructs, and then
// keeps unchanged
// Before each iteration, cur_ref_cnts_ is reset to ref_cnts_
details::DeviceReferenceCountMap ref_cnts_;
details::AtomicDeviceReferenceCountMap cur_ref_cnts_;
details::DeviceGarbageCollectorMap gcs_;
void ResetReferenceCount() {
for (auto &pair1 : ref_cnts_) {
for (auto &pair2 : *(pair1.second)) {
(*(cur_ref_cnts_[pair1.first]))[pair2.first] = pair2.second;
}
}
}
#endif
}; };
} // namespace framework } // namespace framework
......
...@@ -38,6 +38,10 @@ DEFINE_double( ...@@ -38,6 +38,10 @@ DEFINE_double(
"Memory size threshold (GB) when the garbage collector clear tensors." "Memory size threshold (GB) when the garbage collector clear tensors."
"Disabled when this value is less than 0"); "Disabled when this value is less than 0");
DEFINE_bool(fast_eager_deletion_mode, false,
"Fast eager deletion mode. If enabled, memory would release "
"immediately without waiting GPU kernel ends.");
// When in inference scenario, the scopes will not be written by two threads in // When in inference scenario, the scopes will not be written by two threads in
// a mean time, but a scope may be read by multiple threads concurrently, and // a mean time, but a scope may be read by multiple threads concurrently, and
// the mutex will cause serious performance issue. // the mutex will cause serious performance issue.
...@@ -58,6 +62,8 @@ int64_t GetEagerDeletionThreshold() { ...@@ -58,6 +62,8 @@ int64_t GetEagerDeletionThreshold() {
(static_cast<int64_t>(1) << 30)); (static_cast<int64_t>(1) << 30));
} }
bool IsFastEagerDeletionModeEnabled() { return FLAGS_fast_eager_deletion_mode; }
Scope::~Scope() { DropKids(); } Scope::~Scope() { DropKids(); }
Scope& Scope::NewScope() const { Scope& Scope::NewScope() const {
......
...@@ -27,6 +27,7 @@ namespace paddle { ...@@ -27,6 +27,7 @@ namespace paddle {
namespace framework { namespace framework {
int64_t GetEagerDeletionThreshold(); int64_t GetEagerDeletionThreshold();
bool IsFastEagerDeletionModeEnabled();
class Scope; class Scope;
......
...@@ -158,6 +158,10 @@ class Tensor { ...@@ -158,6 +158,10 @@ class Tensor {
const std::shared_ptr<memory::Allocation>& Holder() const { return holder_; } const std::shared_ptr<memory::Allocation>& Holder() const { return holder_; }
size_t offset() const { return offset_; } size_t offset() const { return offset_; }
std::shared_ptr<memory::Allocation> MoveMemoryHolder() {
return std::move(holder_);
}
private: private:
/*! holds the memory block if allocated. */ /*! holds the memory block if allocated. */
std::shared_ptr<memory::Allocation> holder_; std::shared_ptr<memory::Allocation> holder_;
......
...@@ -32,6 +32,20 @@ static constexpr char kStepScopes[] = "StepScopes"; ...@@ -32,6 +32,20 @@ static constexpr char kStepScopes[] = "StepScopes";
static constexpr char kX[] = "X"; static constexpr char kX[] = "X";
static constexpr char kXGRAD[] = "X@GRAD"; static constexpr char kXGRAD[] = "X@GRAD";
static constexpr char kOutputs[] = "Out"; static constexpr char kOutputs[] = "Out";
static constexpr char kSkipEagerDeletionVars[] = "skip_eager_deletion_vars";
namespace { // NOLINT
static std::string GetSkipEagerDeletionVarsDebugString(
const std::vector<std::string> &vars) {
std::string str = "Skip " + std::to_string(vars.size()) +
" var(s) in eager deletion mode: ";
for (auto &var : vars) {
str.append(var);
str.push_back(' ');
}
return str;
}
} // NOLINT
class WhileOp : public framework::OperatorBase { class WhileOp : public framework::OperatorBase {
public: public:
...@@ -59,7 +73,10 @@ class WhileOp : public framework::OperatorBase { ...@@ -59,7 +73,10 @@ class WhileOp : public framework::OperatorBase {
"Condition of while op must in CPU memory."); "Condition of while op must in CPU memory.");
bool is_test = Attr<bool>("is_test"); bool is_test = Attr<bool>("is_test");
auto ctx = executor.Prepare(*program, block->ID()); auto &skip_vars = Attr<std::vector<std::string>>(kSkipEagerDeletionVars);
VLOG(2) << GetSkipEagerDeletionVarsDebugString(skip_vars);
auto ctx = executor.Prepare(*program, block->ID(), skip_vars);
while (cond.data<bool>()[0]) { while (cond.data<bool>()[0]) {
auto &current_scope = scope.NewScope(); auto &current_scope = scope.NewScope();
step_scopes->push_back(&current_scope); step_scopes->push_back(&current_scope);
...@@ -96,6 +113,10 @@ class WhileOpMaker : public framework::OpProtoAndCheckerMaker { ...@@ -96,6 +113,10 @@ class WhileOpMaker : public framework::OpProtoAndCheckerMaker {
"(bool, default false) Set to true for inference only, false " "(bool, default false) Set to true for inference only, false "
"for training. Some layers may run faster when this is true.") "for training. Some layers may run faster when this is true.")
.SetDefault(false); .SetDefault(false);
AddAttr<std::vector<std::string>>(kSkipEagerDeletionVars,
"Vars that would skip eager deletion."
"Users should not set this manually.")
.SetDefault(std::vector<std::string>());
AddComment(R"DOC( AddComment(R"DOC(
)DOC"); )DOC");
} }
...@@ -119,7 +140,10 @@ class WhileGradOp : public framework::OperatorBase { ...@@ -119,7 +140,10 @@ class WhileGradOp : public framework::OperatorBase {
framework::Executor executor(dev_place); framework::Executor executor(dev_place);
auto *block = Attr<framework::BlockDesc *>(kStepBlock); auto *block = Attr<framework::BlockDesc *>(kStepBlock);
auto *program = block->Program(); auto *program = block->Program();
auto ctx = executor.Prepare(*program, block->ID());
auto &skip_vars = Attr<std::vector<std::string>>(kSkipEagerDeletionVars);
VLOG(2) << GetSkipEagerDeletionVarsDebugString(skip_vars);
auto ctx = executor.Prepare(*program, block->ID(), skip_vars);
auto *step_scopes = auto *step_scopes =
scope.FindVar(Input(kStepScopes))->GetMutable<StepScopeVar>(); scope.FindVar(Input(kStepScopes))->GetMutable<StepScopeVar>();
...@@ -341,6 +365,8 @@ class WhileGradOpDescMaker : public framework::SingleGradOpDescMaker { ...@@ -341,6 +365,8 @@ class WhileGradOpDescMaker : public framework::SingleGradOpDescMaker {
// while operator could be renamed. // while operator could be renamed.
while_grad->SetAttr("original_output_grad", output_grads_list); while_grad->SetAttr("original_output_grad", output_grads_list);
while_grad->SetAttr(kSkipEagerDeletionVars, std::vector<std::string>());
return std::unique_ptr<framework::OpDesc>(while_grad); return std::unique_ptr<framework::OpDesc>(while_grad);
} }
}; };
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include <sys/time.h> #include <sys/time.h>
#include <algorithm>
#include <chrono> // NOLINT #include <chrono> // NOLINT
#include <cstdlib> #include <cstdlib>
#include <fstream> #include <fstream>
...@@ -55,8 +56,7 @@ class CTRReader : public framework::FileReader { ...@@ -55,8 +56,7 @@ class CTRReader : public framework::FileReader {
PADDLE_ENFORCE_GT(thread_num, 0, "thread num should be larger then 0!"); PADDLE_ENFORCE_GT(thread_num, 0, "thread num should be larger then 0!");
PADDLE_ENFORCE(queue != nullptr, "LoDTensorBlockingQueue must not be null"); PADDLE_ENFORCE(queue != nullptr, "LoDTensorBlockingQueue must not be null");
PADDLE_ENFORCE_GT(file_list.size(), 0, "file list should not be empty"); PADDLE_ENFORCE_GT(file_list.size(), 0, "file list should not be empty");
thread_num_ = thread_num_ = std::min<size_t>(file_list_.size(), thread_num);
file_list_.size() > thread_num ? thread_num : file_list_.size();
queue_ = queue; queue_ = queue;
SplitFiles(); SplitFiles();
for (size_t i = 0; i < thread_num_; ++i) { for (size_t i = 0; i < thread_num_; ++i) {
...@@ -95,10 +95,10 @@ class CTRReader : public framework::FileReader { ...@@ -95,10 +95,10 @@ class CTRReader : public framework::FileReader {
queue_->ReOpen(); queue_->ReOpen();
VLOG(3) << "reopen success"; VLOG(3) << "reopen success";
VLOG(3) << "thread_num " << thread_num_; VLOG(3) << "thread_num " << thread_num_;
for (int thread_id = 0; thread_id < thread_num_; thread_id++) { for (size_t thread_id = 0; thread_id < thread_num_; thread_id++) {
read_threads_.emplace_back(new std::thread( read_threads_.emplace_back(new std::thread(std::bind(
std::bind(&ReadThread, file_groups_[thread_id], slots_, batch_size_, &ReadThread, file_groups_[thread_id], slots_, batch_size_,
thread_id, &read_thread_status_, queue_))); static_cast<int>(thread_id), &read_thread_status_, queue_)));
} }
monitor_thread_.reset(new std::thread( monitor_thread_.reset(new std::thread(
std::bind(&MonitorThread, &read_thread_status_, queue_))); std::bind(&MonitorThread, &read_thread_status_, queue_)));
......
...@@ -56,9 +56,16 @@ ELSE() ...@@ -56,9 +56,16 @@ ELSE()
set(MKLDNN_CTX_DEPS) set(MKLDNN_CTX_DEPS)
ENDIF() ENDIF()
nv_library(stream_callback_manager SRCS stream_callback_manager.cc DEPS simple_threadpool enforce)
IF(WITH_GPU)
set(STREAM_CALLBACK_DEPS stream_callback_manager)
ELSE()
set(STREAM_CALLBACK_DEPS)
ENDIF()
# memcpy depends on device_context, here add deps individually for # memcpy depends on device_context, here add deps individually for
# avoiding cycle dependencies # avoiding cycle dependencies
cc_library(device_context SRCS device_context.cc init.cc DEPS simple_threadpool malloc cc_library(device_context SRCS device_context.cc init.cc DEPS simple_threadpool malloc ${STREAM_CALLBACK_DEPS}
place eigen3 stringpiece cpu_helper cpu_info framework_proto ${GPU_CTX_DEPS} ${MKLDNN_CTX_DEPS}) place eigen3 stringpiece cpu_helper cpu_info framework_proto ${GPU_CTX_DEPS} ${MKLDNN_CTX_DEPS})
nv_test(device_context_test SRCS device_context_test.cu DEPS device_context gpu_info) nv_test(device_context_test SRCS device_context_test.cu DEPS device_context gpu_info)
......
...@@ -222,14 +222,10 @@ class CUDADeviceContext : public DeviceContext { ...@@ -222,14 +222,10 @@ class CUDADeviceContext : public DeviceContext {
template <typename Callback> template <typename Callback>
void AddStreamCallback(Callback&& callback) const { void AddStreamCallback(Callback&& callback) const {
std::lock_guard<std::mutex> guard(callback_mtx_);
callback_manager_->AddCallback(callback); callback_manager_->AddCallback(callback);
} }
void WaitStreamCallback() const { void WaitStreamCallback() const { callback_manager_->Wait(); }
std::lock_guard<std::mutex> guard(callback_mtx_);
callback_manager_->Wait();
}
#if CUDA_VERSION >= 9000 #if CUDA_VERSION >= 9000
/*! \brief CublasCall may need to change cublas's config, /*! \brief CublasCall may need to change cublas's config,
...@@ -260,9 +256,7 @@ class CUDADeviceContext : public DeviceContext { ...@@ -260,9 +256,7 @@ class CUDADeviceContext : public DeviceContext {
mutable std::mutex mtx_; mutable std::mutex mtx_;
// This lock is only used by callback // StreamCallbackManager is thread-safe
// If we use mtx_ for StreamCallbackManager, deadlock may occur sometimes
mutable std::mutex callback_mtx_;
std::unique_ptr<StreamCallbackManager> callback_manager_; std::unique_ptr<StreamCallbackManager> callback_manager_;
mutable std::mutex cublas_mtx_; mutable std::mutex cublas_mtx_;
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/platform/stream_callback_manager.h"
#include "paddle/fluid/platform/enforce.h"
namespace paddle {
namespace platform {
#if CUDA_VERSION >= 10000
static void CUDART_CB StreamCallbackFunc(void *user_data)
#else
static void CUDART_CB StreamCallbackFunc(cudaStream_t stream,
cudaError_t status, void *user_data)
#endif
{
std::unique_ptr<std::function<void()>> func(
reinterpret_cast<std::function<void()> *>(user_data));
(*func)();
}
StreamCallbackManager::StreamCallbackManager(const cudaStream_t stream)
: stream_(stream), thread_pool_(1) {}
void StreamCallbackManager::AddCallback(std::function<void()> callback) const {
auto *callback_func = new std::function<void()>(std::move(callback));
auto *func = new std::function<void()>([this, callback_func] {
std::lock_guard<std::mutex> lock(mtx_);
last_future_ = thread_pool_.enqueue([callback_func] {
std::unique_ptr<std::function<void()>> releaser(callback_func);
(*callback_func)();
});
});
#if CUDA_VERSION >= 10000
PADDLE_ENFORCE(cudaLaunchHostFunc(stream_, StreamCallbackFunc, func));
#else
PADDLE_ENFORCE(cudaStreamAddCallback(stream_, StreamCallbackFunc, func, 0));
#endif
}
void StreamCallbackManager::Wait() const {
PADDLE_ENFORCE(cudaStreamSynchronize(stream_));
{
std::lock_guard<std::mutex> lock(mtx_);
if (last_future_.valid()) {
last_future_.wait();
}
}
}
} // namespace platform
} // namespace paddle
...@@ -18,67 +18,32 @@ ...@@ -18,67 +18,32 @@
#include <cuda.h> #include <cuda.h>
#include <cuda_runtime.h> #include <cuda_runtime.h>
#include <functional> #include <functional>
#include <future> // NOLINT
#include <memory> #include <memory>
#include <mutex> // NOLINT
#include "paddle/fluid/platform/enforce.h" #include "paddle/fluid/platform/enforce.h"
namespace paddle { namespace paddle {
namespace platform { namespace platform {
class StreamCallbackManager; // NOTE(zjl): clean StreamCallbackManager to make compilation faster
// Make StreamCallbackManager thread-safe
struct StreamCallbackContext {
template <typename Callback>
inline StreamCallbackContext(const StreamCallbackManager *manager,
Callback &&callback)
: manager_(manager), callback_(callback) {}
const StreamCallbackManager *manager_; // do not own
std::function<void()> callback_;
};
class StreamCallbackManager { class StreamCallbackManager {
public: public:
explicit inline StreamCallbackManager(cudaStream_t stream = nullptr) explicit StreamCallbackManager(const cudaStream_t stream);
: stream_(stream), thread_pool_(new ThreadPool(1)) {}
~StreamCallbackManager() = default;
template <typename Callback> void AddCallback(std::function<void()> callback) const;
inline void AddCallback(Callback &&callback) const {
auto *stream_callback_context =
new StreamCallbackContext(this, std::forward<Callback>(callback));
#if CUDA_VERSION >= 10000
PADDLE_ENFORCE(cudaLaunchHostFunc(stream_,
StreamCallbackManager::StreamCallbackFunc,
stream_callback_context)); // NOLINT
#else
PADDLE_ENFORCE(cudaStreamAddCallback(
stream_, StreamCallbackManager::StreamCallbackFunc,
stream_callback_context, 0)); // NOLINT
#endif
}
void Wait() const { thread_pool_.reset(new ThreadPool(1)); } void Wait() const;
private: private:
const cudaStream_t stream_; const cudaStream_t stream_;
mutable std::unique_ptr<ThreadPool> thread_pool_; mutable ::ThreadPool thread_pool_;
mutable std::mutex mtx_;
// cudaStreamCallback cannot call CUDA API inside, so we have to use mutable std::future<void> last_future_;
// thread_pool here
#if CUDA_VERSION >= 10000
static void CUDART_CB StreamCallbackFunc(void *user_data)
#else
static void CUDART_CB StreamCallbackFunc(cudaStream_t stream,
cudaError_t status, void *user_data)
#endif
{
auto *callback_context_ptr =
reinterpret_cast<StreamCallbackContext *>(user_data);
callback_context_ptr->manager_->thread_pool_->enqueue([=]() {
std::unique_ptr<StreamCallbackContext> callback_context(
callback_context_ptr);
callback_context->callback_();
});
}
}; };
} // namespace platform } // namespace platform
......
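The rewritten StreamCallbackManager above funnels every callback through a one-thread ::ThreadPool and remembers the future of the last enqueued task, so Wait() (after cudaStreamSynchronize) blocks until everything submitted so far has run, in submission order. A CUDA-free standalone sketch of that ordering pattern; SerialWorker is a made-up class, not Paddle's:

#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// A tiny single-worker queue: tasks run strictly in submission order, and
// Wait() blocks until the most recently submitted task has finished.
class SerialWorker {
 public:
  SerialWorker() : worker_([this] { Run(); }) {}

  ~SerialWorker() {
    {
      std::lock_guard<std::mutex> lock(mtx_);
      stop_ = true;
    }
    cv_.notify_one();
    worker_.join();
  }

  void AddCallback(std::function<void()> fn) {
    auto task = std::make_shared<std::packaged_task<void()>>(std::move(fn));
    {
      std::lock_guard<std::mutex> lock(mtx_);
      last_future_ = task->get_future();  // remember the last task's future
      tasks_.push([task] { (*task)(); });
    }
    cv_.notify_one();
  }

  void Wait() {
    std::future<void> f;
    {
      std::lock_guard<std::mutex> lock(mtx_);
      if (last_future_.valid()) f = std::move(last_future_);
    }
    if (f.valid()) f.wait();  // FIFO worker: last done implies all done
  }

 private:
  void Run() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
        if (stop_ && tasks_.empty()) return;
        task = std::move(tasks_.front());
        tasks_.pop();
      }
      task();
    }
  }

  std::queue<std::function<void()>> tasks_;
  std::mutex mtx_;
  std::condition_variable cv_;
  bool stop_ = false;
  std::future<void> last_future_;
  std::thread worker_;
};

int main() {
  SerialWorker worker;
  for (int i = 0; i < 3; ++i) {
    worker.AddCallback([i] { std::cout << "callback " << i << "\n"; });
  }
  worker.Wait();  // returns only after callback 2 (and hence 0 and 1) has run
  return 0;
}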
...@@ -162,7 +162,7 @@ void PyCPUTensorSetFromArray( ...@@ -162,7 +162,7 @@ void PyCPUTensorSetFromArray(
paddle::platform::CPUPlace place) { paddle::platform::CPUPlace place) {
std::vector<int64_t> dims; std::vector<int64_t> dims;
dims.reserve(array.ndim()); dims.reserve(array.ndim());
for (size_t i = 0; i < array.ndim(); ++i) { for (decltype(array.ndim()) i = 0; i < array.ndim(); ++i) {
dims.push_back(static_cast<int>(array.shape()[i])); dims.push_back(static_cast<int>(array.shape()[i]));
} }
...@@ -182,7 +182,7 @@ inline void PyCPUTensorSetFromArray( ...@@ -182,7 +182,7 @@ inline void PyCPUTensorSetFromArray(
paddle::platform::CPUPlace place) { paddle::platform::CPUPlace place) {
std::vector<int64_t> dims; std::vector<int64_t> dims;
dims.reserve(array.ndim()); dims.reserve(array.ndim());
for (int i = 0; i < array.ndim(); ++i) { for (decltype(array.ndim()) i = 0; i < array.ndim(); ++i) {
dims.push_back(static_cast<int>(array.shape()[i])); dims.push_back(static_cast<int>(array.shape()[i]));
} }
...@@ -200,7 +200,7 @@ void PyCUDATensorSetFromArray( ...@@ -200,7 +200,7 @@ void PyCUDATensorSetFromArray(
paddle::platform::CUDAPlace place) { paddle::platform::CUDAPlace place) {
std::vector<int64_t> dims; std::vector<int64_t> dims;
dims.reserve(array.ndim()); dims.reserve(array.ndim());
for (size_t i = 0; i < array.ndim(); ++i) { for (decltype(array.ndim()) i = 0; i < array.ndim(); ++i) {
dims.push_back(static_cast<int>(array.shape()[i])); dims.push_back(static_cast<int>(array.shape()[i]));
} }
...@@ -221,7 +221,7 @@ inline void PyCUDATensorSetFromArray( ...@@ -221,7 +221,7 @@ inline void PyCUDATensorSetFromArray(
paddle::platform::CUDAPlace place) { paddle::platform::CUDAPlace place) {
std::vector<int64_t> dims; std::vector<int64_t> dims;
dims.reserve(array.ndim()); dims.reserve(array.ndim());
for (size_t i = 0; i < array.ndim(); ++i) { for (decltype(array.ndim()) i = 0; i < array.ndim(); ++i) {
dims.push_back(static_cast<int>(array.shape()[i])); dims.push_back(static_cast<int>(array.shape()[i]));
} }
...@@ -240,7 +240,7 @@ void PyCUDAPinnedTensorSetFromArray( ...@@ -240,7 +240,7 @@ void PyCUDAPinnedTensorSetFromArray(
const paddle::platform::CUDAPinnedPlace &place) { const paddle::platform::CUDAPinnedPlace &place) {
std::vector<int64_t> dims; std::vector<int64_t> dims;
dims.reserve(array.ndim()); dims.reserve(array.ndim());
for (size_t i = 0; i < array.ndim(); ++i) { for (decltype(array.ndim()) i = 0; i < array.ndim(); ++i) {
dims.push_back(static_cast<int>(array.shape()[i])); dims.push_back(static_cast<int>(array.shape()[i]));
} }
...@@ -260,7 +260,7 @@ inline void PyCUDAPinnedTensorSetFromArray( ...@@ -260,7 +260,7 @@ inline void PyCUDAPinnedTensorSetFromArray(
const paddle::platform::CUDAPinnedPlace &place) { const paddle::platform::CUDAPinnedPlace &place) {
std::vector<int64_t> dims; std::vector<int64_t> dims;
dims.reserve(array.ndim()); dims.reserve(array.ndim());
for (size_t i = 0; i < array.ndim(); ++i) { for (decltype(array.ndim()) i = 0; i < array.ndim(); ++i) {
dims.push_back(static_cast<int>(array.shape()[i])); dims.push_back(static_cast<int>(array.shape()[i]));
} }
......
...@@ -126,9 +126,9 @@ def __bootstrap__(): ...@@ -126,9 +126,9 @@ def __bootstrap__():
'check_nan_inf', 'benchmark', 'eager_delete_scope', 'use_mkldnn', 'check_nan_inf', 'benchmark', 'eager_delete_scope', 'use_mkldnn',
'use_ngraph', 'initial_cpu_memory_in_mb', 'init_allocated_mem', 'use_ngraph', 'initial_cpu_memory_in_mb', 'init_allocated_mem',
'free_idle_memory', 'paddle_num_threads', "dist_threadpool_size", 'free_idle_memory', 'paddle_num_threads', "dist_threadpool_size",
'eager_delete_tensor_gb', 'allocator_strategy', 'eager_delete_tensor_gb', 'fast_eager_deletion_mode',
'reader_queue_speed_test_mode', 'print_sub_graph_dir', 'allocator_strategy', 'reader_queue_speed_test_mode',
'pe_profile_fname' 'print_sub_graph_dir', 'pe_profile_fname'
] ]
if 'Darwin' not in sysstr: if 'Darwin' not in sysstr:
read_env_flags.append('use_pinned_memory') read_env_flags.append('use_pinned_memory')
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
os.environ['FLAGS_eager_delete_tensor_gb'] = '0.0'
os.environ['CPU_NUM'] = '2'
import six
import unittest
import paddle
import paddle.fluid.core as core
import paddle.fluid as fluid
def train(network, use_cuda, use_parallel_executor, batch_size=32, pass_num=2):
if use_cuda and not core.is_compiled_with_cuda():
print('Skip use_cuda=True because Paddle is not compiled with cuda')
return
word_dict = paddle.dataset.imdb.word_dict()
train_reader = paddle.batch(
paddle.dataset.imdb.train(word_dict), batch_size=batch_size)
data = fluid.layers.data(
name="words", shape=[1], dtype="int64", lod_level=1)
label = fluid.layers.data(name="label", shape=[1], dtype="int64")
cost = network(data, label, len(word_dict))
optimizer = fluid.optimizer.Adagrad(learning_rate=0.2)
optimizer.minimize(cost)
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
feeder = fluid.DataFeeder(feed_list=[data, label], place=place)
reader = feeder.decorate_reader(
train_reader, multi_devices=use_parallel_executor)
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
if use_parallel_executor:
train_exe = fluid.ParallelExecutor(
use_cuda=use_cuda, loss_name=cost.name)
fetch_list = [cost.name]
else:
train_exe = exe
fetch_list = [cost]
for pass_id in six.moves.xrange(pass_num):
batch_id = 0
for data in reader():
train_exe.run(feed=data,
fetch_list=fetch_list if batch_id % 4 == 0 else [])
batch_id += 1
if batch_id > 16:
break
class TestBase(unittest.TestCase):
def setUp(self):
self.net = None
def test_network(self):
if self.net is None:
return
for use_cuda in [True, False]:
for use_parallel_executor in [False, True]:
print('network: {}, use_cuda: {}, use_parallel_executor: {}'.
format(self.net.__name__, use_cuda,
use_parallel_executor))
with fluid.program_guard(fluid.Program(), fluid.Program()):
with fluid.scope_guard(core.Scope()):
train(self.net, use_cuda, use_parallel_executor)
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from test_eager_deletion_dynamic_rnn_base import TestBase
import paddle.fluid as fluid
def gru_net(data,
label,
dict_dim,
emb_dim=128,
hid_dim=128,
hid_dim2=96,
class_dim=2,
emb_lr=400.0):
emb = fluid.layers.embedding(
input=data,
size=[dict_dim, emb_dim],
param_attr=fluid.ParamAttr(learning_rate=emb_lr))
fc0 = fluid.layers.fc(input=emb, size=hid_dim * 3)
gru_h = fluid.layers.dynamic_gru(input=fc0, size=hid_dim, is_reverse=False)
gru_max = fluid.layers.sequence_pool(input=gru_h, pool_type='max')
gru_max_tanh = fluid.layers.tanh(gru_max)
fc1 = fluid.layers.fc(input=gru_max_tanh, size=hid_dim2, act='tanh')
prediction = fluid.layers.fc(input=fc1, size=class_dim, act='softmax')
cost = fluid.layers.cross_entropy(input=prediction, label=label)
avg_cost = fluid.layers.mean(x=cost)
return avg_cost
class GRUTest(TestBase):
def setUp(self):
self.net = gru_net
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from test_eager_deletion_dynamic_rnn_base import TestBase
import paddle.fluid as fluid
import unittest
def lstm_net(data,
label,
dict_dim,
emb_dim=128,
hid_dim=128,
hid_dim2=96,
class_dim=2,
emb_lr=30.0):
emb = fluid.layers.embedding(
input=data,
size=[dict_dim, emb_dim],
param_attr=fluid.ParamAttr(learning_rate=emb_lr))
fc0 = fluid.layers.fc(input=emb, size=hid_dim * 4)
lstm_h, c = fluid.layers.dynamic_lstm(
input=fc0, size=hid_dim * 4, is_reverse=False)
lstm_max = fluid.layers.sequence_pool(input=lstm_h, pool_type='max')
lstm_max_tanh = fluid.layers.tanh(lstm_max)
fc1 = fluid.layers.fc(input=lstm_max_tanh, size=hid_dim2, act='tanh')
prediction = fluid.layers.fc(input=fc1, size=class_dim, act='softmax')
cost = fluid.layers.cross_entropy(input=prediction, label=label)
avg_cost = fluid.layers.mean(x=cost)
return avg_cost
class LSTMTest(TestBase):
def setUp(self):
self.net = lstm_net
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
os.environ['FLAGS_eager_delete_tensor_gb'] = "0.0"
from test_parallel_executor_mnist import TestMNIST
class EagerDeletionTestMNIST(TestMNIST):
pass
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
os.environ['FLAGS_eager_delete_tensor_gb'] = "0.0"
from test_parallel_executor_transformer import TestTransformer
class EagerDeletionTestTransformer(TestTransformer):
pass
if __name__ == '__main__':
unittest.main()