diff --git a/paddle/fluid/framework/new_executor/CMakeLists.txt b/paddle/fluid/framework/new_executor/CMakeLists.txt index 1d987e36e09cd40f729bbedf1d98a9c8ae18f6b6..e4a67f191b023c11efb0a58cb23a2104669d4667 100644 --- a/paddle/fluid/framework/new_executor/CMakeLists.txt +++ b/paddle/fluid/framework/new_executor/CMakeLists.txt @@ -3,9 +3,9 @@ lod_rank_table fs shell fleet_wrapper heter_wrapper ps_gpu_wrapper box_wrapper l graph_to_program_pass variable_helper timer monitor) cc_library(workqueue SRCS workqueue.cc DEPS enforce) +cc_library(interpretercore_garbage_collector SRCS interpretercore_garbage_collector.cc DEPS workqueue ${DEVICE_EVENT_LIBS}) cc_library(interpretercore_util SRCS interpretercore_util.cc DEPS ${INTERPRETERCORE_DEPS}) -cc_library(interpretercore SRCS interpretercore.cc DEPS workqueue ${DEVICE_EVENT_LIBS} interpretercore_util) - +cc_library(interpretercore SRCS interpretercore.cc DEPS workqueue ${DEVICE_EVENT_LIBS} interpretercore_util interpretercore_garbage_collector) cc_library(standalone_executor SRCS standalone_executor.cc DEPS interpretercore) cc_test(workqueue_test SRCS workqueue_test.cc DEPS workqueue) # cc_binary(standalone_executor_test SRCS standalone_executor_test.cc DEPS interpretercore standalone_executor operator op_registry executor ${GLOB_OP_LIB} ${GLOB_OPERATOR_DEPS} profiler) diff --git a/paddle/fluid/framework/new_executor/interpretercore.cc b/paddle/fluid/framework/new_executor/interpretercore.cc index f87c631ef6ca210506a5b404a907ecd8b0882bb2..864a0a45366f3693ced6616f61fc6cf0ec6c4758 100644 --- a/paddle/fluid/framework/new_executor/interpretercore.cc +++ b/paddle/fluid/framework/new_executor/interpretercore.cc @@ -12,15 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#define GLOG_NO_ABBREVIATED_SEVERITIES // msvc conflict logging with windows.h - -#if !defined(_WIN32) -#include -#else -#define NOMINMAX -#include -#endif // !_WIN32 - #include "paddle/fluid/framework/new_executor/interpretercore.h" #include "paddle/fluid/framework/new_executor/interpretercore_util.h" @@ -136,13 +127,6 @@ InterpreterCore::InterpreterCore(const platform::Place& place, h2d_ctx_pool_({place}) { is_build_ = false; - garbages_.reset(new GarbageQueue()); - max_memory_size_ = static_cast(GetEagerDeletionThreshold()); - cur_memory_size_ = 0; - WorkQueueOptions options; - options.num_threads = 1; - gc_queue_ = CreateSingleThreadedWorkQueue(options); - feed_names_ = feed_names; // Step1: add feedop and fetchop to main_program @@ -268,8 +252,6 @@ void InterpreterCore::Convert() { } for (size_t i = 0; i < vec_instruction_.size(); ++i) { - gc_event_.emplace_back(place_, platform::GenerateDeviceEventFlag()); - std::vector vec_temp; for (auto& item : vec_instruction_[i].output_index_) { for (auto id : item.second) { @@ -301,6 +283,11 @@ void InterpreterCore::Convert() { for (size_t i = 0; i < vec_instruction_.size(); ++i) { BuildAndCacheInstructionCtx(&vec_instruction_[i], *global_scope_, place_); } + + for (size_t i = 0; i < vec_instruction_.size(); ++i) { + gc_event_.emplace_back(vec_instruction_[i].execution_ctx_.get()->GetPlace(), + platform::GenerateDeviceEventFlag()); + } } void InterpreterCore::BuildAndCacheInstructionCtx( @@ -412,72 +399,8 @@ void InterpreterCore::CheckGC(size_t instr_id, if (var_scope.vec_meta_info_[var_id].vardesc_ && !var_scope.vec_meta_info_[var_id].vardesc_->Persistable() && working_var_ref[var_id].var_ref_count_ == 0) { - Variable* var = var_scope.var_list[var_id]; - if (var->IsType()) { - garbages_->emplace_back( - var->GetMutable()->MoveMemoryHolder()); - if (garbages_->back()) { - cur_memory_size_ += garbages_->back()->size(); - } - } else if (var->IsType()) { - garbages_->emplace_back(var->GetMutable() - ->mutable_value() - ->MoveMemoryHolder()); - if (garbages_->back()) { - cur_memory_size_ += garbages_->back()->size(); - } - } else if (var->IsType()) { - auto* tensor_arr = var->GetMutable(); - for (auto& t : *tensor_arr) { - garbages_->emplace_back(t.MoveMemoryHolder()); - if (garbages_->back()) { - cur_memory_size_ += garbages_->back()->size(); - } - } - } else { - PADDLE_THROW(platform::errors::Unimplemented( - "The variable(%s) is not supported in eager deletion.", - framework::ToTypeName(var->Type()))); - } - } - } - - if (!garbages_->empty()) { - if (max_memory_size_ <= 1) { - gc_event_[instr_id].Record( - platform::DeviceContextPool::Instance().Get(place)); - gc_event_[instr_id].SetFininshed(); // Only for CPU Event - gc_queue_->AddTask( - [ container = garbages_.release(), event = &gc_event_[instr_id] ]() { - while (!event->Query()) { -#if defined(_WIN32) - SleepEx(50, FALSE); -#else - sched_yield(); -#endif - continue; - } - delete container; - }); - garbages_.reset(new GarbageQueue()); - } else if (cur_memory_size_ >= max_memory_size_) { - gc_event_[instr_id].Record( - platform::DeviceContextPool::Instance().Get(place)); - gc_event_[instr_id].SetFininshed(); // Only for CPU Event - gc_queue_->AddTask( - [ container = garbages_.release(), event = &gc_event_[instr_id] ]() { - while (!event->Query()) { -#if defined(_WIN32) - SleepEx(50, FALSE); -#else - sched_yield(); -#endif - continue; - } - delete container; - }); - garbages_.reset(new GarbageQueue()); - cur_memory_size_ = 0; + gc_.Add(var_scope.var_list[var_id], gc_event_[instr_id], + vec_instruction_[instr_id].dev_ctx_); } } } diff --git a/paddle/fluid/framework/new_executor/interpretercore.h b/paddle/fluid/framework/new_executor/interpretercore.h index 51cbf225cf777d5f3d30cbd66fdd9f4b91a5ed66..fef2c47bac2e8ded30f7cfa4244dcc2dda321896 100644 --- a/paddle/fluid/framework/new_executor/interpretercore.h +++ b/paddle/fluid/framework/new_executor/interpretercore.h @@ -19,6 +19,8 @@ #include #include +#include "paddle/fluid/framework/new_executor/interpretercore_garbage_collector.h" +#include "paddle/fluid/framework/new_executor/interpretercore_util.h" #include "paddle/fluid/framework/new_executor/new_executor_defs.h" #include "paddle/fluid/framework/new_executor/profiler.h" #include "paddle/fluid/framework/new_executor/workqueue.h" @@ -32,7 +34,6 @@ namespace framework { class InterpreterCore { public: - using GarbageQueue = std::deque>; InterpreterCore(const platform::Place& place, const ProgramDesc& main_prog, VariableScope* global_scope, const std::vector& feed_names, @@ -101,11 +102,8 @@ class InterpreterCore { std::map> var_id2event_; + InterpreterCoreGarbageCollector gc_; std::vector gc_event_; - std::unique_ptr garbages_; - size_t max_memory_size_; - size_t cur_memory_size_; - std::unique_ptr gc_queue_; }; } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.cc b/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.cc new file mode 100644 index 0000000000000000000000000000000000000000..0f90e37c7b706cda4b3830575b04883efd4510d5 --- /dev/null +++ b/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.cc @@ -0,0 +1,112 @@ +// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/framework/new_executor/interpretercore_garbage_collector.h" +#include "paddle/fluid/framework/garbage_collector.h" + +namespace paddle { +namespace framework { + +InterpreterCoreGarbageCollector::InterpreterCoreGarbageCollector() { + garbages_.reset(new GarbageQueue()); + max_memory_size_ = static_cast(GetEagerDeletionThreshold()); + cur_memory_size_ = 0; + + WorkQueueOptions options; + options.num_threads = 1; + queue_ = CreateSingleThreadedWorkQueue(options); +} + +void InterpreterCoreGarbageCollector::Add( + std::shared_ptr garbage, + paddle::platform::DeviceEvent& event, const platform::DeviceContext* ctx) { + if (max_memory_size_ <= 1) { + Free(garbage, event, ctx); + } else { + if (!garbage) return; + GarbageQueue* garbage_ptr = nullptr; + { + std::lock_guard guard(spinlock_); + cur_memory_size_ += garbage->size(); + garbages_->push_back(std::move(garbage)); + + if (cur_memory_size_ >= max_memory_size_) { + cur_memory_size_ = 0; + garbage_ptr = garbages_.release(); + garbages_.reset(new GarbageQueue()); + } + } + if (garbage_ptr) { + Free(garbage_ptr, event, ctx); + } + } +} + +void InterpreterCoreGarbageCollector::Add(paddle::framework::Variable* var, + paddle::platform::DeviceEvent& event, + const platform::DeviceContext* ctx) { + if (var->IsType()) { + Add(var->GetMutable()->MoveMemoryHolder(), event, ctx); + } else if (var->IsType()) { + Add(var->GetMutable()->mutable_value()->MoveMemoryHolder(), + event, ctx); + } else if (var->IsType()) { + auto* tensor_arr = var->GetMutable(); + for (auto& t : *tensor_arr) { + Add(t.MoveMemoryHolder(), event, ctx); + } + } else { + PADDLE_THROW(platform::errors::Unimplemented( + "The variable(%s) is not supported in eager deletion.", + framework::ToTypeName(var->Type()))); + } +} + +void InterpreterCoreGarbageCollector::Free(GarbageQueue* garbages, + paddle::platform::DeviceEvent& event, + const platform::DeviceContext* ctx) { + event.Record(ctx); + event.SetFininshed(); // Only for CPU Event + queue_->AddTask([ container = garbages, event = &event ]() { + while (!event->Query()) { +#if defined(_WIN32) + SleepEx(50, FALSE); +#else + sched_yield(); +#endif + continue; + } + delete container; + }); +} + +void InterpreterCoreGarbageCollector::Free( + std::shared_ptr& garbage, + paddle::platform::DeviceEvent& event, const platform::DeviceContext* ctx) { + event.Record(ctx); + event.SetFininshed(); // Only for CPU Event + queue_->AddTask([ container = garbage, event = &event ]() { + while (!event->Query()) { +#if defined(_WIN32) + SleepEx(50, FALSE); +#else + sched_yield(); +#endif + continue; + } + }); +} + +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.h b/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.h new file mode 100644 index 0000000000000000000000000000000000000000..b1157c861754ce913b66683e1ad52a08e7550a57 --- /dev/null +++ b/paddle/fluid/framework/new_executor/interpretercore_garbage_collector.h @@ -0,0 +1,64 @@ +// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#if !defined(_WIN32) +#include +#else +#define NOMINMAX +#include +#endif // !_WIN32 + +#include +#include + +#include "paddle/fluid/framework/new_executor/workqueue.h" +#include "paddle/fluid/memory/allocation/spin_lock.h" +#include "paddle/fluid/platform/device_event.h" + +namespace paddle { +namespace framework { + +using GarbageQueue = std::deque>; +class InterpreterCoreGarbageCollector { + public: + InterpreterCoreGarbageCollector(); + + void Add(std::shared_ptr garbage, // NOLINT + paddle::platform::DeviceEvent& event, // NOLINT + const platform::DeviceContext* ctx); + + void Add(paddle::framework::Variable* var, + paddle::platform::DeviceEvent& event, // NOLINT + const platform::DeviceContext* ctx); + + DISABLE_COPY_AND_ASSIGN(InterpreterCoreGarbageCollector); + + private: + void Free(GarbageQueue* garbages, + paddle::platform::DeviceEvent& event, // NOLINT + const platform::DeviceContext* ctx); + + void Free(std::shared_ptr& garbage, // NOLINT + paddle::platform::DeviceEvent& event, // NOLINT + const platform::DeviceContext* ctx); + + std::unique_ptr garbages_; + size_t max_memory_size_; + size_t cur_memory_size_; + std::unique_ptr queue_; + paddle::memory::SpinLock spinlock_; +}; +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.h b/paddle/fluid/framework/new_executor/interpretercore_util.h index 2f6ad33d9297f2ec177786cef979966881a398dc..db7f35fb7ce862cd3f842c948647bbefb0888c1a 100644 --- a/paddle/fluid/framework/new_executor/interpretercore_util.h +++ b/paddle/fluid/framework/new_executor/interpretercore_util.h @@ -241,33 +241,37 @@ class RuntimeInferShapeContext : public InferShapeContext { void ShareLoD(const std::string& in, const std::string& out, size_t i = 0, size_t j = 0) const override { auto in_it = ctx_.inputs.find(in); - auto out_it = ctx_.outputs.find(out); PADDLE_ENFORCE_NE( in_it, ctx_.inputs.end(), platform::errors::NotFound("Input %s does not exist.", in)); - PADDLE_ENFORCE_NE( - out_it, ctx_.outputs.end(), - platform::errors::NotFound("Output %s does not exist.", out)); PADDLE_ENFORCE_LT(i, in_it->second.size(), platform::errors::InvalidArgument( "The index of input dimension is out of range, " "excepted index less than %zu, but received %zu.", in_it->second.size(), i)); + + Variable* in_var = in_it->second.at(i); + if (!in_var->IsType()) return; + + auto out_it = ctx_.outputs.find(out); + PADDLE_ENFORCE_NE( + out_it, ctx_.outputs.end(), + platform::errors::NotFound("Output %s does not exist.", out)); PADDLE_ENFORCE_LT(j, out_it->second.size(), platform::errors::InvalidArgument( "The index of output dimension is out of range, " "excepted index less than %zu, but received %zu.", out_it->second.size(), j)); - Variable* in_var = in_it->second.at(i); - if (!in_var->IsType()) return; Variable* out_var = out_it->second.at(j); PADDLE_ENFORCE_EQ( out_var->IsType(), true, platform::errors::InvalidArgument( "The %zu-th output of Output(%s) must be LoDTensor.", j, out)); + auto& in_tensor = in_var->Get(); auto* out_tensor = out_var->GetMutable(); + out_tensor->set_lod(in_tensor.lod()); // TODO(dzhwinter) : reuse ShareLoD in most operators. diff --git a/paddle/fluid/memory/allocation/spin_lock.h b/paddle/fluid/memory/allocation/spin_lock.h index bc46fa2b37040ca76e6cf04546af8ef55cb61568..42462fd74b4cd7d47e59cf67ff0c9467ee66f7b9 100644 --- a/paddle/fluid/memory/allocation/spin_lock.h +++ b/paddle/fluid/memory/allocation/spin_lock.h @@ -21,6 +21,8 @@ #include #endif // !_WIN32 +#include "paddle/fluid/platform/macros.h" + namespace paddle { namespace memory {