未验证 提交 adaa207b 编写于 作者: W wanghuancoder 提交者: GitHub

refactor gc (#35525)

* refactor gc, test=develop

* refine, test=develop

* refine, test=develop

* refine, test=develop

* gc each tensor, test=develop

* refine, test=develop
上级 8412d6c0
......@@ -3,9 +3,9 @@ lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper l
graph_to_program_pass variable_helper timer monitor)
cc_library(workqueue SRCS workqueue.cc DEPS enforce)
cc_library(interpretercore_garbage_collector SRCS interpretercore_garbage_collector.cc DEPS workqueue ${DEVICE_EVENT_LIBS})
cc_library(interpretercore_util SRCS interpretercore_util.cc DEPS ${INTERPRETERCORE_DEPS})
cc_library(interpretercore SRCS interpretercore.cc DEPS workqueue ${DEVICE_EVENT_LIBS} interpretercore_util)
cc_library(interpretercore SRCS interpretercore.cc DEPS workqueue ${DEVICE_EVENT_LIBS} interpretercore_util interpretercore_garbage_collector)
cc_library(standalone_executor SRCS standalone_executor.cc DEPS interpretercore)
cc_test(workqueue_test SRCS workqueue_test.cc DEPS workqueue)
# cc_binary(standalone_executor_test SRCS standalone_executor_test.cc DEPS interpretercore standalone_executor operator op_registry executor ${GLOB_OP_LIB} ${GLOB_OPERATOR_DEPS} profiler)
......@@ -12,15 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#define GLOG_NO_ABBREVIATED_SEVERITIES // msvc conflict logging with windows.h
#if !defined(_WIN32)
#include <sched.h>
#else
#define NOMINMAX
#include <windows.h>
#endif // !_WIN32
#include "paddle/fluid/framework/new_executor/interpretercore.h"
#include "paddle/fluid/framework/new_executor/interpretercore_util.h"
......@@ -136,13 +127,6 @@ InterpreterCore::InterpreterCore(const platform::Place& place,
h2d_ctx_pool_({place}) {
is_build_ = false;
garbages_.reset(new GarbageQueue());
max_memory_size_ = static_cast<size_t>(GetEagerDeletionThreshold());
cur_memory_size_ = 0;
WorkQueueOptions options;
options.num_threads = 1;
gc_queue_ = CreateSingleThreadedWorkQueue(options);
feed_names_ = feed_names;
// Step1: add feedop and fetchop to main_program
......@@ -268,8 +252,6 @@ void InterpreterCore::Convert() {
}
for (size_t i = 0; i < vec_instruction_.size(); ++i) {
gc_event_.emplace_back(place_, platform::GenerateDeviceEventFlag());
std::vector<size_t> vec_temp;
for (auto& item : vec_instruction_[i].output_index_) {
for (auto id : item.second) {
......@@ -301,6 +283,11 @@ void InterpreterCore::Convert() {
for (size_t i = 0; i < vec_instruction_.size(); ++i) {
BuildAndCacheInstructionCtx(&vec_instruction_[i], *global_scope_, place_);
}
for (size_t i = 0; i < vec_instruction_.size(); ++i) {
gc_event_.emplace_back(vec_instruction_[i].execution_ctx_.get()->GetPlace(),
platform::GenerateDeviceEventFlag());
}
}
void InterpreterCore::BuildAndCacheInstructionCtx(
......@@ -412,72 +399,8 @@ void InterpreterCore::CheckGC(size_t instr_id,
if (var_scope.vec_meta_info_[var_id].vardesc_ &&
!var_scope.vec_meta_info_[var_id].vardesc_->Persistable() &&
working_var_ref[var_id].var_ref_count_ == 0) {
Variable* var = var_scope.var_list[var_id];
if (var->IsType<LoDTensor>()) {
garbages_->emplace_back(
var->GetMutable<LoDTensor>()->MoveMemoryHolder());
if (garbages_->back()) {
cur_memory_size_ += garbages_->back()->size();
}
} else if (var->IsType<SelectedRows>()) {
garbages_->emplace_back(var->GetMutable<SelectedRows>()
->mutable_value()
->MoveMemoryHolder());
if (garbages_->back()) {
cur_memory_size_ += garbages_->back()->size();
}
} else if (var->IsType<LoDTensorArray>()) {
auto* tensor_arr = var->GetMutable<LoDTensorArray>();
for (auto& t : *tensor_arr) {
garbages_->emplace_back(t.MoveMemoryHolder());
if (garbages_->back()) {
cur_memory_size_ += garbages_->back()->size();
}
}
} else {
PADDLE_THROW(platform::errors::Unimplemented(
"The variable(%s) is not supported in eager deletion.",
framework::ToTypeName(var->Type())));
}
}
}
if (!garbages_->empty()) {
if (max_memory_size_ <= 1) {
gc_event_[instr_id].Record(
platform::DeviceContextPool::Instance().Get(place));
gc_event_[instr_id].SetFininshed(); // Only for CPU Event
gc_queue_->AddTask(
[ container = garbages_.release(), event = &gc_event_[instr_id] ]() {
while (!event->Query()) {
#if defined(_WIN32)
SleepEx(50, FALSE);
#else
sched_yield();
#endif
continue;
}
delete container;
});
garbages_.reset(new GarbageQueue());
} else if (cur_memory_size_ >= max_memory_size_) {
gc_event_[instr_id].Record(
platform::DeviceContextPool::Instance().Get(place));
gc_event_[instr_id].SetFininshed(); // Only for CPU Event
gc_queue_->AddTask(
[ container = garbages_.release(), event = &gc_event_[instr_id] ]() {
while (!event->Query()) {
#if defined(_WIN32)
SleepEx(50, FALSE);
#else
sched_yield();
#endif
continue;
}
delete container;
});
garbages_.reset(new GarbageQueue());
cur_memory_size_ = 0;
gc_.Add(var_scope.var_list[var_id], gc_event_[instr_id],
vec_instruction_[instr_id].dev_ctx_);
}
}
}
......
......@@ -19,6 +19,8 @@
#include <unordered_map>
#include <vector>
#include "paddle/fluid/framework/new_executor/interpretercore_garbage_collector.h"
#include "paddle/fluid/framework/new_executor/interpretercore_util.h"
#include "paddle/fluid/framework/new_executor/new_executor_defs.h"
#include "paddle/fluid/framework/new_executor/profiler.h"
#include "paddle/fluid/framework/new_executor/workqueue.h"
......@@ -32,7 +34,6 @@ namespace framework {
class InterpreterCore {
public:
using GarbageQueue = std::deque<std::shared_ptr<memory::Allocation>>;
InterpreterCore(const platform::Place& place, const ProgramDesc& main_prog,
VariableScope* global_scope,
const std::vector<std::string>& feed_names,
......@@ -101,11 +102,8 @@ class InterpreterCore {
std::map<size_t, std::shared_ptr<platform::DeviceEvent>> var_id2event_;
InterpreterCoreGarbageCollector gc_;
std::vector<paddle::platform::DeviceEvent> gc_event_;
std::unique_ptr<GarbageQueue> garbages_;
size_t max_memory_size_;
size_t cur_memory_size_;
std::unique_ptr<WorkQueue> gc_queue_;
};
} // namespace framework
} // namespace paddle
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/new_executor/interpretercore_garbage_collector.h"
#include "paddle/fluid/framework/garbage_collector.h"
namespace paddle {
namespace framework {
// Builds an empty collector: starts with an empty pending batch, takes the
// batching threshold from GetEagerDeletionThreshold(), and creates the
// single-threaded work queue on which deferred releases are executed.
InterpreterCoreGarbageCollector::InterpreterCoreGarbageCollector()
    : garbages_(new GarbageQueue()),
      max_memory_size_(static_cast<size_t>(GetEagerDeletionThreshold())),
      cur_memory_size_(0) {
  // One worker thread is requested for the release queue.
  WorkQueueOptions release_queue_options;
  release_queue_options.num_threads = 1;
  queue_ = CreateSingleThreadedWorkQueue(release_queue_options);
}
// Hands one memory holder to the collector.
//
// garbage: the allocation to release; a null holder is ignored.
// event:   device event recorded on `ctx` when a release is scheduled; the
//          release task polls it before dropping the memory.
// ctx:     device context on which `event` is recorded.
//
// When max_memory_size_ <= 1 the holder is scheduled for release right away.
// Otherwise it is appended to the pending batch, and the whole batch is
// scheduled once the accumulated size reaches max_memory_size_.
void InterpreterCoreGarbageCollector::Add(
    std::shared_ptr<memory::Allocation> garbage,
    paddle::platform::DeviceEvent& event, const platform::DeviceContext* ctx) {
  // A null holder owns no memory, so there is nothing to release in either
  // mode. Checking here (instead of only in the batching branch) avoids
  // recording the event and queueing a polling task that frees nothing.
  if (!garbage) return;

  if (max_memory_size_ <= 1) {
    Free(garbage, event, ctx);
    return;
  }

  GarbageQueue* garbage_ptr = nullptr;
  {
    // garbages_ and cur_memory_size_ are only touched while holding spinlock_.
    std::lock_guard<paddle::memory::SpinLock> guard(spinlock_);
    cur_memory_size_ += garbage->size();
    garbages_->push_back(std::move(garbage));
    if (cur_memory_size_ >= max_memory_size_) {
      // Allocate the replacement queue before detaching the full one, so a
      // failed allocation can neither leak the detached batch nor leave
      // garbages_ null.
      std::unique_ptr<GarbageQueue> fresh_queue(new GarbageQueue());
      cur_memory_size_ = 0;
      garbage_ptr = garbages_.release();
      garbages_ = std::move(fresh_queue);
    }
  }

  // Scheduling happens outside the lock; Free takes ownership of the batch.
  if (garbage_ptr) {
    Free(garbage_ptr, event, ctx);
  }
}
// Extracts the memory holder(s) from `var` and forwards each one to the
// allocation overload of Add together with `event` and `ctx`.
//
// Handled variable types: LoDTensor, SelectedRows (its value tensor) and
// LoDTensorArray (every tensor in the array). Any other type raises an
// Unimplemented error.
void InterpreterCoreGarbageCollector::Add(paddle::framework::Variable* var,
                                          paddle::platform::DeviceEvent& event,
                                          const platform::DeviceContext* ctx) {
  if (var->IsType<LoDTensor>()) {
    auto* dense_tensor = var->GetMutable<LoDTensor>();
    Add(dense_tensor->MoveMemoryHolder(), event, ctx);
    return;
  }

  if (var->IsType<SelectedRows>()) {
    auto* rows_value = var->GetMutable<SelectedRows>()->mutable_value();
    Add(rows_value->MoveMemoryHolder(), event, ctx);
    return;
  }

  if (var->IsType<LoDTensorArray>()) {
    auto* tensor_array = var->GetMutable<LoDTensorArray>();
    for (auto& element : *tensor_array) {
      Add(element.MoveMemoryHolder(), event, ctx);
    }
    return;
  }

  PADDLE_THROW(platform::errors::Unimplemented(
      "The variable(%s) is not supported in eager deletion.",
      framework::ToTypeName(var->Type())));
}
// Schedules the release of a whole batch of allocations.
//
// Records `event` on `ctx`, then enqueues a task on queue_ that polls the
// event until Query() reports true and only then deletes `garbages`.
// This function takes ownership of `garbages`; the queued task deletes it.
void InterpreterCoreGarbageCollector::Free(GarbageQueue* garbages,
                                           paddle::platform::DeviceEvent& event,
                                           const platform::DeviceContext* ctx) {
  event.Record(ctx);
  event.SetFininshed();  // Only for CPU Event

  queue_->AddTask([batch = garbages, pending = &event]() {
    // Poll the event, giving up the CPU between polls.
    for (;;) {
      if (pending->Query()) break;
#if defined(_WIN32)
      SleepEx(50, FALSE);
#else
      sched_yield();
#endif
    }
    delete batch;
  });
}
// Schedules the release of a single allocation.
//
// Records `event` on `ctx`, then enqueues a task on queue_ that holds a copy
// of `garbage` and polls the event until Query() reports true. The task does
// nothing else: its captured copy of the holder is dropped when the task
// object itself is destroyed.
void InterpreterCoreGarbageCollector::Free(
    std::shared_ptr<memory::Allocation>& garbage,
    paddle::platform::DeviceEvent& event, const platform::DeviceContext* ctx) {
  event.Record(ctx);
  event.SetFininshed();  // Only for CPU Event

  queue_->AddTask([holder = garbage, pending = &event]() {
    // Poll the event, giving up the CPU between polls.
    for (;;) {
      if (pending->Query()) break;
#if defined(_WIN32)
      SleepEx(50, FALSE);
#else
      sched_yield();
#endif
    }
  });
}
} // namespace framework
} // namespace paddle
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#if !defined(_WIN32)
#include <sched.h>
#else
#define NOMINMAX
#include <windows.h>
#endif // !_WIN32
#include <queue>
#include <vector>
#include "paddle/fluid/framework/new_executor/workqueue.h"
#include "paddle/fluid/memory/allocation/spin_lock.h"
#include "paddle/fluid/platform/device_event.h"
namespace paddle {
namespace framework {
using GarbageQueue = std::deque<std::shared_ptr<memory::Allocation>>;
// Collects memory holders handed over through Add() and releases them on a
// dedicated work queue. Each release is tied to a DeviceEvent: the event is
// recorded on the supplied device context and the queued task polls it before
// the memory is dropped.
class InterpreterCoreGarbageCollector {
public:
// Starts with an empty pending batch, reads the batching threshold from
// GetEagerDeletionThreshold() and creates the work queue with one thread.
InterpreterCoreGarbageCollector();
// Adds one allocation. When max_memory_size_ <= 1 it is scheduled for release
// immediately; otherwise it is appended to the pending batch, which is
// scheduled as a whole once its accumulated size reaches max_memory_size_.
void Add(std::shared_ptr<memory::Allocation> garbage, // NOLINT
paddle::platform::DeviceEvent& event, // NOLINT
const platform::DeviceContext* ctx);
// Adds the memory held by a variable. LoDTensor, SelectedRows and
// LoDTensorArray are handled; any other type raises an Unimplemented error.
void Add(paddle::framework::Variable* var,
paddle::platform::DeviceEvent& event, // NOLINT
const platform::DeviceContext* ctx);
// NOTE(review): this macro is defined outside this file; presumably it
// deletes the copy operations and may change the access level of what
// follows. Confirm against its definition before moving declarations.
DISABLE_COPY_AND_ASSIGN(InterpreterCoreGarbageCollector);
private:
// Records `event` on `ctx` and queues a task that deletes `garbages` once the
// event's Query() reports true. Takes ownership of `garbages`.
void Free(GarbageQueue* garbages,
paddle::platform::DeviceEvent& event, // NOLINT
const platform::DeviceContext* ctx);
// Records `event` on `ctx` and queues a task that holds a copy of `garbage`
// while polling the event's Query().
void Free(std::shared_ptr<memory::Allocation>& garbage, // NOLINT
paddle::platform::DeviceEvent& event, // NOLINT
const platform::DeviceContext* ctx);
// Pending batch of allocations not yet scheduled for release.
std::unique_ptr<GarbageQueue> garbages_;
// Batching threshold taken from GetEagerDeletionThreshold(); a value <= 1
// selects immediate release of every allocation.
size_t max_memory_size_;
// Accumulated size of the allocations currently in garbages_.
size_t cur_memory_size_;
// Work queue on which the release tasks run.
std::unique_ptr<WorkQueue> queue_;
// Guards garbages_ and cur_memory_size_ while a batch is being updated.
paddle::memory::SpinLock spinlock_;
};
} // namespace framework
} // namespace paddle
......@@ -241,33 +241,37 @@ class RuntimeInferShapeContext : public InferShapeContext {
void ShareLoD(const std::string& in, const std::string& out, size_t i = 0,
size_t j = 0) const override {
auto in_it = ctx_.inputs.find(in);
auto out_it = ctx_.outputs.find(out);
PADDLE_ENFORCE_NE(
in_it, ctx_.inputs.end(),
platform::errors::NotFound("Input %s does not exist.", in));
PADDLE_ENFORCE_NE(
out_it, ctx_.outputs.end(),
platform::errors::NotFound("Output %s does not exist.", out));
PADDLE_ENFORCE_LT(i, in_it->second.size(),
platform::errors::InvalidArgument(
"The index of input dimension is out of range, "
"excepted index less than %zu, but received %zu.",
in_it->second.size(), i));
Variable* in_var = in_it->second.at(i);
if (!in_var->IsType<LoDTensor>()) return;
auto out_it = ctx_.outputs.find(out);
PADDLE_ENFORCE_NE(
out_it, ctx_.outputs.end(),
platform::errors::NotFound("Output %s does not exist.", out));
PADDLE_ENFORCE_LT(j, out_it->second.size(),
platform::errors::InvalidArgument(
"The index of output dimension is out of range, "
"excepted index less than %zu, but received %zu.",
out_it->second.size(), j));
Variable* in_var = in_it->second.at(i);
if (!in_var->IsType<LoDTensor>()) return;
Variable* out_var = out_it->second.at(j);
PADDLE_ENFORCE_EQ(
out_var->IsType<LoDTensor>(), true,
platform::errors::InvalidArgument(
"The %zu-th output of Output(%s) must be LoDTensor.", j, out));
auto& in_tensor = in_var->Get<LoDTensor>();
auto* out_tensor = out_var->GetMutable<LoDTensor>();
out_tensor->set_lod(in_tensor.lod());
// TODO(dzhwinter) : reuse ShareLoD in most operators.
......
......@@ -21,6 +21,8 @@
#include <windows.h>
#endif // !_WIN32
#include "paddle/fluid/platform/macros.h"
namespace paddle {
namespace memory {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册