diff --git a/paddle/fluid/framework/new_executor/interpretercore.cc b/paddle/fluid/framework/new_executor/interpretercore.cc
index 63d6fcbf823e4fce0b1b260543aec38475a607d8..c379e135b16b68bdbceecb1859a9d88e980271fb 100644
--- a/paddle/fluid/framework/new_executor/interpretercore.cc
+++ b/paddle/fluid/framework/new_executor/interpretercore.cc
@@ -19,6 +19,7 @@
 #include "paddle/fluid/framework/details/nan_inf_utils.h"
 #include "paddle/fluid/framework/details/share_tensor_buffer_functor.h"
 #include "paddle/fluid/framework/new_executor/interpretercore_util.h"
+#include "paddle/fluid/framework/new_executor/threadpool_config.h"
 #include "paddle/fluid/framework/operator.h"
 #include "paddle/fluid/platform/device/gpu/gpu_info.h"
 #include "paddle/fluid/platform/os_info.h"
@@ -47,9 +48,6 @@ constexpr const char* kTaskCompletion = "TaskCompletion";
 
 namespace paddle {
 namespace framework {
-// NOTE(Aurelius84): Need a better strategy to determine it.
-static constexpr size_t kHostNumThreads = 4;
-static constexpr size_t kDeviceNumThreads = 1;
 
 InterpreterCore::InterpreterCore(const platform::Place& place,
                                  const BlockDesc& block,
@@ -308,8 +306,14 @@ bool InterpreterCore::BuildInplaceCheckVarIsOnlyInput(size_t var_index) {
 
 std::shared_ptr<interpreter::AsyncWorkQueue> InterpreterCore::GetWorkQueue() {
   if (async_work_queue_ == nullptr) {
-    async_work_queue_ = std::make_shared<interpreter::AsyncWorkQueue>(
-        kHostNumThreads, kDeviceNumThreads, &main_thread_blocker_);
+    int host_num_threads = 1, deivce_num_threads = 1, prepare_num_threads = 1;
+    std::tie(host_num_threads, deivce_num_threads, prepare_num_threads) =
+        interpreter::GetThreadPoolConfig(place_, vec_instruction_.size());
+    async_work_queue_ =
+        std::make_shared<interpreter::AsyncWorkQueue>(host_num_threads,
+                                                      deivce_num_threads,
+                                                      prepare_num_threads,
+                                                      &main_thread_blocker_);
   }
   return async_work_queue_;
 }
@@ -788,14 +792,23 @@ void InterpreterCore::ExecuteInstructionList(
 
   platform::RecordEvent record_prepare(
       "PrepareAtomic", platform::TracerEventType::UserDefined, 1);
-  // NOTE(zhiqiu): get the prepared deps from std::future, and async prepare
-  // those for the next step
-  auto atomic_deps = atomic_deps_.get();
-  auto atomic_var_ref = atomic_var_ref_.get();
-
-  atomic_deps_ = async_work_queue_->PrepareAtomicDeps(dependecy_count_);
-  atomic_var_ref_ =
-      async_work_queue_->PrepareAtomicVarRef(var_scope_.VecMetaInfo());
+
+  std::unique_ptr<std::vector<std::atomic<size_t>>> atomic_deps = nullptr;
+  std::unique_ptr<std::vector<std::atomic<size_t>>> atomic_var_ref = nullptr;
+
+  if (async_work_queue_->QueueNumThreads(kPrepareWorkQueueIdx)) {
+    // NOTE(zhiqiu): get the prepared deps from std::future, and async prepare
+    // those for the next step
+    atomic_deps = atomic_deps_.get();
+    atomic_var_ref = atomic_var_ref_.get();
+
+    atomic_deps_ = async_work_queue_->PrepareAtomicDeps(dependecy_count_);
+    atomic_var_ref_ =
+        async_work_queue_->PrepareAtomicVarRef(var_scope_.VecMetaInfo());
+  } else {
+    atomic_deps = interpreter::PrepareAtomicDeps(dependecy_count_);
+    atomic_var_ref = interpreter::PrepareAtomicVarRef(var_scope_.VecMetaInfo());
+  }
   record_prepare.End();
 
   exception_holder_.Clear();
diff --git a/paddle/fluid/framework/new_executor/interpretercore.h b/paddle/fluid/framework/new_executor/interpretercore.h
index 2d60b0231a5c08e6ae58f4fdafa779fb01421835..86f3768c54d40357eef2e0df49d488e1813ae6dc 100644
--- a/paddle/fluid/framework/new_executor/interpretercore.h
+++ b/paddle/fluid/framework/new_executor/interpretercore.h
@@ -129,7 +129,7 @@ class InterpreterCore {
 
   std::vector<Instruction> vec_instruction_;  // deconstruct before OpFuncNode
 
-  // last_live_ops_[i] contains the id of operatos that last access var[i]
+  // last_live_ops_[i] contains the id of operators that last access var[i]
   std::map<size_t, std::set<size_t>> last_live_ops_;
 
   std::vector<size_t> dependecy_count_;
diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.cc b/paddle/fluid/framework/new_executor/interpretercore_util.cc
index 6fbb14287ed1c8f8443285fcb7c8182a60595bd0..edd5f76987200c5605f612df0b941bea1d6941d1 100644
--- a/paddle/fluid/framework/new_executor/interpretercore_util.cc
+++ b/paddle/fluid/framework/new_executor/interpretercore_util.cc
@@ -42,10 +42,12 @@ namespace framework {
 namespace interpreter {
 
 using VariableIdMap = std::map<std::string, std::vector<int>>;
-constexpr size_t kPrepareWorkQueueIdx = 2;
 
 const std::vector<WorkQueueOptions> ConstructWorkQueueOptions(
-    size_t host_num_threads, size_t device_num_threads, EventsWaiter* waiter) {
+    size_t host_num_threads,
+    size_t device_num_threads,
+    size_t prepare_num_threads,
+    EventsWaiter* waiter) {
   std::vector<WorkQueueOptions> group_options;
   // for execute host Kernel
   group_options.emplace_back(/*name*/ "HostTasks",
@@ -65,7 +67,7 @@ const std::vector<WorkQueueOptions> ConstructWorkQueueOptions(
                              /*events_waiter*/ waiter);
   // for prepare deps and others
   group_options.emplace_back(/*name*/ "Prepare",
-                             /*num_threads*/ 1,
+                             /*num_threads*/ prepare_num_threads,
                              /*allow_spinning*/ true,
                              /*always_spinning*/ false,
                              /*track_task*/ false,
@@ -76,10 +78,11 @@ const std::vector<WorkQueueOptions> ConstructWorkQueueOptions(
 
 AsyncWorkQueue::AsyncWorkQueue(size_t host_num_threads,
                                size_t device_num_threads,
+                               size_t prepare_num_threads,
                                EventsWaiter* waiter)
     : host_num_thread_(host_num_threads) {
-  queue_group_ = CreateWorkQueueGroup(
-      ConstructWorkQueueOptions(host_num_threads, device_num_threads, waiter));
+  queue_group_ = CreateWorkQueueGroup(ConstructWorkQueueOptions(
+      host_num_threads, device_num_threads, prepare_num_threads, waiter));
 }
 
 void AsyncWorkQueue::AddTask(const OpFuncType& op_func_type,
diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.h b/paddle/fluid/framework/new_executor/interpretercore_util.h
index 1860b19b1ca4208e7a28c16cf1c0ad98ef0230ed..8fc0dcb266e3daa96fc2c2fdcb10cd2ed90ddab3 100644
--- a/paddle/fluid/framework/new_executor/interpretercore_util.h
+++ b/paddle/fluid/framework/new_executor/interpretercore_util.h
@@ -39,6 +39,7 @@
 #include "paddle/fluid/platform/init.h"
 
 using AtomicVectorSizeT = std::vector<std::atomic<size_t>>;
+constexpr size_t kPrepareWorkQueueIdx = 2;
 
 namespace paddle {
 namespace framework {
@@ -48,6 +49,7 @@ class AsyncWorkQueue {
  public:
   AsyncWorkQueue(size_t host_num_threads,
                  size_t deivce_num_threads,
+                 size_t prepare_num_threads,
                  EventsWaiter* waiter);
 
   std::future<std::unique_ptr<AtomicVectorSizeT>> PrepareAtomicDeps(
@@ -61,6 +63,10 @@ class AsyncWorkQueue {
 
   void Cancel() { queue_group_->Cancel(); }
 
+  size_t QueueNumThreads(size_t idx) {
+    return queue_group_->QueueNumThreads(idx);
+  }
+
  private:
   size_t host_num_thread_;
   std::unique_ptr<WorkQueueGroup> queue_group_;
diff --git a/paddle/fluid/framework/new_executor/threadpool_config.h b/paddle/fluid/framework/new_executor/threadpool_config.h
new file mode 100644
index 0000000000000000000000000000000000000000..0270aa7d1e86e55cc795553e25a48412a46c251a
--- /dev/null
+++ b/paddle/fluid/framework/new_executor/threadpool_config.h
@@ -0,0 +1,136 @@
+// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <thread>
+
+#include "paddle/fluid/platform/device/ipu/ipu_info.h"
+#include "paddle/fluid/platform/device/npu/npu_info.h"
+#include "paddle/fluid/platform/place.h"
+#include "paddle/phi/backends/device_manager.h"
+#include "paddle/phi/backends/gpu/gpu_info.h"
+#include "paddle/phi/backends/xpu/xpu_info.h"
+
+DECLARE_bool(new_executor_serial_run);
+
+namespace paddle {
+namespace framework {
+namespace interpreter {
+
+static constexpr size_t kHostNumThreads = 4;
+static constexpr size_t kDeviceNumThreads = 1;
+static constexpr size_t kNumGcThreads = 1;
+static constexpr size_t kNumPrepareThreads = 0;
+
+static constexpr size_t kMinOpNumForAsyncPrepare = 1000;
+
+// By default, one interpretercore contains:
+// a 1-size thread pool for device kernel launch (or 0 for CPU execution),
+// a 1-size thread pool for host kernel launch (or more if the system has
+// enough processors).
+
+// And it may contain:
+// a 1-size thread pool for gc if it cannot use FastGC,
+// a 1-size thread pool for preparation if the program contains too many ops
+// (1000+).
+
+// Note that the purpose of this config is to limit the total number of
+// threads an interpretercore may introduce, to avoid hurting performance.
+
+inline std::tuple<int, int, int> GetThreadPoolConfig(const phi::Place place,
+                                                     size_t op_num) {
+  int num_device_threads = kDeviceNumThreads,
+      num_host_threads = kHostNumThreads,
+      num_prepare_threads = kNumPrepareThreads;
+
+  if (op_num > kMinOpNumForAsyncPrepare) {
+    num_prepare_threads = 1;
+  }
+
+  int device_count = 0, processor_count = 0;
+  if (platform::is_cpu_place(place)) {
+    num_device_threads = 0;
+    num_host_threads = 4;
+  } else {
+    processor_count = std::thread::hardware_concurrency();
+    if (processor_count) {
+      if (platform::is_gpu_place(place)) {
+#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
+        device_count = phi::backends::gpu::GetGPUDeviceCount();
+#endif
+      }
+      if (platform::is_xpu_place(place)) {
+#if defined(PADDLE_WITH_XPU)
+        device_count = phi::backends::xpu::GetXPUDeviceCount();
+#endif
+      }
+      if (platform::is_npu_place(place)) {
+#if defined(PADDLE_WITH_ASCEND_CL)
+        device_count = platform::GetNPUDeviceCount();
+#endif
+      }
+      if (platform::is_ipu_place(place)) {
+#if defined(PADDLE_WITH_IPU)
+        device_count = platform::GetIPUDeviceCount();
+#endif
+      }
+      if (platform::is_custom_place(place)) {
+#if defined(PADDLE_WITH_CUSTOM_DEVICE)
+        device_count =
+            phi::DeviceManager::GetDeviceCount(place.GetDeviceType());
+#endif
+      }
+
+      // Tricky implementation.
+      // In multi-card training, each card may set an env like
+      // CUDA_VISIBLE_DEVICES=0, so only 1 device is detected; reset it to 8.
+      if (device_count == 1) {
+        device_count = 8;  // in many cases, the accelerator has 8 cards.
+      }
+
+      // We expect processor_count = 2 * (the possible total threads when doing
+      // multi-card training), to make sure that the system will not slow down
+      // because of too many threads. Here, 2 is an empirical value. Since each
+      // device has one interpretercore, the possible total threads when doing
+      // multi-card training = device_count * (the possible total threads in
+      // one interpretercore).
+
+      if (device_count) {
+        auto num = processor_count / device_count / 2 -
+                   (kNumGcThreads + kNumPrepareThreads + num_device_threads);
+        num_host_threads =
+            num > 0 ? (num > kHostNumThreads ? kHostNumThreads : num) : 1;
+      }
+    }
+  }
+
+  // In serial run, only one 1-size thread pool is used
+  if (FLAGS_new_executor_serial_run) {
+    num_host_threads = 0;
+    num_device_threads = 1;
+  }
+
+  VLOG(4) << "place:" << place << ", processor_count:" << processor_count
+          << ", device_count:" << device_count
+          << ", serial_run:" << FLAGS_new_executor_serial_run
+          << ", num_host_threads:" << num_host_threads
+          << ", num_device_threads:" << num_device_threads
+          << ", num_prepare_threads:" << num_prepare_threads;
+  return std::make_tuple(
+      num_host_threads, num_device_threads, num_prepare_threads);
+}
+
+}  // namespace interpreter
+}  // namespace framework
+}  // namespace paddle
diff --git a/paddle/fluid/framework/new_executor/workqueue/workqueue.cc b/paddle/fluid/framework/new_executor/workqueue/workqueue.cc
index 0055a8f055ca293431451f5267b84be146281b28..ad4a51152c4728ec9ac0978c89c940026a21b557 100644
--- a/paddle/fluid/framework/new_executor/workqueue/workqueue.cc
+++ b/paddle/fluid/framework/new_executor/workqueue/workqueue.cc
@@ -121,8 +121,13 @@ WorkQueueGroupImpl::WorkQueueGroupImpl(
   queues_.resize(num_queues);
   void* buffer = malloc(sizeof(NonblockingThreadPool) * num_queues);
   queues_storage_ = reinterpret_cast<NonblockingThreadPool*>(buffer);
+
   for (size_t idx = 0; idx < num_queues; ++idx) {
     const auto& options = queues_options_[idx];
+    if (options.num_threads == 0) {
+      queues_[idx] = nullptr;
+      continue;
+    }
     if (options.track_task && tracker_ == nullptr &&
         options.events_waiter != nullptr) {
       empty_notifier_ = options.events_waiter->RegisterEvent(kQueueEmptyEvent);
@@ -144,7 +149,9 @@ WorkQueueGroupImpl::WorkQueueGroupImpl(
 
 WorkQueueGroupImpl::~WorkQueueGroupImpl() {
   for (auto queue : queues_) {
-    queue->~NonblockingThreadPool();
+    if (queue) {
+      queue->~NonblockingThreadPool();
+    }
   }
   if (tracker_ != nullptr) {
     tracker_->~TaskTracker();
@@ -161,6 +168,10 @@ void WorkQueueGroupImpl::AddTask(size_t queue_idx, std::function<void()> fn) {
                                platform::TracerEventType::UserDefined,
                                10 /*level*/);
   assert(queue_idx < queues_.size());
+  PADDLE_ENFORCE_NOT_NULL(
+      queues_.at(queue_idx),
+      platform::errors::NotFound("Workqueue of index %d is not initialized.",
+                                 queue_idx));
   if (queues_options_.at(queue_idx).track_task) {
     fn = [task = std::move(fn),
           raii = CounterGuard<TaskTracker>(tracker_)]() mutable { task(); };
@@ -170,6 +181,9 @@ void WorkQueueGroupImpl::AddTask(size_t queue_idx, std::function<void()> fn) {
 
 size_t WorkQueueGroupImpl::QueueNumThreads(size_t queue_idx) const {
   assert(queue_idx < queues_.size());
+  if (!queues_.at(queue_idx)) {
+    return 0;
+  }
   return queues_.at(queue_idx)->NumThreads();
 }
 
@@ -183,10 +197,14 @@ size_t WorkQueueGroupImpl::QueueGroupNumThreads() const {
 
 void WorkQueueGroupImpl::Cancel() {
   for (auto queue : queues_) {
-    queue->Cancel();
+    if (queue) {
+      queue->Cancel();
+    }
   }
   for (auto queue : queues_) {
-    queue->WaitThreadsExit();
+    if (queue) {
+      queue->WaitThreadsExit();
+    }
   }
 }
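
For illustration only, and not part of the patch above: a minimal standalone sketch of the host-thread arithmetic performed by GetThreadPoolConfig. The machine parameters used here (16 logical processors, CUDA_VISIBLE_DEVICES exposing a single card) are assumed purely for this example.

#include <algorithm>
#include <cstdio>

int main() {
  // Assumed machine: 16 logical processors; only 1 GPU is visible to this
  // process, so the config falls back to its 8-card guess.
  int processor_count = 16;
  int device_count = 1;
  if (device_count == 1) device_count = 8;

  // Per-interpretercore budget: half of the per-device processor share,
  // minus the gc (1), prepare (0) and device-launch (1) threads.
  const int kNumGcThreads = 1, kNumPrepareThreads = 0, num_device_threads = 1;
  const int kHostNumThreads = 4;
  int num = processor_count / device_count / 2 -
            (kNumGcThreads + kNumPrepareThreads + num_device_threads);
  int num_host_threads = num > 0 ? std::min(num, kHostNumThreads) : 1;

  // 16 / 8 / 2 - 2 = -1, so the host pool is clamped to a single thread.
  std::printf("num_host_threads = %d\n", num_host_threads);
  return 0;
}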