未验证 提交 44af74b8 编写于 作者: L liutiexing 提交者: GitHub

Workqueue threadnames (#39177)

* add align for WorkQueue

* add spinlock

* merge develop

* merge

* Add EventsWaiter

* Revert "Add EventsWaiter"

This reverts commit e206173aa9be7401b83a53581627bfaf557c8fb2.

* Set thread name for WorkQueue
Co-authored-by: liutiexing <liutiexing@google.com>
上级 04a16189
...@@ -25,7 +25,8 @@ namespace paddle { ...@@ -25,7 +25,8 @@ namespace paddle {
namespace framework { namespace framework {
InterpreterCoreEventGarbageCollector::InterpreterCoreEventGarbageCollector() { InterpreterCoreEventGarbageCollector::InterpreterCoreEventGarbageCollector() {
WorkQueueOptions options(/*num_threads*/ 1, /*allow_spinning*/ true, WorkQueueOptions options(/*name*/ "GarbageCollector", /*num_threads*/ 1,
/*allow_spinning*/ true,
/*track_task*/ false); /*track_task*/ false);
queue_ = CreateSingleThreadedWorkQueue(options); queue_ = CreateSingleThreadedWorkQueue(options);
} }
......
...@@ -58,13 +58,15 @@ class AsyncWorkQueue { ...@@ -58,13 +58,15 @@ class AsyncWorkQueue {
: host_num_thread_(host_num_threads) { : host_num_thread_(host_num_threads) {
std::vector<WorkQueueOptions> group_options; std::vector<WorkQueueOptions> group_options;
// for execute host Kernel // for execute host Kernel
group_options.emplace_back(/*num_threads*/ host_num_threads, group_options.emplace_back(/*name*/ "HostTasks",
/*num_threads*/ host_num_threads,
/*allow_spinning*/ true, /*allow_spinning*/ true,
/*track_task*/ false, /*track_task*/ false,
/*detached*/ true, /*detached*/ true,
/*events_waiter*/ waiter); /*events_waiter*/ waiter);
// for launch device Kernel // for launch device Kernel
group_options.emplace_back(/*num_threads*/ 1, group_options.emplace_back(/*name*/ "DeviceKernelLaunch",
/*num_threads*/ 1,
/*allow_spinning*/ true, /*allow_spinning*/ true,
/*track_task*/ false, /*track_task*/ false,
/*detached*/ true, /*detached*/ true,
......
...@@ -12,10 +12,12 @@ ...@@ -12,10 +12,12 @@
#include <atomic> #include <atomic>
#include <cstdlib> #include <cstdlib>
#include <vector> #include <vector>
#include "glog/logging.h"
#include "paddle/fluid/framework/new_executor/workqueue/event_count.h" #include "paddle/fluid/framework/new_executor/workqueue/event_count.h"
#include "paddle/fluid/framework/new_executor/workqueue/run_queue.h" #include "paddle/fluid/framework/new_executor/workqueue/run_queue.h"
#include "paddle/fluid/framework/new_executor/workqueue/thread_environment.h" #include "paddle/fluid/framework/new_executor/workqueue/thread_environment.h"
#include "paddle/fluid/platform/os_info.h" #include "paddle/fluid/platform/os_info.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
namespace paddle { namespace paddle {
namespace framework { namespace framework {
...@@ -26,7 +28,7 @@ class ThreadPoolTempl { ...@@ -26,7 +28,7 @@ class ThreadPoolTempl {
typedef typename Environment::Task Task; typedef typename Environment::Task Task;
typedef RunQueue<Task, 1024> Queue; typedef RunQueue<Task, 1024> Queue;
ThreadPoolTempl(int num_threads, bool allow_spinning, ThreadPoolTempl(const std::string& name, int num_threads, bool allow_spinning,
Environment env = Environment()) Environment env = Environment())
: env_(env), : env_(env),
allow_spinning_(allow_spinning), allow_spinning_(allow_spinning),
...@@ -38,7 +40,8 @@ class ThreadPoolTempl { ...@@ -38,7 +40,8 @@ class ThreadPoolTempl {
cancelled_(false), cancelled_(false),
ec_(num_threads), ec_(num_threads),
num_threads_(num_threads), num_threads_(num_threads),
thread_data_(num_threads) { thread_data_(num_threads),
name_(name) {
// Calculate coprimes of all numbers [1, num_threads]. // Calculate coprimes of all numbers [1, num_threads].
// Coprimes are used for random walks over all threads in Steal // Coprimes are used for random walks over all threads in Steal
// and NonEmptyQueueIndex. Iteration is based on the fact that if we take // and NonEmptyQueueIndex. Iteration is based on the fact that if we take
...@@ -240,9 +243,13 @@ class ThreadPoolTempl { ...@@ -240,9 +243,13 @@ class ThreadPoolTempl {
EventCount ec_; EventCount ec_;
const int num_threads_; const int num_threads_;
std::vector<ThreadData> thread_data_; std::vector<ThreadData> thread_data_;
std::string name_;
// Main worker thread loop. // Main worker thread loop.
void WorkerLoop(int thread_id) { void WorkerLoop(int thread_id) {
std::string thr_name = name_ + "_thread_" + std::to_string(thread_id);
VLOG(1) << thr_name << " started ";
platform::SetCurrentThreadName(thr_name);
PerThread* pt = GetPerThread(); PerThread* pt = GetPerThread();
pt->pool = this; pt->pool = this;
pt->rand = GlobalThreadIdHash(); pt->rand = GlobalThreadIdHash();
...@@ -401,6 +408,7 @@ class ThreadPoolTempl { ...@@ -401,6 +408,7 @@ class ThreadPoolTempl {
ec_.Notify(true); ec_.Notify(true);
return false; return false;
} }
platform::RecordEvent("SleepWaitForWork");
ec_.CommitWait(waiter); ec_.CommitWait(waiter);
blocked_--; blocked_--;
return true; return true;
......
...@@ -11,6 +11,17 @@ ...@@ -11,6 +11,17 @@
namespace paddle { namespace paddle {
namespace framework { namespace framework {
// Validates the `name` field of these options.
// Reports platform::errors::InvalidArgument through the PADDLE_ENFORCE_*
// macros when `name` is empty or contains the character '_'.
void WorkQueueOptions::Validate() const {
// The queue name must be non-empty.
PADDLE_ENFORCE_GT(name.size(), 0,
platform::errors::InvalidArgument(
"WorkQueueOptions.name must be nonempty"));
// The queue name must not contain '_'.
// NOTE(review): presumably because worker thread names are built as
// "<name>_thread_<id>", making '_' the separator -- confirm with the author.
PADDLE_ENFORCE_EQ(
name.find('_'), std::string::npos,
platform::errors::InvalidArgument(
"WorkQueueOptions.name shouldn't contain an underline"));
}
namespace { namespace {
using TaskTracker = TaskTracker<EventsWaiter::EventNotifier>; using TaskTracker = TaskTracker<EventsWaiter::EventNotifier>;
...@@ -30,7 +41,7 @@ class WorkQueueImpl : public WorkQueue { ...@@ -30,7 +41,7 @@ class WorkQueueImpl : public WorkQueue {
destruct_notifier_ = destruct_notifier_ =
options.events_waiter->RegisterEvent(kQueueDestructEvent); options.events_waiter->RegisterEvent(kQueueDestructEvent);
} }
queue_ = new NonblockingThreadPool(options_.num_threads, queue_ = new NonblockingThreadPool(options_.name, options_.num_threads,
options_.allow_spinning); options_.allow_spinning);
} }
...@@ -121,8 +132,8 @@ WorkQueueGroupImpl::WorkQueueGroupImpl( ...@@ -121,8 +132,8 @@ WorkQueueGroupImpl::WorkQueueGroupImpl(
destruct_notifier_ = destruct_notifier_ =
options.events_waiter->RegisterEvent(kQueueDestructEvent); options.events_waiter->RegisterEvent(kQueueDestructEvent);
} }
queues_[idx] = new (&queues_storage_[idx]) queues_[idx] = new (&queues_storage_[idx]) NonblockingThreadPool(
NonblockingThreadPool(options.num_threads, options.allow_spinning); options.name, options.num_threads, options.allow_spinning);
} }
} }
...@@ -182,6 +193,8 @@ void WorkQueueGroupImpl::Cancel() { ...@@ -182,6 +193,8 @@ void WorkQueueGroupImpl::Cancel() {
std::unique_ptr<WorkQueue> CreateSingleThreadedWorkQueue( std::unique_ptr<WorkQueue> CreateSingleThreadedWorkQueue(
const WorkQueueOptions& options) { const WorkQueueOptions& options) {
options.Validate();
// extra check
PADDLE_ENFORCE_EQ(options.num_threads, 1u, PADDLE_ENFORCE_EQ(options.num_threads, 1u,
platform::errors::InvalidArgument( platform::errors::InvalidArgument(
"For a SingleThreadedWorkQueue, " "For a SingleThreadedWorkQueue, "
...@@ -192,6 +205,8 @@ std::unique_ptr<WorkQueue> CreateSingleThreadedWorkQueue( ...@@ -192,6 +205,8 @@ std::unique_ptr<WorkQueue> CreateSingleThreadedWorkQueue(
std::unique_ptr<WorkQueue> CreateMultiThreadedWorkQueue( std::unique_ptr<WorkQueue> CreateMultiThreadedWorkQueue(
const WorkQueueOptions& options) { const WorkQueueOptions& options) {
options.Validate();
// extra check
PADDLE_ENFORCE_GT( PADDLE_ENFORCE_GT(
options.num_threads, 1u, options.num_threads, 1u,
platform::errors::InvalidArgument("For a MultiThreadedWorkQueue, " platform::errors::InvalidArgument("For a MultiThreadedWorkQueue, "
...@@ -207,6 +222,9 @@ std::unique_ptr<WorkQueueGroup> CreateWorkQueueGroup( ...@@ -207,6 +222,9 @@ std::unique_ptr<WorkQueueGroup> CreateWorkQueueGroup(
platform::errors::InvalidArgument( platform::errors::InvalidArgument(
"For a WorkQueueGroup, the number of WorkQueueOptions " "For a WorkQueueGroup, the number of WorkQueueOptions "
"must be greater than 1.")); "must be greater than 1."));
for (const auto& opts : queues_options) {
opts.Validate();
}
std::unique_ptr<WorkQueueGroup> ptr(new WorkQueueGroupImpl(queues_options)); std::unique_ptr<WorkQueueGroup> ptr(new WorkQueueGroupImpl(queues_options));
return ptr; return ptr;
} }
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <string>
#include <vector> #include <vector>
namespace paddle { namespace paddle {
...@@ -27,19 +28,31 @@ constexpr const char* kQueueDestructEvent = "QueueDestruct"; ...@@ -27,19 +28,31 @@ constexpr const char* kQueueDestructEvent = "QueueDestruct";
class EventsWaiter; class EventsWaiter;
struct WorkQueueOptions { struct WorkQueueOptions {
WorkQueueOptions(size_t num_threads, bool allow_spinning, bool track_task) WorkQueueOptions(const std::string& name, size_t num_threads,
: num_threads(num_threads), bool allow_spinning, bool track_task)
: name(name),
num_threads(num_threads),
allow_spinning(allow_spinning), allow_spinning(allow_spinning),
track_task(track_task) {} track_task(track_task) {
Validate();
WorkQueueOptions(size_t num_threads, bool allow_spinning, bool track_task, }
bool detached, EventsWaiter* waiter)
: num_threads(num_threads), WorkQueueOptions(const std::string& name, size_t num_threads,
bool allow_spinning, bool track_task, bool detached,
EventsWaiter* waiter)
: name(name),
num_threads(num_threads),
allow_spinning(allow_spinning), allow_spinning(allow_spinning),
track_task(track_task), track_task(track_task),
detached(detached), detached(detached),
events_waiter(waiter) {} events_waiter(waiter) {
Validate();
}
// throw an exception if there is an invalid option
void Validate() const;
std::string name;
size_t num_threads; size_t num_threads;
bool allow_spinning; bool allow_spinning;
// If you need to block the calling thread to wait for "queue empty", set // If you need to block the calling thread to wait for "queue empty", set
......
...@@ -44,7 +44,8 @@ TEST(WorkQueue, TestSingleThreadedWorkQueue) { ...@@ -44,7 +44,8 @@ TEST(WorkQueue, TestSingleThreadedWorkQueue) {
constexpr unsigned kLoopNum = 1000000; constexpr unsigned kLoopNum = 1000000;
// CreateSingleThreadedWorkQueue // CreateSingleThreadedWorkQueue
EventsWaiter events_waiter; EventsWaiter events_waiter;
WorkQueueOptions options(/*num_threads*/ 1, /*allow_spinning*/ true, WorkQueueOptions options(/*name*/ "SingleThreadedWorkQueueForTesting",
/*num_threads*/ 1, /*allow_spinning*/ true,
/*track_task*/ true, /*detached*/ true, /*track_task*/ true, /*detached*/ true,
&events_waiter); &events_waiter);
auto work_queue = CreateSingleThreadedWorkQueue(options); auto work_queue = CreateSingleThreadedWorkQueue(options);
...@@ -78,7 +79,8 @@ TEST(WorkQueue, TestMultiThreadedWorkQueue) { ...@@ -78,7 +79,8 @@ TEST(WorkQueue, TestMultiThreadedWorkQueue) {
constexpr unsigned kLoopNum = 1000000; constexpr unsigned kLoopNum = 1000000;
// CreateMultiThreadedWorkQueue // CreateMultiThreadedWorkQueue
EventsWaiter events_waiter; EventsWaiter events_waiter;
WorkQueueOptions options(/*num_threads*/ 10, /*allow_spinning*/ true, WorkQueueOptions options(/*name*/ "MultiThreadedWorkQueueForTesting",
/*num_threads*/ 10, /*allow_spinning*/ true,
/*track_task*/ true, /*detached*/ false, /*track_task*/ true, /*detached*/ false,
&events_waiter); &events_waiter);
auto work_queue = CreateMultiThreadedWorkQueue(options); auto work_queue = CreateMultiThreadedWorkQueue(options);
...@@ -117,10 +119,12 @@ TEST(WorkQueue, TestWorkQueueGroup) { ...@@ -117,10 +119,12 @@ TEST(WorkQueue, TestWorkQueueGroup) {
constexpr unsigned kLoopNum = 1000000; constexpr unsigned kLoopNum = 1000000;
// ThreadedWorkQueueGroup // ThreadedWorkQueueGroup
EventsWaiter events_waiter; EventsWaiter events_waiter;
WorkQueueOptions sq_options(/*num_threads*/ 1, /*allow_spinning*/ true, WorkQueueOptions sq_options(/*name*/ "SingleThreadedWorkQueueForTesting",
/*num_threads*/ 1, /*allow_spinning*/ true,
/*track_task*/ true, /*detached*/ false, /*track_task*/ true, /*detached*/ false,
&events_waiter); &events_waiter);
WorkQueueOptions mq_options(/*num_threads*/ 10, /*allow_spinning*/ true, WorkQueueOptions mq_options(/*name*/ "MultiThreadedWorkQueueForTesting",
/*num_threads*/ 10, /*allow_spinning*/ true,
/*track_task*/ true, /*detached*/ false, /*track_task*/ true, /*detached*/ false,
&events_waiter); &events_waiter);
auto queue_group = CreateWorkQueueGroup({sq_options, mq_options}); auto queue_group = CreateWorkQueueGroup({sq_options, mq_options});
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册