diff --git a/imperative/src/impl/interpreter/events.h b/imperative/src/impl/interpreter/events.h index f97486e8ca8960d7b161400f8c53941062f74f62..65d1c729865085a1270622e693285ecdf8b4337c 100644 --- a/imperative/src/impl/interpreter/events.h +++ b/imperative/src/impl/interpreter/events.h @@ -16,77 +16,60 @@ namespace mgb::imperative::interpreter::intl { -struct CommandEvent { - IdentifiedCommand icmd; -}; - -struct CommandEnqueueEvent: CommandEvent {}; - -struct CommandExecuteEvent: CommandEvent {}; +#define DEF_EVENT(X, ...) struct X##Event __VA_ARGS__; +#define DEF_DUR_EVENT(X, ...) struct X##Event __VA_ARGS__; struct X##FinishEvent __VA_ARGS__; -struct CommandFinishEvent: CommandEvent {}; +DEF_EVENT(Command, { + IdentifiedCommand icmd; +}); -struct OpEvent { +DEF_EVENT(CommandEnqueue, :CommandEvent); +DEF_EVENT(CommandExecute, :CommandEvent); +DEF_EVENT(CommandFinish, :CommandEvent); +DEF_DUR_EVENT(OpExecute, { uint64_t id; std::shared_ptr op; SmallVector inputs; SmallVector outputs; -}; - -struct HostOpExecuteEvent: OpEvent {}; - -struct DeviceOpExecuteEvent: OpEvent {}; - -struct HostOpFinishEvent: OpEvent {}; - -struct DeviceOpFinishEvent: OpEvent {}; - -struct TensorDeclareEvent { +}); +DEF_DUR_EVENT(KernelExecute, { + uint64_t id; + std::shared_ptr op; + SmallVector inputs; + SmallVector outputs; +}); +DEF_EVENT(TensorDeclare, { uint64_t tensor_id; -}; - -struct TensorProduceEvent { +}); +DEF_EVENT(TensorProduce, { uint64_t tensor_id; TensorLayout layout; CompNode device; -}; - -struct TensorEraseEvent { +}); +DEF_EVENT(TensorErase, { uint64_t tensor_id; -}; - -struct TensorPropEvent { +}); +DEF_EVENT(TensorGetProp, { uint64_t tensor_id; TensorInfo::Prop prop; std::string prop_desc; -}; - -struct TensorGetPropEvent: TensorPropEvent{}; - -struct TensorWaitPropEvent: TensorPropEvent{}; - -struct TensorNotifyPropEvent: TensorPropEvent{}; - -struct TensorWaitPropFinishEvent: TensorPropEvent{}; - -struct SyncStartEvent {}; - -struct SyncFinishEvent {}; - -struct ScopeEvent { +}); +DEF_DUR_EVENT(TensorWaitProp, { + uint64_t tensor_id; + TensorInfo::Prop prop; + std::string prop_desc; +}); +DEF_EVENT(TensorNotifyProp, { + uint64_t tensor_id; + TensorInfo::Prop prop; + std::string prop_desc; +}); +DEF_DUR_EVENT(Sync, {}); +DEF_DUR_EVENT(Scope, { std::string name; -}; - -struct ChannelBeginScope: ScopeEvent {}; - -struct ChannelEndScope: ScopeEvent {}; - -struct WorkerBeginScope: ScopeEvent {}; - -struct WorkerEndScope: ScopeEvent {}; - -struct DeviceBeginScope: ScopeEvent {}; - -struct DeviceEndScope: ScopeEvent {}; +}); +DEF_DUR_EVENT(DeviceScope, { + std::string name; +}); } diff --git a/imperative/src/impl/interpreter/interpreter_impl.cpp b/imperative/src/impl/interpreter/interpreter_impl.cpp index a506329743711bb87a63f222b332350927facaf3..5525dbce2e2a97ca8f876bf26d08e40640f7b22f 100644 --- a/imperative/src/impl/interpreter/interpreter_impl.cpp +++ b/imperative/src/impl/interpreter/interpreter_impl.cpp @@ -23,6 +23,17 @@ using namespace imperative; using namespace interpreter; using namespace interpreter::intl; +#define RECORD_EVENT(type, ...) \ + if (state.profiler->is_profiling()) { \ + state.profiler->record_host(type{__VA_ARGS__}); \ + } \ + +#define RECORD_DEVICE_EVENT(type, device, ...) \ + if (state.profiler->is_profiling()) { \ + state.profiler->record_device((device), type{__VA_ARGS__}); \ + } \ + + std::thread::id ChannelImpl::get_worker_tid() { return m_worker_state.tid; } @@ -71,9 +82,7 @@ Handle ChannelImpl::put(const DeviceTensorND& data) { info->desc.layout = data.layout(); info->desc.comp_node = data.comp_node(); info->ptr = Tensor::make(data); - if (state.profiler->is_profiling()) { - state.profiler->record_host(info->id, info->desc.layout, info->desc.comp_node); - } + RECORD_EVENT(TensorProduceEvent, info->id, info->desc.layout, info->desc.comp_node); return info; } @@ -168,10 +177,8 @@ void ChannelImpl::dispatch_default_cpu( } return tid; }; - OpEvent event_data = {++m_last_id, op, tinfo_to_tid(input_infos), {}}; - if (state.profiler->is_profiling()) { - state.profiler->record_host(event_data); - } + auto apply_id = ++m_last_id; + RECORD_EVENT(OpExecuteEvent, apply_id, op, tinfo_to_tid(input_infos), {}); OpDef::apply_on_device_tensornd(*op, input_tensornds, &output_tensornds); @@ -187,10 +194,8 @@ void ChannelImpl::dispatch_default_cpu( outputs->push_back(info); } - event_data.outputs = tinfo_to_tid(output_infos); - if (state.profiler->is_profiling()) { - state.profiler->record_host(event_data); - } + RECORD_EVENT(OpExecuteFinishEvent, apply_id, op, + tinfo_to_tid(input_infos), tinfo_to_tid(output_infos)); } void ChannelImpl::dispatch_kernel( @@ -287,17 +292,13 @@ HostTensorND ChannelImpl::get_value(Handle handle) { if (!value_fetched()) { m_waitee = info; m_buffer.enqueue(GetValue{info}); - if (state.profiler->is_profiling()) { - state.profiler->record_host(info->id, TensorInfo::HostValue); - } + RECORD_EVENT(TensorWaitPropEvent, info->id, TensorInfo::HostValue); m_cv.wait(lock, [&]() { check_worker_exc_unsafe(); tensor_ptr = info->ptr; return value_fetched(); }); - if (state.profiler->is_profiling()) { - state.profiler->record_host(info->id, TensorInfo::HostValue); - } + RECORD_EVENT(TensorWaitPropFinishEvent, info->id, TensorInfo::HostValue); m_waitee = nullptr; } return tensor_ptr->get_value(); @@ -316,16 +317,12 @@ TensorShape ChannelImpl::get_shape(Handle handle) { mgb_assert(!m_waitee); m_waitee = info; m_buffer.flush(); - if (state.profiler->is_profiling()) { - state.profiler->record_host(info->id, TensorInfo::Shape); - } + RECORD_EVENT(TensorWaitPropEvent, info->id, TensorInfo::Shape); m_cv.wait(lock, [&]() { check_worker_exc_unsafe(); return static_cast(info->ptr); }); - if (state.profiler->is_profiling()) { - state.profiler->record_host(info->id, TensorInfo::Shape); - } + RECORD_EVENT(TensorWaitPropFinishEvent, info->id, TensorInfo::Shape); m_waitee = nullptr; TensorShape ret = info->ptr->layout(); mgb_assert(ret.ndim != 0); @@ -338,9 +335,7 @@ DType ChannelImpl::get_dtype(Handle handle) { mgb_assert(m_valid_handle.find(handle) != m_valid_handle.end(), "invalid handle: %p", handle); auto info = reinterpret_cast(handle); - if (state.profiler->is_profiling()) { - state.profiler->record_host(info->id, TensorInfo::DType); - } + RECORD_EVENT(TensorGetPropEvent, info->id, TensorInfo::DType); auto ret = info->desc.layout.dtype; mgb_assert(ret.valid()); return ret; @@ -352,9 +347,7 @@ CompNode ChannelImpl::get_device(Handle handle) { mgb_assert(m_valid_handle.find(handle) != m_valid_handle.end(), "invalid handle: %p", handle); auto info = reinterpret_cast(handle); - if (state.profiler->is_profiling()) { - state.profiler->record_host(info->id, TensorInfo::Device); - } + RECORD_EVENT(TensorGetPropEvent, info->id, TensorInfo::Device); auto ret = info->desc.comp_node; mgb_assert(ret.valid()); return ret; @@ -370,16 +363,12 @@ DeviceTensorND ChannelImpl::get_dev_tensor(Handle handle) { mgb_assert(!m_waitee); m_waitee = info; m_buffer.flush(); - if (state.profiler->is_profiling()) { - state.profiler->record_host(info->id, TensorInfo::DevValue); - } + RECORD_EVENT(TensorWaitPropEvent, info->id, TensorInfo::DevValue); m_cv.wait(lock, [&]() { check_worker_exc_unsafe(); return static_cast(info->ptr); }); - if (state.profiler->is_profiling()) { - state.profiler->record_host(info->id, TensorInfo::DevValue); - } + RECORD_EVENT(TensorWaitPropFinishEvent, info->id, TensorInfo::DevValue); m_waitee = nullptr; return info->ptr->dev_tensor(); } @@ -388,14 +377,10 @@ void ChannelImpl::sync() { mgb_assert(check_available(), "Channel already closed"); auto& state = get_channel_state(); m_buffer.flush(); - if (state.profiler->is_profiling()) { - state.profiler->record_host(); - } + RECORD_EVENT(SyncEvent); m_worker.wait_all_task_finish(); CompNode::sync_all(); - if (state.profiler->is_profiling()) { - state.profiler->record_host(); - } + RECORD_EVENT(SyncFinishEvent); MGB_LOCK_GUARD(m_mutex); check_worker_exc_unsafe(); } @@ -433,9 +418,7 @@ TensorInfo* ChannelImpl::alloc() { auto info = m_pool.alloc(); m_valid_handle.insert(info); info->id = m_last_id++; - if (state.profiler->is_profiling()) { - state.profiler->record_host(info->id); - } + RECORD_EVENT(TensorDeclareEvent, info->id); return info; } @@ -491,9 +474,7 @@ void ChannelImpl::recursive_free(TensorInfo* ptr) { void ChannelImpl::real_free(TensorInfo* ptr) { auto& state = get_worker_state(); MGB_LOCK_GUARD(m_mutex); - if (state.profiler->is_profiling()) { - state.profiler->record_host(ptr->id); - } + RECORD_EVENT(TensorEraseEvent, ptr->id); if (ptr->size_exceeds_thd(state.options.dtr_evictee_minimum_size)) { m_dtr.erase_candidate(ptr); } @@ -515,8 +496,8 @@ void ChannelImpl::produce_tensor(TensorInfo* dest, TensorPtr ptr, bool notice=tr lock.lock(); } m_dtr.update_used_time(dest); - if (notice && state.profiler->is_profiling()) { - state.profiler->record_host(dest->id, ptr->layout(), ptr->comp_node()); + if (notice) { + RECORD_EVENT(TensorProduceEvent, dest->id, ptr->layout(), ptr->comp_node()); } dest->value_fetched = ptr->value_fetched(); // update tensor desc for static infer @@ -636,10 +617,10 @@ void ChannelImpl::sync_device_scope(CompNode device) { auto& prev = state.device_scope_map[device]; auto& current = state.scopes; auto push_scope = [&](std::string name) { - state.profiler->record_device(device, name); + RECORD_DEVICE_EVENT(DeviceScopeEvent, device, name); }; auto pop_scope = [&](std::string name) { - state.profiler->record_device(device, name); + RECORD_DEVICE_EVENT(DeviceScopeFinishEvent, device, name); }; size_t similarity = 0; for (size_t i = 0; i < prev.size() && i < current.size(); i++) { @@ -661,17 +642,13 @@ void ChannelImpl::sync_device_scope(CompNode device) { void ChannelImpl::process_one_task(IdentifiedCommand& icmd) { auto& state = get_worker_state(); - if (state.profiler->is_profiling()) { - state.profiler->record_host(icmd); - } + RECORD_EVENT(CommandExecuteEvent, icmd); bool finished = false; auto do_finish_command = [&]{ if (finished) { return; } - if (state.profiler->is_profiling()) { - state.profiler->record_host(icmd); - } + RECORD_EVENT(CommandFinishEvent, icmd); finished = true; }; //TODO: remove std::visit for support osx 10.12 @@ -701,16 +678,14 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) { tensor_inputs.push_back(i->ptr); } // Begin profiling operator - OpEvent event_data; + auto tinfo_to_tid = [&](SmallVector tinfo) { + SmallVector tid; + for (auto* ptinfo: tinfo) { + tid.push_back(ptinfo->id); + } + return tid; + }; if (state.profiler->is_profiling()) { - auto tinfo_to_tid = [&](SmallVector tinfo) { - SmallVector tid; - for (auto* ptinfo: tinfo) { - tid.push_back(ptinfo->id); - } - return tid; - }; - event_data = {apply_id, cmd.op, tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs)}; // Collecting devices for (auto i : cmd.inputs) { devices.push_back(i->desc.comp_node); @@ -731,11 +706,12 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) { // Before wait //TODO: split operator wait and execute so that OpWait could be corrected recorded. // Before execute + RECORD_EVENT(OpExecuteEvent, apply_id, cmd.op, tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs)); if (state.profiler->is_profiling()) { - state.profiler->record_host(event_data); for (auto&& device: devices) { sync_device_scope(device); - state.profiler->record_device(device, event_data); + RECORD_DEVICE_EVENT(KernelExecuteEvent, device, apply_id, cmd.op, + tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs)); } } if (state.options.enable_dtr_auto_drop && state.options.dtr_eviction_threshold > 0) { @@ -746,10 +722,10 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) { auto tensor_outputs = OpDef::apply_on_physical_tensor( *cmd.op, std::move(tensor_inputs)); // After execute + RECORD_EVENT(OpExecuteFinishEvent, apply_id, cmd.op, tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs)); if (state.profiler->is_profiling()) { - state.profiler->record_host(event_data); for (auto&& device: devices) { - state.profiler->record_device(device, event_data); + RECORD_DEVICE_EVENT(KernelExecuteFinishEvent, device, apply_id, cmd.op, tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs)); } } // End profiling operator @@ -875,12 +851,12 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) { } else if constexpr (std::is_same_v) { state.scopes.push_back(cmd.scope_name); do_finish_command(); - state.profiler->record_host(cmd.scope_name); + RECORD_EVENT(ScopeEvent, cmd.scope_name); } else if constexpr (std::is_same_v) { mgb_assert(state.scopes.back() == cmd.scope_name, "scope name mismatch"); state.scopes.pop_back(); do_finish_command(); - state.profiler->record_host(cmd.scope_name); + RECORD_EVENT(ScopeFinishEvent, cmd.scope_name); } else { static_assert(!std::is_same_v); } @@ -938,9 +914,7 @@ void ChannelImpl::CommandBuffer::flush(Handle pos) { for (auto iter = m_commands.begin(); iter != pos; ++iter) { // mgb_log_debug("%s Flushed", to_string(*iter).c_str()); IdentifiedCommand icmd{++m_owner->m_last_id, std::move(*iter)}; - if (state.profiler->is_profiling()) { - state.profiler->record_host(icmd); - } + RECORD_EVENT(CommandEnqueueEvent, icmd); m_owner->m_worker.add_task(std::move(icmd)); } m_commands.erase(m_commands.begin(), pos); @@ -1060,8 +1034,8 @@ void ChannelImpl::stop_profile(std::string basename, std::string format) { void ChannelImpl::push_scope(std::string name) { mgb_assert(check_available(), "Channel already closed"); auto& state = get_channel_state(); + RECORD_EVENT(ScopeEvent, name); if (state.profiler->is_profiling()) { - state.profiler->record_host(name); state.scopes.push_back(name); m_buffer.enqueue(PushScope{name}); } @@ -1070,10 +1044,10 @@ void ChannelImpl::push_scope(std::string name) { void ChannelImpl::pop_scope(std::string name) { mgb_assert(check_available(), "Channel already closed"); auto& state = get_channel_state(); + RECORD_EVENT(ScopeFinishEvent, name); if (state.profiler->is_profiling()) { mgb_assert((!state.scopes.empty()) && state.scopes.back() == name, "scope name mismatch"); state.scopes.pop_back(); - state.profiler->record_host(name); m_buffer.enqueue(PopScope{name}); } } diff --git a/imperative/src/impl/interpreter/profiler.h b/imperative/src/impl/interpreter/profiler.h index 59ab4f59627f8713d0866588d0e5cd3d30d556af..58347537f78d4684c20db278b43968613792f430 100644 --- a/imperative/src/impl/interpreter/profiler.h +++ b/imperative/src/impl/interpreter/profiler.h @@ -21,15 +21,13 @@ namespace mgb::imperative::interpreter::intl { class InterpreterProfiler: public Profiler< CommandEnqueueEvent, CommandExecuteEvent, CommandFinishEvent, - HostOpExecuteEvent, HostOpFinishEvent, - DeviceOpExecuteEvent, DeviceOpFinishEvent, + OpExecuteEvent, OpExecuteFinishEvent, + KernelExecuteEvent, KernelExecuteFinishEvent, TensorDeclareEvent, TensorProduceEvent, TensorEraseEvent, TensorGetPropEvent, TensorWaitPropEvent, TensorNotifyPropEvent, TensorWaitPropFinishEvent, - SyncStartEvent, SyncFinishEvent, - ChannelBeginScope, ChannelEndScope, - WorkerBeginScope, WorkerEndScope, - DeviceBeginScope, DeviceEndScope> { - /*22 events now. Enum code may be a better solution*/ + SyncEvent, SyncFinishEvent, + ScopeEvent, ScopeFinishEvent, + DeviceScopeEvent, DeviceScopeFinishEvent> { public: enum Topic { @@ -71,8 +69,8 @@ public: result |= mask_of(); } if (topic & Operator) { - result |= mask_of(); - result |= mask_of(); + result |= mask_of(); + result |= mask_of(); } if (topic & TensorLifetime) { result |= mask_of(); @@ -81,11 +79,11 @@ public: result |= mask_of(); } if (topic & Sync) { - result |= mask_of(); + result |= mask_of(); } if (topic & Scope) { - result |= mask_of(); - result |= mask_of(); + result |= mask_of(); + result |= mask_of(); } return result; }