From feda7c1d678c87562190db0b790d0eae02d3b98e Mon Sep 17 00:00:00 2001 From: liutiexing <74819124+liutiexing@users.noreply.github.com> Date: Wed, 1 Dec 2021 14:21:18 +0800 Subject: [PATCH] HostEventRecorder (#37629) * add align for WorkQueue * add spinlock * merge develop * merge * Add EventsWaiter * Revert "Add EventsWaiter" This reverts commit e206173aa9be7401b83a53581627bfaf557c8fb2. * update HostEventTracer * update HostEventTracer * fix c++17 * update * update * update * update * fix bug Co-authored-by: liutiexing --- .../framework/new_executor/interpretercore.cc | 2 +- paddle/fluid/platform/profiler.cc | 381 +++++++++++++++++- paddle/fluid/platform/profiler.h | 28 +- paddle/fluid/platform/profiler_helper.h | 2 + 4 files changed, 397 insertions(+), 16 deletions(-) diff --git a/paddle/fluid/framework/new_executor/interpretercore.cc b/paddle/fluid/framework/new_executor/interpretercore.cc index 94b2118ba9d..f954b297510 100644 --- a/paddle/fluid/framework/new_executor/interpretercore.cc +++ b/paddle/fluid/framework/new_executor/interpretercore.cc @@ -514,7 +514,7 @@ void InterpreterCore::RunInstructionAsync(size_t instr_id) { ready_ops.pop(); auto& instr_node = vec_instruction_.at(instr_id); auto* op = instr_node.OpBase(); - platform::RecordEvent instruction_event(op->Type()); + platform::RecordEvent instruction_event(op->Type().c_str()); interpreter::WaitEvent(instr_node, place_); try { diff --git a/paddle/fluid/platform/profiler.cc b/paddle/fluid/platform/profiler.cc index 40d9bb99f44..f6d9c8f64fd 100644 --- a/paddle/fluid/platform/profiler.cc +++ b/paddle/fluid/platform/profiler.cc @@ -14,7 +14,9 @@ limitations under the License. */ #include // NOLINT #include +#include #include +#include #include "paddle/fluid/platform/device_tracer.h" #include "paddle/fluid/platform/enforce.h" @@ -30,6 +32,290 @@ PADDLE_DEFINE_EXPORTED_bool(enable_rpc_profiler, false, namespace paddle { namespace platform { +struct DurationEvent { + public: + DurationEvent(const char *name, uint64_t start_ns, uint64_t end_ns, + EventRole role) + : name(name), start_ns(start_ns), end_ns(end_ns), role(role) {} + + DurationEvent(std::function &arena_allocator, + const std::string &name_str, uint64_t start_ns, uint64_t end_ns, + EventRole role, const std::string &attr_str) + : start_ns(start_ns), end_ns(end_ns), role(role) { + auto buf = static_cast(arena_allocator(name_str.length() + 1)); + strncpy(buf, name_str.c_str(), name_str.length() + 1); + name = buf; + buf = static_cast(arena_allocator(attr_str.length() + 1)); + strncpy(buf, attr_str.c_str(), attr_str.length() + 1); + attr = buf; + } + + DurationEvent(const std::function &arena_allocator, + const std::string &name_str, uint64_t start_ns, uint64_t end_ns, + EventRole role) + : start_ns(start_ns), end_ns(end_ns), role(role) { + auto buf = static_cast(arena_allocator(name_str.length() + 1)); + strncpy(buf, name_str.c_str(), name_str.length() + 1); + name = buf; + } + + const char *name = nullptr; // not owned, designed for performance + uint64_t start_ns = 0; + uint64_t end_ns = 0; + EventRole role = EventRole::kOrdinary; + const char *attr = nullptr; // not owned, designed for performance +}; + +template +struct ContainsStdString + : std::conditional_t< + std::is_same>>::value, + std::true_type, ContainsStdString> {}; + +template +struct ContainsStdString + : std::is_same>> {}; + +template +class EventContainer { + public: + EventContainer() { + event_blocks_ = cur_event_block_ = new EventBlock; + str_blocks_ = cur_str_block_ = new StringBlock; + } + ~EventContainer() { + Reduce(); + delete event_blocks_; + for (auto cur = str_blocks_; cur != nullptr;) { + auto next = cur->next; + delete cur; + cur = next; + } + } + DISABLE_COPY_AND_ASSIGN(EventContainer); + + public: + // Record an event + template + void Record(Args &&... args) { + DoRecord(ContainsStdString(), std::forward(args)...); + } + + // Get all events and clear the container + std::vector Reduce(); + + // Return a buffer to store the string attribute of Event. + // HostEventRecorder locates in the static data section. + // So it's safe to use arena to avoid fragmented allocations. + char *GetStrBufFromArena(size_t size) { return GetStringStorage(size); } + + private: + struct EventBlock { + union InitDeferedEvent { + InitDeferedEvent() {} + ~InitDeferedEvent() {} + + EventType event; + }; + + static constexpr size_t kBlockSize = 1 << 24; // 16 MB + static constexpr size_t kAvailSize = + kBlockSize - sizeof(size_t) - sizeof(nullptr); + static constexpr size_t kNumEvents = kAvailSize / sizeof(InitDeferedEvent); + static constexpr size_t kPadSize = + kAvailSize - kNumEvents * sizeof(InitDeferedEvent); + static constexpr size_t kMinimumEventsPerBlock = 1024; + static_assert( + kNumEvents >= kMinimumEventsPerBlock, + "EventType is too large for kBlockSize, make kBlockSize larger"); + + size_t offset = 0; + EventBlock *next = nullptr; + InitDeferedEvent events[kNumEvents]; + char padding[kPadSize]; + }; + static_assert(sizeof(EventBlock) == EventBlock::kBlockSize, + "sizeof EventBlock must equal to kBlockSize"); + + struct StringBlock { + static constexpr size_t kBlockSize = 1 << 22; // 4 MB + static constexpr size_t kAvailSize = + kBlockSize - sizeof(size_t) - sizeof(nullptr); + + size_t offset = 0; + StringBlock *next = nullptr; + char storage[kAvailSize]; + }; + static_assert(sizeof(StringBlock) == StringBlock::kBlockSize, + "sizeof StringBlock must equal to kBlockSize"); + + // Record an event with string arguments + template + void DoRecord(std::true_type, Args &&... args) { + auto *storage = GetEventStorage(); + std::function allocator = [this](size_t size) { + return GetStrBufFromArena(size); + }; + new (storage) EventType(allocator, std::forward(args)...); + } + + // Record an event without any string argument + template + void DoRecord(std::false_type, Args &&... args) { + auto *storage = GetEventStorage(); + new (storage) EventType(std::forward(args)...); + } + + EventType *GetEventStorage(); + + char *GetStringStorage(size_t sz); + + EventBlock *event_blocks_ = nullptr; + EventBlock *cur_event_block_ = nullptr; + StringBlock *str_blocks_ = nullptr; + StringBlock *cur_str_block_ = nullptr; +}; + +template +std::vector EventContainer::Reduce() { + std::vector all_events; + size_t event_cnt = 0; + for (auto cur = event_blocks_; cur != nullptr; cur = cur->next) { + event_cnt += cur->offset; + } + all_events.reserve(event_cnt); + for (auto cur = event_blocks_; cur != nullptr;) { + for (size_t i = 0; i < cur->offset; ++i) { + all_events.emplace_back(cur->events[i].event); + } + auto next = cur->next; + delete cur; + cur = next; + } + event_blocks_ = cur_event_block_ = new EventBlock; + return std::move(all_events); +} + +template +EventType *EventContainer::GetEventStorage() { + if (UNLIKELY(cur_event_block_->offset >= + EventBlock::kNumEvents)) { // another block + cur_event_block_->next = new EventBlock; + cur_event_block_ = cur_event_block_->next; + } + auto &obj = cur_event_block_->events[cur_event_block_->offset].event; + ++cur_event_block_->offset; + return &obj; +} + +template +char *EventContainer::GetStringStorage(size_t sz) { + if (UNLIKELY(cur_str_block_->offset + sz > + StringBlock::kAvailSize)) { // another block + cur_str_block_->next = new StringBlock; + cur_str_block_ = cur_str_block_->next; + } + char *storage = cur_str_block_->storage + cur_str_block_->offset; + cur_str_block_->offset += sz; + return storage; +} + +struct ThreadEventSection { + std::string thread_name; + uint64_t thread_id; + std::vector events; +}; + +class ThreadEventRecorder { + public: + ThreadEventRecorder(); + DISABLE_COPY_AND_ASSIGN(ThreadEventRecorder); + + public: + // Forward call to EventContainer::Record + template + void RecordEvent(Args &&... args) { + base_evt_cntr_.Record(std::forward(args)...); + } + + ThreadEventSection GatherEvents() { + ThreadEventSection thr_sec; + thr_sec.thread_name = thread_name_; + thr_sec.thread_id = thread_id_; + thr_sec.events = std::move(base_evt_cntr_.Reduce()); + return std::move(thr_sec); + } + + private: + uint64_t thread_id_; + std::string thread_name_; + EventContainer base_evt_cntr_; +}; + +struct HostEventSection { + std::string process_name; + uint64_t process_id; + std::vector thr_sections; +}; + +class HostEventRecorder { + public: + // singleton + static HostEventRecorder &GetInstance() { + static HostEventRecorder instance; + return instance; + } + + // If your string argument has a longer lifetime than the Event, + // use 'const char*'. e.g.: string literal, op name, etc. + // Do your best to avoid using 'std::string' as the argument type. + // It will cause deep-copy to harm performance. + template + void RecordEvent(Args &&... args) { + GetThreadLocalRecorder().RecordEvent(std::forward(args)...); + } + + // Poor performance, call it at the ending + HostEventSection GatherEvents(); + + void RegisterThreadRecorder(uint64_t tid, ThreadEventRecorder *recorder) { + const std::lock_guard guard(thread_recorders_lock_); + thread_recorders_[tid] = recorder; + } + + private: + HostEventRecorder() = default; + DISABLE_COPY_AND_ASSIGN(HostEventRecorder); + + ThreadEventRecorder &GetThreadLocalRecorder() { + static thread_local ThreadEventRecorder tls_recorder; + return tls_recorder; + } + + std::mutex thread_recorders_lock_; + std::unordered_map thread_recorders_; +}; + +static uint64_t GetThreadId() { + return std::hash{}(std::this_thread::get_id()); +} + +ThreadEventRecorder::ThreadEventRecorder() { + thread_id_ = GetThreadId(); + HostEventRecorder::GetInstance().RegisterThreadRecorder(thread_id_, this); +} + +HostEventSection HostEventRecorder::GatherEvents() { + HostEventSection host_sec; + host_sec.thr_sections.reserve(thread_recorders_.size()); + for (auto &kv : thread_recorders_) { + host_sec.thr_sections.emplace_back(std::move(kv.second->GatherEvents())); + } + return std::move(host_sec); +} + MemEvenRecorder MemEvenRecorder::recorder; Event::Event(EventType type, std::string name, uint32_t thread_id, @@ -57,8 +343,44 @@ double Event::CudaElapsedMs(const Event &e) const { #endif } +RecordEvent::RecordEvent(const char *name, const EventRole role) { +#ifndef _WIN32 +#ifdef PADDLE_WITH_CUDA + if (g_enable_nvprof_hook) { + dynload::nvtxRangePushA(name); + is_pushed_ = true; + } +#endif +#endif + if (UNLIKELY(g_enable_host_event_recorder_hook == false)) { + RecordEvent(name, role, "none"); + return; + } + shallow_copy_name_ = name; + role_ = role; + start_ns_ = PosixInNsec(); +} + +RecordEvent::RecordEvent(const std::string &name, const EventRole role) { +#ifndef _WIN32 +#ifdef PADDLE_WITH_CUDA + if (g_enable_nvprof_hook) { + dynload::nvtxRangePushA(name.c_str()); + is_pushed_ = true; + } +#endif +#endif + if (UNLIKELY(g_enable_host_event_recorder_hook == false)) { + RecordEvent(name, role, "none"); + return; + } + name_ = new std::string(name); + role_ = role; + start_ns_ = PosixInNsec(); +} + RecordEvent::RecordEvent(const std::string &name, const EventRole role, - const std::string attr) { + const std::string &attr) { #ifndef _WIN32 #ifdef PADDLE_WITH_CUDA if (g_enable_nvprof_hook) { @@ -67,17 +389,26 @@ RecordEvent::RecordEvent(const std::string &name, const EventRole role, } #endif #endif + if (g_enable_host_event_recorder_hook) { + name_ = new std::string(name); + start_ns_ = PosixInNsec(); + attr_ = new std::string(attr); + return; + } + if (g_state == ProfilerState::kDisabled || name.empty()) return; // do some initialization + name_ = new std::string(name); start_ns_ = PosixInNsec(); role_ = role; + attr_ = new std::string(attr); is_enabled_ = true; // lock is not needed, the code below is thread-safe // Maybe need the same push/pop behavior. Event *e = PushEvent(name, role, attr); SetCurAnnotation(e); - name_ = e->name(); + // name_ = e->name(); } RecordEvent::~RecordEvent() { @@ -88,15 +419,36 @@ RecordEvent::~RecordEvent() { } #endif #endif + uint64_t end_ns = PosixInNsec(); + if (LIKELY(g_enable_host_event_recorder_hook)) { + if (LIKELY(shallow_copy_name_ != nullptr)) { + HostEventRecorder::GetInstance().RecordEvent(shallow_copy_name_, + start_ns_, end_ns, role_); + } else if (name_ != nullptr) { + if (attr_ == nullptr) { + HostEventRecorder::GetInstance().RecordEvent(*name_, start_ns_, end_ns, + role_); + } else { + HostEventRecorder::GetInstance().RecordEvent(*name_, start_ns_, end_ns, + role_, *attr_); + } + } + delete name_; + delete attr_; + return; + } + if (g_state == ProfilerState::kDisabled || !is_enabled_) return; // lock is not needed, the code below is thread-safe DeviceTracer *tracer = GetDeviceTracer(); if (tracer) { - tracer->AddCPURecords(CurAnnotationName(), start_ns_, PosixInNsec(), - BlockDepth(), g_thread_id); + tracer->AddCPURecords(CurAnnotationName(), start_ns_, end_ns, BlockDepth(), + g_thread_id); } ClearCurAnnotation(); - PopEvent(name_, role_); + PopEvent(*name_, role_); + delete name_; + delete attr_; } void MemEvenRecorder::PushMemRecord(const void *ptr, const Place &place, @@ -148,11 +500,11 @@ MemEvenRecorder::RecordMemEvent::~RecordMemEvent() { PopMemEvent(start_ns_, end_ns_, bytes_, place_, annotation_free); } -RecordRPCEvent::RecordRPCEvent(const std::string &name) { +/*RecordRPCEvent::RecordRPCEvent(const std::string &name) { if (FLAGS_enable_rpc_profiler) { event_.reset(new platform::RecordEvent(name)); } -} +}*/ RecordBlock::RecordBlock(int block_id) : is_enabled_(false), start_ns_(PosixInNsec()) { @@ -362,5 +714,20 @@ void NvprofEnableRecordEvent() { void NvprofDisableRecordEvent() { g_enable_nvprof_hook = false; } +void EnableHostEventRecorder() { g_enable_host_event_recorder_hook = true; } + +std::string PrintHostEvents() { + std::ostringstream oss; + auto host_evt_sec = HostEventRecorder::GetInstance().GatherEvents(); + for (const auto &thr_evt_sec : host_evt_sec.thr_sections) { + oss << thr_evt_sec.thread_id << std::endl; + for (const auto &evt : thr_evt_sec.events) { + oss << "{ " << evt.name << " | " << evt.start_ns << " | " << evt.end_ns + << " }" << std::endl; + } + } + return oss.str(); +} + } // namespace platform } // namespace paddle diff --git a/paddle/fluid/platform/profiler.h b/paddle/fluid/platform/profiler.h index fbae6165e31..de814faec25 100644 --- a/paddle/fluid/platform/profiler.h +++ b/paddle/fluid/platform/profiler.h @@ -128,31 +128,38 @@ struct MemEvenRecorder { }; struct RecordEvent { - RecordEvent(const std::string& name, - const EventRole role = EventRole::kOrdinary, - const std::string attr = "none"); + explicit RecordEvent(const std::string& name, + const EventRole role = EventRole::kOrdinary); + + explicit RecordEvent(const char* name, + const EventRole role = EventRole::kOrdinary); + + RecordEvent(const std::string& name, const EventRole role, + const std::string& attr); ~RecordEvent(); bool is_enabled_{false}; bool is_pushed_{false}; - uint64_t start_ns_; // Event name - std::string name_; + const std::string* name_{nullptr}; + const char* shallow_copy_name_{nullptr}; + uint64_t start_ns_; // Need to distinguish name by op type, block_id, program_id and perhaps // different kernel invocations within an op. - std::string full_name_; + // std::string full_name_; EventRole role_{EventRole::kOrdinary}; + const std::string* attr_{nullptr}; }; -class RecordRPCEvent { +/*class RecordRPCEvent { public: explicit RecordRPCEvent(const std::string& name); ~RecordRPCEvent() {} private: std::unique_ptr event_; -}; +};*/ struct RecordBlock { explicit RecordBlock(int block_id); @@ -242,5 +249,10 @@ int64_t ListenerId(); void NvprofEnableRecordEvent(); void NvprofDisableRecordEvent(); +void EnableHostEventRecorder(); + +// Defined for UT +std::string PrintHostEvents(); + } // namespace platform } // namespace paddle diff --git a/paddle/fluid/platform/profiler_helper.h b/paddle/fluid/platform/profiler_helper.h index a8438263cb9..3408971efa4 100644 --- a/paddle/fluid/platform/profiler_helper.h +++ b/paddle/fluid/platform/profiler_helper.h @@ -47,6 +47,8 @@ static TracerOption g_tracer_option = TracerOption::kDefault; static ProfilerState g_state = ProfilerState::kDisabled; // To hook RecordEvent's events, use it to nvtx timeline static bool g_enable_nvprof_hook = false; +// To hook RecordEvent, use HostEventRecorder +static bool g_enable_host_event_recorder_hook = false; // The thread local event list only can be accessed by the specific thread // The thread index of each thread static thread_local int32_t g_thread_id; -- GitLab