diff --git a/paddle/fluid/framework/new_executor/interpretercore_event_garbage_collector.cc b/paddle/fluid/framework/new_executor/interpretercore_event_garbage_collector.cc index ba81ee9166fd655cf1c6b2b0bf14486d5c274143..4ba83e6a30cc2b40fcbf3b57d5651a4e1d49a397 100644 --- a/paddle/fluid/framework/new_executor/interpretercore_event_garbage_collector.cc +++ b/paddle/fluid/framework/new_executor/interpretercore_event_garbage_collector.cc @@ -25,7 +25,8 @@ namespace paddle { namespace framework { InterpreterCoreEventGarbageCollector::InterpreterCoreEventGarbageCollector() { - WorkQueueOptions options(/*num_threads*/ 1, /*allow_spinning*/ true, + WorkQueueOptions options(/*name*/ "GarbageCollector", /*num_threads*/ 1, + /*allow_spinning*/ true, /*track_task*/ false); queue_ = CreateSingleThreadedWorkQueue(options); } diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.h b/paddle/fluid/framework/new_executor/interpretercore_util.h index 5f403613c6b3078f60e3569e35b8a30da628f9a7..81c05df62ec41970bcfbefe5e001527e777051ed 100644 --- a/paddle/fluid/framework/new_executor/interpretercore_util.h +++ b/paddle/fluid/framework/new_executor/interpretercore_util.h @@ -58,13 +58,15 @@ class AsyncWorkQueue { : host_num_thread_(host_num_threads) { std::vector group_options; // for execute host Kernel - group_options.emplace_back(/*num_threads*/ host_num_threads, + group_options.emplace_back(/*name*/ "HostTasks", + /*num_threads*/ host_num_threads, /*allow_spinning*/ true, /*track_task*/ false, /*detached*/ true, /*events_waiter*/ waiter); // for launch device Kernel - group_options.emplace_back(/*num_threads*/ 1, + group_options.emplace_back(/*name*/ "DeviceKernelLaunch", + /*num_threads*/ 1, /*allow_spinning*/ true, /*track_task*/ false, /*detached*/ true, diff --git a/paddle/fluid/framework/new_executor/workqueue/nonblocking_threadpool.h b/paddle/fluid/framework/new_executor/workqueue/nonblocking_threadpool.h index 37044d3c19b35bfc8712184666f1124b0787c6bf..2ad76562c15dd8112f77bdf4c3ebf9e709e60056 100644 --- a/paddle/fluid/framework/new_executor/workqueue/nonblocking_threadpool.h +++ b/paddle/fluid/framework/new_executor/workqueue/nonblocking_threadpool.h @@ -12,10 +12,12 @@ #include #include #include +#include "glog/logging.h" #include "paddle/fluid/framework/new_executor/workqueue/event_count.h" #include "paddle/fluid/framework/new_executor/workqueue/run_queue.h" #include "paddle/fluid/framework/new_executor/workqueue/thread_environment.h" #include "paddle/fluid/platform/os_info.h" +#include "paddle/fluid/platform/profiler/event_tracing.h" namespace paddle { namespace framework { @@ -26,7 +28,7 @@ class ThreadPoolTempl { typedef typename Environment::Task Task; typedef RunQueue Queue; - ThreadPoolTempl(int num_threads, bool allow_spinning, + ThreadPoolTempl(const std::string& name, int num_threads, bool allow_spinning, Environment env = Environment()) : env_(env), allow_spinning_(allow_spinning), @@ -38,7 +40,8 @@ class ThreadPoolTempl { cancelled_(false), ec_(num_threads), num_threads_(num_threads), - thread_data_(num_threads) { + thread_data_(num_threads), + name_(name) { // Calculate coprimes of all numbers [1, num_threads]. // Coprimes are used for random walks over all threads in Steal // and NonEmptyQueueIndex. Iteration is based on the fact that if we take @@ -240,9 +243,13 @@ class ThreadPoolTempl { EventCount ec_; const int num_threads_; std::vector thread_data_; + std::string name_; // Main worker thread loop. void WorkerLoop(int thread_id) { + std::string thr_name = name_ + "_thread_" + std::to_string(thread_id); + VLOG(1) << thr_name << " started "; + platform::SetCurrentThreadName(thr_name); PerThread* pt = GetPerThread(); pt->pool = this; pt->rand = GlobalThreadIdHash(); @@ -401,6 +408,7 @@ class ThreadPoolTempl { ec_.Notify(true); return false; } + platform::RecordEvent("SleepWaitForWork"); ec_.CommitWait(waiter); blocked_--; return true; diff --git a/paddle/fluid/framework/new_executor/workqueue/workqueue.cc b/paddle/fluid/framework/new_executor/workqueue/workqueue.cc index 45694349168a4e675f39f1a3693b3422a47580c7..07c5298c2f22377e277939e11af6fa6c142f24bc 100644 --- a/paddle/fluid/framework/new_executor/workqueue/workqueue.cc +++ b/paddle/fluid/framework/new_executor/workqueue/workqueue.cc @@ -11,6 +11,17 @@ namespace paddle { namespace framework { + +void WorkQueueOptions::Validate() const { + PADDLE_ENFORCE_GT(name.size(), 0, + platform::errors::InvalidArgument( + "WorkQueueOptions.name must be nonempty")); + PADDLE_ENFORCE_EQ( + name.find('_'), std::string::npos, + platform::errors::InvalidArgument( + "WorkQueueOptions.name shouldn't contain an underline")); +} + namespace { using TaskTracker = TaskTracker; @@ -30,7 +41,7 @@ class WorkQueueImpl : public WorkQueue { destruct_notifier_ = options.events_waiter->RegisterEvent(kQueueDestructEvent); } - queue_ = new NonblockingThreadPool(options_.num_threads, + queue_ = new NonblockingThreadPool(options_.name, options_.num_threads, options_.allow_spinning); } @@ -121,8 +132,8 @@ WorkQueueGroupImpl::WorkQueueGroupImpl( destruct_notifier_ = options.events_waiter->RegisterEvent(kQueueDestructEvent); } - queues_[idx] = new (&queues_storage_[idx]) - NonblockingThreadPool(options.num_threads, options.allow_spinning); + queues_[idx] = new (&queues_storage_[idx]) NonblockingThreadPool( + options.name, options.num_threads, options.allow_spinning); } } @@ -182,6 +193,8 @@ void WorkQueueGroupImpl::Cancel() { std::unique_ptr CreateSingleThreadedWorkQueue( const WorkQueueOptions& options) { + options.Validate(); + // extra check PADDLE_ENFORCE_EQ(options.num_threads, 1u, platform::errors::InvalidArgument( "For a SingleThreadedWorkQueue, " @@ -192,6 +205,8 @@ std::unique_ptr CreateSingleThreadedWorkQueue( std::unique_ptr CreateMultiThreadedWorkQueue( const WorkQueueOptions& options) { + options.Validate(); + // extra check PADDLE_ENFORCE_GT( options.num_threads, 1u, platform::errors::InvalidArgument("For a MultiThreadedWorkQueue, " @@ -207,6 +222,9 @@ std::unique_ptr CreateWorkQueueGroup( platform::errors::InvalidArgument( "For a WorkQueueGroup, the number of WorkQueueOptions " "must be greater than 1.")); + for (const auto& opts : queues_options) { + opts.Validate(); + } std::unique_ptr ptr(new WorkQueueGroupImpl(queues_options)); return ptr; } diff --git a/paddle/fluid/framework/new_executor/workqueue/workqueue.h b/paddle/fluid/framework/new_executor/workqueue/workqueue.h index 068c54a21a4526ed02374fd6eb110f944e186159..6c8abee2f01dcf15920adc9c130828577912cca8 100644 --- a/paddle/fluid/framework/new_executor/workqueue/workqueue.h +++ b/paddle/fluid/framework/new_executor/workqueue/workqueue.h @@ -16,6 +16,7 @@ #include #include +#include #include namespace paddle { @@ -27,19 +28,31 @@ constexpr const char* kQueueDestructEvent = "QueueDestruct"; class EventsWaiter; struct WorkQueueOptions { - WorkQueueOptions(size_t num_threads, bool allow_spinning, bool track_task) - : num_threads(num_threads), + WorkQueueOptions(const std::string& name, size_t num_threads, + bool allow_spinning, bool track_task) + : name(name), + num_threads(num_threads), allow_spinning(allow_spinning), - track_task(track_task) {} - - WorkQueueOptions(size_t num_threads, bool allow_spinning, bool track_task, - bool detached, EventsWaiter* waiter) - : num_threads(num_threads), + track_task(track_task) { + Validate(); + } + + WorkQueueOptions(const std::string& name, size_t num_threads, + bool allow_spinning, bool track_task, bool detached, + EventsWaiter* waiter) + : name(name), + num_threads(num_threads), allow_spinning(allow_spinning), track_task(track_task), detached(detached), - events_waiter(waiter) {} + events_waiter(waiter) { + Validate(); + } + + // throw an exception if there is an invalid option + void Validate() const; + std::string name; size_t num_threads; bool allow_spinning; // If you need to blocking the calling thread to wait "queue empty", set diff --git a/paddle/fluid/framework/new_executor/workqueue/workqueue_test.cc b/paddle/fluid/framework/new_executor/workqueue/workqueue_test.cc index e06beb623be4c1bb390135946a51716592a299f2..25448da8f10f9c0cb290c2cd0cd209a415f73fa9 100644 --- a/paddle/fluid/framework/new_executor/workqueue/workqueue_test.cc +++ b/paddle/fluid/framework/new_executor/workqueue/workqueue_test.cc @@ -44,7 +44,8 @@ TEST(WorkQueue, TestSingleThreadedWorkQueue) { constexpr unsigned kLoopNum = 1000000; // CreateSingleThreadedWorkQueue EventsWaiter events_waiter; - WorkQueueOptions options(/*num_threads*/ 1, /*allow_spinning*/ true, + WorkQueueOptions options(/*name*/ "SingleThreadedWorkQueueForTesting", + /*num_threads*/ 1, /*allow_spinning*/ true, /*track_task*/ true, /*detached*/ true, &events_waiter); auto work_queue = CreateSingleThreadedWorkQueue(options); @@ -78,7 +79,8 @@ TEST(WorkQueue, TestMultiThreadedWorkQueue) { constexpr unsigned kLoopNum = 1000000; // CreateMultiThreadedWorkQueue EventsWaiter events_waiter; - WorkQueueOptions options(/*num_threads*/ 10, /*allow_spinning*/ true, + WorkQueueOptions options(/*name*/ "MultiThreadedWorkQueueForTesting", + /*num_threads*/ 10, /*allow_spinning*/ true, /*track_task*/ true, /*detached*/ false, &events_waiter); auto work_queue = CreateMultiThreadedWorkQueue(options); @@ -117,10 +119,12 @@ TEST(WorkQueue, TestWorkQueueGroup) { constexpr unsigned kLoopNum = 1000000; // ThreadedWorkQueueGroup EventsWaiter events_waiter; - WorkQueueOptions sq_options(/*num_threads*/ 1, /*allow_spinning*/ true, + WorkQueueOptions sq_options(/*name*/ "SingleThreadedWorkQueueForTesting", + /*num_threads*/ 1, /*allow_spinning*/ true, /*track_task*/ true, /*detached*/ false, &events_waiter); - WorkQueueOptions mq_options(/*num_threads*/ 10, /*allow_spinning*/ true, + WorkQueueOptions mq_options(/*name*/ "MultiThreadedWorkQueueForTesting", + /*num_threads*/ 10, /*allow_spinning*/ true, /*track_task*/ true, /*detached*/ false, &events_waiter); auto queue_group = CreateWorkQueueGroup({sq_options, mq_options});