未验证 提交 feda7c1d 编写于 作者: L liutiexing 提交者: GitHub

HostEventRecorder (#37629)

* add align for WorkQueue

* add spinlock

* merge develop

* merge

* Add EventsWaiter

* Revert "Add EventsWaiter"

This reverts commit e206173aa9be7401b83a53581627bfaf557c8fb2.

* update HostEventTracer

* update HostEventTracer

* fix c++17

* update

* update

* update

* update

* fix bug
Co-authored-by: Nliutiexing <liutiexing@google.com>
上级 e0fc8937
...@@ -514,7 +514,7 @@ void InterpreterCore::RunInstructionAsync(size_t instr_id) { ...@@ -514,7 +514,7 @@ void InterpreterCore::RunInstructionAsync(size_t instr_id) {
ready_ops.pop(); ready_ops.pop();
auto& instr_node = vec_instruction_.at(instr_id); auto& instr_node = vec_instruction_.at(instr_id);
auto* op = instr_node.OpBase(); auto* op = instr_node.OpBase();
platform::RecordEvent instruction_event(op->Type()); platform::RecordEvent instruction_event(op->Type().c_str());
interpreter::WaitEvent(instr_node, place_); interpreter::WaitEvent(instr_node, place_);
try { try {
......
...@@ -14,7 +14,9 @@ limitations under the License. */ ...@@ -14,7 +14,9 @@ limitations under the License. */
#include <mutex> // NOLINT #include <mutex> // NOLINT
#include <random> #include <random>
#include <sstream>
#include <string> #include <string>
#include <type_traits>
#include "paddle/fluid/platform/device_tracer.h" #include "paddle/fluid/platform/device_tracer.h"
#include "paddle/fluid/platform/enforce.h" #include "paddle/fluid/platform/enforce.h"
...@@ -30,6 +32,290 @@ PADDLE_DEFINE_EXPORTED_bool(enable_rpc_profiler, false, ...@@ -30,6 +32,290 @@ PADDLE_DEFINE_EXPORTED_bool(enable_rpc_profiler, false,
namespace paddle { namespace paddle {
namespace platform { namespace platform {
// A finished host-side event: a name, a [start_ns, end_ns] time interval, a
// role and an optional attribute string.
//
// The struct never owns the memory behind `name` / `attr`. They either point
// at a caller-provided C string (shallow constructor) or at a copy placed in
// storage obtained from the allocator callback passed to the deep-copy
// constructors.
struct DurationEvent {
 public:
  // Shallow constructor: stores the `name` pointer as-is, without copying.
  // The pointed-to string must stay valid for as long as the event is used.
  DurationEvent(const char *name, uint64_t start_ns, uint64_t end_ns,
                EventRole role)
      : name(name), start_ns(start_ns), end_ns(end_ns), role(role) {}

  // Deep-copy constructor with an attribute: copies both `name_str` and
  // `attr_str` (including the terminating NUL) into buffers returned by
  // `arena_allocator`. The allocator is taken by const reference so that it
  // matches the attribute-less overload below and also accepts temporaries.
  DurationEvent(const std::function<void *(size_t)> &arena_allocator,
                const std::string &name_str, uint64_t start_ns, uint64_t end_ns,
                EventRole role, const std::string &attr_str)
      : name(CopyToArena(arena_allocator, name_str)),
        start_ns(start_ns),
        end_ns(end_ns),
        role(role),
        attr(CopyToArena(arena_allocator, attr_str)) {}

  // Deep-copy constructor without an attribute: copies `name_str` into a
  // buffer returned by `arena_allocator`; `attr` stays nullptr.
  DurationEvent(const std::function<void *(size_t)> &arena_allocator,
                const std::string &name_str, uint64_t start_ns, uint64_t end_ns,
                EventRole role)
      : name(CopyToArena(arena_allocator, name_str)),
        start_ns(start_ns),
        end_ns(end_ns),
        role(role) {}

  const char *name = nullptr;  // not owned, designed for performance
  uint64_t start_ns = 0;
  uint64_t end_ns = 0;
  EventRole role = EventRole::kOrdinary;
  const char *attr = nullptr;  // not owned, designed for performance

 private:
  // Requests `text.length() + 1` bytes from `arena_allocator` and copies
  // `text` plus its terminating NUL into them. Returns the new buffer.
  static const char *CopyToArena(
      const std::function<void *(size_t)> &arena_allocator,
      const std::string &text) {
    const size_t bytes = text.length() + 1;
    auto *buf = static_cast<char *>(arena_allocator(bytes));
    strncpy(buf, text.c_str(), bytes);
    return buf;
  }
};
// Compile-time predicate over a non-empty type list: `value` is true when at
// least one of the types is std::string once references and cv-qualifiers
// have been stripped, and false otherwise.
//
// General case: the answer for the head type OR-ed with the answer for the
// remaining types.
template <typename HeadType, typename... RestTypes>
struct ContainsStdString
    : std::integral_constant<bool,
                             ContainsStdString<HeadType>::value ||
                                 ContainsStdString<RestTypes...>::value> {};

// Single-type case: a direct comparison against std::string.
template <typename TailType>
struct ContainsStdString<TailType>
    : std::integral_constant<
          bool,
          std::is_same<std::remove_cv_t<std::remove_reference_t<TailType>>,
                       std::string>::value> {};
// Append-only store for events of type `EventType`.
//
// Events are placement-constructed into a singly linked list of fixed-size
// EventBlocks. String arguments are deep-copied into a second linked list of
// StringBlocks (the "arena"). Reduce() hands the collected events back and
// releases the event blocks; the string blocks are released only by the
// destructor, so arena-backed strings referenced by events returned from
// Reduce() remain valid for as long as the container itself is alive.
//
// No locking is performed anywhere in this class.
template <typename EventType>
class EventContainer {
 public:
  EventContainer() {
    event_blocks_ = cur_event_block_ = new EventBlock;
    str_blocks_ = cur_str_block_ = new StringBlock;
  }

  // Releases every event block and every string block.
  //
  // The blocks are freed directly rather than through Reduce(): Reduce()
  // copies all pending events into a vector and allocates a fresh block, both
  // of which are wasted work here and could throw from inside a destructor.
  ~EventContainer() {
    for (EventBlock *cur = event_blocks_; cur != nullptr;) {
      // Events were placement-constructed, so they are destroyed explicitly
      // before the raw storage is released.
      for (size_t i = 0; i < cur->offset; ++i) {
        cur->events[i].event.~EventType();
      }
      EventBlock *next = cur->next;
      delete cur;
      cur = next;
    }
    for (StringBlock *cur = str_blocks_; cur != nullptr;) {
      StringBlock *next = cur->next;
      delete cur;
      cur = next;
    }
  }
  DISABLE_COPY_AND_ASSIGN(EventContainer);

 public:
  // Record an event. The arguments are forwarded to an EventType constructor.
  // When any argument is a std::string, an allocator backed by the string
  // arena is prepended to the constructor arguments so that the event can
  // deep-copy the string.
  template <typename... Args>
  void Record(Args &&... args) {
    DoRecord(ContainsStdString<Args...>(), std::forward<Args>(args)...);
  }

  // Get all events and clear the container
  std::vector<EventType> Reduce();

  // Return a buffer to store the string attribute of Event.
  // HostEventRecorder locates in the static data section.
  // So it's safe to use arena to avoid fragmented allocations.
  char *GetStrBufFromArena(size_t size) { return GetStringStorage(size); }

 private:
  // One fixed-size chunk of event slots plus the link to the next chunk.
  struct EventBlock {
    // Wrapping the event in a union with empty special members keeps the
    // slot uninitialized until an event is placement-constructed into it.
    union InitDeferedEvent {
      InitDeferedEvent() {}
      ~InitDeferedEvent() {}

      EventType event;
    };

    static constexpr size_t kBlockSize = 1 << 24;  // 16 MB
    // Space left for events after the `offset` and `next` members.
    static constexpr size_t kAvailSize =
        kBlockSize - sizeof(size_t) - sizeof(nullptr);
    static constexpr size_t kNumEvents = kAvailSize / sizeof(InitDeferedEvent);
    // Bytes needed to round the struct up to exactly kBlockSize.
    static constexpr size_t kPadSize =
        kAvailSize - kNumEvents * sizeof(InitDeferedEvent);
    static constexpr size_t kMinimumEventsPerBlock = 1024;
    static_assert(
        kNumEvents >= kMinimumEventsPerBlock,
        "EventType is too large for kBlockSize, make kBlockSize larger");

    size_t offset = 0;  // number of slots already handed out
    EventBlock *next = nullptr;
    InitDeferedEvent events[kNumEvents];
    char padding[kPadSize];
  };
  static_assert(sizeof(EventBlock) == EventBlock::kBlockSize,
                "sizeof EventBlock must equal to kBlockSize");

  // One fixed-size chunk of raw character storage plus the link to the next
  // chunk.
  struct StringBlock {
    static constexpr size_t kBlockSize = 1 << 22;  // 4 MB
    static constexpr size_t kAvailSize =
        kBlockSize - sizeof(size_t) - sizeof(nullptr);

    size_t offset = 0;  // number of bytes already handed out
    StringBlock *next = nullptr;
    char storage[kAvailSize];
  };
  static_assert(sizeof(StringBlock) == StringBlock::kBlockSize,
                "sizeof StringBlock must equal to kBlockSize");

  // Record an event with string arguments
  template <typename... Args>
  void DoRecord(std::true_type, Args &&... args) {
    auto *storage = GetEventStorage();
    std::function<void *(size_t)> allocator = [this](size_t size) {
      return GetStrBufFromArena(size);
    };
    new (storage) EventType(allocator, std::forward<Args>(args)...);
  }

  // Record an event without any string argument
  template <typename... Args>
  void DoRecord(std::false_type, Args &&... args) {
    auto *storage = GetEventStorage();
    new (storage) EventType(std::forward<Args>(args)...);
  }

  // Returns the next unused event slot, linking a new block when needed.
  EventType *GetEventStorage();

  // Returns `sz` bytes of arena storage, linking a new block when needed.
  char *GetStringStorage(size_t sz);

  EventBlock *event_blocks_ = nullptr;     // head of the event block list
  EventBlock *cur_event_block_ = nullptr;  // block currently being filled
  StringBlock *str_blocks_ = nullptr;      // head of the string block list
  StringBlock *cur_str_block_ = nullptr;   // block currently being filled
};
// Moves every recorded event into a vector, releases all event blocks and
// leaves the container with a single empty block, ready to record again.
// String blocks are not touched here.
template <typename EventType>
std::vector<EventType> EventContainer<EventType>::Reduce() {
  // First pass: count, so the result vector is allocated exactly once.
  size_t event_cnt = 0;
  for (auto cur = event_blocks_; cur != nullptr; cur = cur->next) {
    event_cnt += cur->offset;
  }

  std::vector<EventType> all_events;
  all_events.reserve(event_cnt);

  // Second pass: hand the events over and free each block as we go.
  for (auto cur = event_blocks_; cur != nullptr;) {
    for (size_t i = 0; i < cur->offset; ++i) {
      EventType &evt = cur->events[i].event;
      all_events.emplace_back(std::move(evt));
      // The slot was filled by placement new; `delete cur` below does not run
      // EventType's destructor, so it has to be invoked explicitly.
      evt.~EventType();
    }
    auto next = cur->next;
    delete cur;
    cur = next;
  }
  event_blocks_ = cur_event_block_ = new EventBlock;

  // Returned by name (no std::move) so that copy elision can apply.
  return all_events;
}
// Hands out the address of the next unused event slot. When the block
// currently being filled has no slot left, a new block is appended to the
// list and becomes the current one. The slot is returned unconstructed; the
// caller is expected to placement-construct an EventType into it.
template <typename EventType>
EventType *EventContainer<EventType>::GetEventStorage() {
  EventBlock *block = cur_event_block_;
  if (UNLIKELY(block->offset >= EventBlock::kNumEvents)) {
    // Current block is full: chain a fresh one behind it.
    block = new EventBlock;
    cur_event_block_->next = block;
    cur_event_block_ = block;
  }
  const size_t slot = block->offset;
  block->offset = slot + 1;
  return &block->events[slot].event;
}
// Hands out `sz` contiguous bytes from the string arena. When the block
// currently being filled cannot fit `sz` more bytes, a new block is appended
// to the list and the bytes are taken from its beginning.
//
// NOTE(review): a request with sz > StringBlock::kAvailSize is not handled -
// a fresh block is linked and the returned range extends past its storage.
template <typename EventType>
char *EventContainer<EventType>::GetStringStorage(size_t sz) {
  StringBlock *block = cur_str_block_;
  const bool exhausted = block->offset + sz > StringBlock::kAvailSize;
  if (UNLIKELY(exhausted)) {
    // Not enough room left: chain a fresh block behind the current one.
    block = new StringBlock;
    cur_str_block_->next = block;
    cur_str_block_ = block;
  }
  char *const begin = block->storage + block->offset;
  block->offset += sz;
  return begin;
}
// The events gathered from one ThreadEventRecorder, together with the name
// and id that recorder carries.
struct ThreadEventSection {
  std::string thread_name;
  // Zero-initialized so a default-constructed section never exposes an
  // indeterminate id.
  uint64_t thread_id = 0;
  std::vector<DurationEvent> events;
};
// Records DurationEvents into its own EventContainer. HostEventRecorder keeps
// one instance per thread as a thread_local object.
class ThreadEventRecorder {
 public:
  ThreadEventRecorder();
  DISABLE_COPY_AND_ASSIGN(ThreadEventRecorder);

 public:
  // Forward call to EventContainer::Record
  template <typename... Args>
  void RecordEvent(Args &&... args) {
    base_evt_cntr_.Record(std::forward<Args>(args)...);
  }

  // Drains the container (via EventContainer::Reduce) and returns the events
  // together with this recorder's thread name and id.
  ThreadEventSection GatherEvents() {
    ThreadEventSection thr_sec;
    thr_sec.thread_name = thread_name_;
    thr_sec.thread_id = thread_id_;
    // Reduce() already yields a temporary; no std::move is needed to move it.
    thr_sec.events = base_evt_cntr_.Reduce();
    // Returned by name (no std::move) so that copy elision can apply.
    return thr_sec;
  }

 private:
  uint64_t thread_id_ = 0;
  // NOTE(review): nothing in this class assigns thread_name_, so gathered
  // sections carry an empty name unless it is set elsewhere.
  std::string thread_name_;
  EventContainer<DurationEvent> base_evt_cntr_;
};
// All per-thread event sections gathered by HostEventRecorder, plus the name
// and id of the process they belong to.
struct HostEventSection {
  std::string process_name;
  // Zero-initialized: HostEventRecorder::GatherEvents fills only
  // `thr_sections`, so without a default this field would be returned with an
  // indeterminate value.
  uint64_t process_id = 0;
  std::vector<ThreadEventSection> thr_sections;
};
class HostEventRecorder {
public:
// singleton
static HostEventRecorder &GetInstance() {
static HostEventRecorder instance;
return instance;
}
// If your string argument has a longer lifetime than the Event,
// use 'const char*'. e.g.: string literal, op name, etc.
// Do your best to avoid using 'std::string' as the argument type.
// It will cause deep-copy to harm performance.
template <typename... Args>
void RecordEvent(Args &&... args) {
GetThreadLocalRecorder().RecordEvent(std::forward<Args>(args)...);
}
// Poor performance, call it at the ending
HostEventSection GatherEvents();
void RegisterThreadRecorder(uint64_t tid, ThreadEventRecorder *recorder) {
const std::lock_guard<std::mutex> guard(thread_recorders_lock_);
thread_recorders_[tid] = recorder;
}
private:
HostEventRecorder() = default;
DISABLE_COPY_AND_ASSIGN(HostEventRecorder);
ThreadEventRecorder &GetThreadLocalRecorder() {
static thread_local ThreadEventRecorder tls_recorder;
return tls_recorder;
}
std::mutex thread_recorders_lock_;
std::unordered_map<uint64_t, ThreadEventRecorder *> thread_recorders_;
};
// Returns a numeric id for the calling thread: the std::hash of its
// std::thread::id, widened to uint64_t.
static uint64_t GetThreadId() {
  const std::thread::id self = std::this_thread::get_id();
  std::hash<std::thread::id> hasher{};
  return static_cast<uint64_t>(hasher(self));
}
// Captures the id of the constructing thread and registers this recorder
// with the HostEventRecorder singleton under that id.
ThreadEventRecorder::ThreadEventRecorder() : thread_id_(GetThreadId()) {
  HostEventRecorder &host = HostEventRecorder::GetInstance();
  host.RegisterThreadRecorder(thread_id_, this);
}
// Drains every registered ThreadEventRecorder and returns one
// ThreadEventSection per recorder.
//
// thread_recorders_lock_ is held for the whole call so that the map cannot be
// modified by RegisterThreadRecorder (e.g. a thread recording its first
// event) while it is being iterated. The lock does not cover the recorders'
// own containers, which perform no locking.
//
// NOTE(review): recorders are thread_local objects registered by raw pointer
// and no unregistration is visible here, so an entry may outlive the thread
// that owns it - confirm that this is only called while those threads are
// still alive.
HostEventSection HostEventRecorder::GatherEvents() {
  const std::lock_guard<std::mutex> guard(thread_recorders_lock_);
  // Value-initialized so that process_id, which is not filled in here, is
  // zero rather than indeterminate.
  HostEventSection host_sec{};
  host_sec.thr_sections.reserve(thread_recorders_.size());
  for (auto &kv : thread_recorders_) {
    // GatherEvents() yields a temporary; emplace_back moves from it directly.
    host_sec.thr_sections.emplace_back(kv.second->GatherEvents());
  }
  // Returned by name (no std::move) so that copy elision can apply.
  return host_sec;
}
MemEvenRecorder MemEvenRecorder::recorder; MemEvenRecorder MemEvenRecorder::recorder;
Event::Event(EventType type, std::string name, uint32_t thread_id, Event::Event(EventType type, std::string name, uint32_t thread_id,
...@@ -57,8 +343,44 @@ double Event::CudaElapsedMs(const Event &e) const { ...@@ -57,8 +343,44 @@ double Event::CudaElapsedMs(const Event &e) const {
#endif #endif
} }
RecordEvent::RecordEvent(const char *name, const EventRole role) {
#ifndef _WIN32
#ifdef PADDLE_WITH_CUDA
if (g_enable_nvprof_hook) {
dynload::nvtxRangePushA(name);
is_pushed_ = true;
}
#endif
#endif
if (UNLIKELY(g_enable_host_event_recorder_hook == false)) {
RecordEvent(name, role, "none");
return;
}
shallow_copy_name_ = name;
role_ = role;
start_ns_ = PosixInNsec();
}
RecordEvent::RecordEvent(const std::string &name, const EventRole role) {
#ifndef _WIN32
#ifdef PADDLE_WITH_CUDA
if (g_enable_nvprof_hook) {
dynload::nvtxRangePushA(name.c_str());
is_pushed_ = true;
}
#endif
#endif
if (UNLIKELY(g_enable_host_event_recorder_hook == false)) {
RecordEvent(name, role, "none");
return;
}
name_ = new std::string(name);
role_ = role;
start_ns_ = PosixInNsec();
}
RecordEvent::RecordEvent(const std::string &name, const EventRole role, RecordEvent::RecordEvent(const std::string &name, const EventRole role,
const std::string attr) { const std::string &attr) {
#ifndef _WIN32 #ifndef _WIN32
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
if (g_enable_nvprof_hook) { if (g_enable_nvprof_hook) {
...@@ -67,17 +389,26 @@ RecordEvent::RecordEvent(const std::string &name, const EventRole role, ...@@ -67,17 +389,26 @@ RecordEvent::RecordEvent(const std::string &name, const EventRole role,
} }
#endif #endif
#endif #endif
if (g_enable_host_event_recorder_hook) {
name_ = new std::string(name);
start_ns_ = PosixInNsec();
attr_ = new std::string(attr);
return;
}
if (g_state == ProfilerState::kDisabled || name.empty()) return; if (g_state == ProfilerState::kDisabled || name.empty()) return;
// do some initialization // do some initialization
name_ = new std::string(name);
start_ns_ = PosixInNsec(); start_ns_ = PosixInNsec();
role_ = role; role_ = role;
attr_ = new std::string(attr);
is_enabled_ = true; is_enabled_ = true;
// lock is not needed, the code below is thread-safe // lock is not needed, the code below is thread-safe
// Maybe need the same push/pop behavior. // Maybe need the same push/pop behavior.
Event *e = PushEvent(name, role, attr); Event *e = PushEvent(name, role, attr);
SetCurAnnotation(e); SetCurAnnotation(e);
name_ = e->name(); // name_ = e->name();
} }
RecordEvent::~RecordEvent() { RecordEvent::~RecordEvent() {
...@@ -88,15 +419,36 @@ RecordEvent::~RecordEvent() { ...@@ -88,15 +419,36 @@ RecordEvent::~RecordEvent() {
} }
#endif #endif
#endif #endif
uint64_t end_ns = PosixInNsec();
if (LIKELY(g_enable_host_event_recorder_hook)) {
if (LIKELY(shallow_copy_name_ != nullptr)) {
HostEventRecorder::GetInstance().RecordEvent(shallow_copy_name_,
start_ns_, end_ns, role_);
} else if (name_ != nullptr) {
if (attr_ == nullptr) {
HostEventRecorder::GetInstance().RecordEvent(*name_, start_ns_, end_ns,
role_);
} else {
HostEventRecorder::GetInstance().RecordEvent(*name_, start_ns_, end_ns,
role_, *attr_);
}
}
delete name_;
delete attr_;
return;
}
if (g_state == ProfilerState::kDisabled || !is_enabled_) return; if (g_state == ProfilerState::kDisabled || !is_enabled_) return;
// lock is not needed, the code below is thread-safe // lock is not needed, the code below is thread-safe
DeviceTracer *tracer = GetDeviceTracer(); DeviceTracer *tracer = GetDeviceTracer();
if (tracer) { if (tracer) {
tracer->AddCPURecords(CurAnnotationName(), start_ns_, PosixInNsec(), tracer->AddCPURecords(CurAnnotationName(), start_ns_, end_ns, BlockDepth(),
BlockDepth(), g_thread_id); g_thread_id);
} }
ClearCurAnnotation(); ClearCurAnnotation();
PopEvent(name_, role_); PopEvent(*name_, role_);
delete name_;
delete attr_;
} }
void MemEvenRecorder::PushMemRecord(const void *ptr, const Place &place, void MemEvenRecorder::PushMemRecord(const void *ptr, const Place &place,
...@@ -148,11 +500,11 @@ MemEvenRecorder::RecordMemEvent::~RecordMemEvent() { ...@@ -148,11 +500,11 @@ MemEvenRecorder::RecordMemEvent::~RecordMemEvent() {
PopMemEvent(start_ns_, end_ns_, bytes_, place_, annotation_free); PopMemEvent(start_ns_, end_ns_, bytes_, place_, annotation_free);
} }
RecordRPCEvent::RecordRPCEvent(const std::string &name) { /*RecordRPCEvent::RecordRPCEvent(const std::string &name) {
if (FLAGS_enable_rpc_profiler) { if (FLAGS_enable_rpc_profiler) {
event_.reset(new platform::RecordEvent(name)); event_.reset(new platform::RecordEvent(name));
} }
} }*/
RecordBlock::RecordBlock(int block_id) RecordBlock::RecordBlock(int block_id)
: is_enabled_(false), start_ns_(PosixInNsec()) { : is_enabled_(false), start_ns_(PosixInNsec()) {
...@@ -362,5 +714,20 @@ void NvprofEnableRecordEvent() { ...@@ -362,5 +714,20 @@ void NvprofEnableRecordEvent() {
void NvprofDisableRecordEvent() { g_enable_nvprof_hook = false; } void NvprofDisableRecordEvent() { g_enable_nvprof_hook = false; }
// Switches on the flag that makes RecordEvent report to HostEventRecorder.
void EnableHostEventRecorder() {
  g_enable_host_event_recorder_hook = true;
}
std::string PrintHostEvents() {
std::ostringstream oss;
auto host_evt_sec = HostEventRecorder::GetInstance().GatherEvents();
for (const auto &thr_evt_sec : host_evt_sec.thr_sections) {
oss << thr_evt_sec.thread_id << std::endl;
for (const auto &evt : thr_evt_sec.events) {
oss << "{ " << evt.name << " | " << evt.start_ns << " | " << evt.end_ns
<< " }" << std::endl;
}
}
return oss.str();
}
} // namespace platform } // namespace platform
} // namespace paddle } // namespace paddle
...@@ -128,31 +128,38 @@ struct MemEvenRecorder { ...@@ -128,31 +128,38 @@ struct MemEvenRecorder {
}; };
struct RecordEvent { struct RecordEvent {
RecordEvent(const std::string& name, explicit RecordEvent(const std::string& name,
const EventRole role = EventRole::kOrdinary, const EventRole role = EventRole::kOrdinary);
const std::string attr = "none");
explicit RecordEvent(const char* name,
const EventRole role = EventRole::kOrdinary);
RecordEvent(const std::string& name, const EventRole role,
const std::string& attr);
~RecordEvent(); ~RecordEvent();
bool is_enabled_{false}; bool is_enabled_{false};
bool is_pushed_{false}; bool is_pushed_{false};
uint64_t start_ns_;
// Event name // Event name
std::string name_; const std::string* name_{nullptr};
const char* shallow_copy_name_{nullptr};
uint64_t start_ns_;
// Need to distinguish name by op type, block_id, program_id and perhaps // Need to distinguish name by op type, block_id, program_id and perhaps
// different kernel invocations within an op. // different kernel invocations within an op.
std::string full_name_; // std::string full_name_;
EventRole role_{EventRole::kOrdinary}; EventRole role_{EventRole::kOrdinary};
const std::string* attr_{nullptr};
}; };
class RecordRPCEvent { /*class RecordRPCEvent {
public: public:
explicit RecordRPCEvent(const std::string& name); explicit RecordRPCEvent(const std::string& name);
~RecordRPCEvent() {} ~RecordRPCEvent() {}
private: private:
std::unique_ptr<RecordEvent> event_; std::unique_ptr<RecordEvent> event_;
}; };*/
struct RecordBlock { struct RecordBlock {
explicit RecordBlock(int block_id); explicit RecordBlock(int block_id);
...@@ -242,5 +249,10 @@ int64_t ListenerId(); ...@@ -242,5 +249,10 @@ int64_t ListenerId();
void NvprofEnableRecordEvent(); void NvprofEnableRecordEvent();
void NvprofDisableRecordEvent(); void NvprofDisableRecordEvent();
void EnableHostEventRecorder();
// Defined for UT
std::string PrintHostEvents();
} // namespace platform } // namespace platform
} // namespace paddle } // namespace paddle
...@@ -47,6 +47,8 @@ static TracerOption g_tracer_option = TracerOption::kDefault; ...@@ -47,6 +47,8 @@ static TracerOption g_tracer_option = TracerOption::kDefault;
static ProfilerState g_state = ProfilerState::kDisabled; static ProfilerState g_state = ProfilerState::kDisabled;
// To hook RecordEvent's events, use it to nvtx timeline // To hook RecordEvent's events, use it to nvtx timeline
static bool g_enable_nvprof_hook = false; static bool g_enable_nvprof_hook = false;
// To hook RecordEvent, use HostEventRecorder
static bool g_enable_host_event_recorder_hook = false;
// The thread local event list only can be accessed by the specific thread // The thread local event list only can be accessed by the specific thread
// The thread index of each thread // The thread index of each thread
static thread_local int32_t g_thread_id; static thread_local int32_t g_thread_id;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册