From caf10b474f0452c095ca7d78f7c643fbd1a5d109 Mon Sep 17 00:00:00 2001 From: Xin Pan Date: Tue, 31 Jul 2018 16:09:30 +0800 Subject: [PATCH] make profiler use thread_id from g_thread_id Add a few more RecordEvent. Cleanup --- .../framework/details/all_reduce_op_handle.cc | 2 + .../framework/details/reduce_op_handle.cc | 2 + .../scope_buffered_ssa_graph_executor.cc | 2 + .../details/threaded_ssa_graph_executor.cc | 4 + .../operators/distributed/send_recv.proto | 84 +++++++++++++++++++ paddle/fluid/operators/parallel_do_op.cc | 5 -- paddle/fluid/platform/device_tracer.cc | 10 --- paddle/fluid/platform/device_tracer.h | 4 - paddle/fluid/platform/profiler.cc | 14 +--- paddle/fluid/platform/profiler.h | 5 -- 10 files changed, 96 insertions(+), 36 deletions(-) create mode 100644 paddle/fluid/operators/distributed/send_recv.proto diff --git a/paddle/fluid/framework/details/all_reduce_op_handle.cc b/paddle/fluid/framework/details/all_reduce_op_handle.cc index 700c73c74..bf493a3fa 100644 --- a/paddle/fluid/framework/details/all_reduce_op_handle.cc +++ b/paddle/fluid/framework/details/all_reduce_op_handle.cc @@ -17,6 +17,7 @@ #include "paddle/fluid/framework/details/container_cast.h" #include "paddle/fluid/framework/details/reduce_and_gather.h" #include "paddle/fluid/framework/details/variable_visitor.h" +#include "paddle/fluid/platform/profiler.h" namespace paddle { namespace framework { @@ -45,6 +46,7 @@ AllReduceOpHandle::AllReduceOpHandle(ir::Node *node, #endif void AllReduceOpHandle::RunImpl() { + platform::RecordEvent r("all_reduce", nullptr); if (NoDummyInputSize() == 1) { return; // No need to all reduce when GPU count = 1; } else { diff --git a/paddle/fluid/framework/details/reduce_op_handle.cc b/paddle/fluid/framework/details/reduce_op_handle.cc index 7160e346d..68bdfbaf5 100644 --- a/paddle/fluid/framework/details/reduce_op_handle.cc +++ b/paddle/fluid/framework/details/reduce_op_handle.cc @@ -16,12 +16,14 @@ #include 
"paddle/fluid/framework/details/container_cast.h" #include "paddle/fluid/framework/details/reduce_and_gather.h" #include "paddle/fluid/framework/details/variable_visitor.h" +#include "paddle/fluid/platform/profiler.h" namespace paddle { namespace framework { namespace details { void ReduceOpHandle::RunImpl() { + platform::RecordEvent r("reduce", nullptr); if (places_.size() == 1) return; // the input and output may have dummy var. auto in_var_handles = DynamicCast(inputs_); diff --git a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc index 1d80bab90..5bd974d6b 100644 --- a/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc @@ -17,6 +17,7 @@ #include #include #include "paddle/fluid/framework/executor.h" +#include "paddle/fluid/platform/profiler.h" namespace paddle { namespace framework { @@ -62,6 +63,7 @@ FeedFetchList ScopeBufferedSSAGraphExecutor::Run( eptr = std::current_exception(); } + platform::RecordEvent e("ScopeBufferedSSAGraphExecutorAfterRun", nullptr); drop_scope_counter_ += 1; if (!fetch_tensors.empty() || drop_scope_counter_ == strategy_.num_iteration_per_drop_scope_) { diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc index e556c84b0..0eaf9a9c9 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc @@ -15,6 +15,7 @@ #include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h" #include "paddle/fluid/framework/details/ssa_graph_builder.h" +#include "paddle/fluid/platform/profiler.h" namespace paddle { namespace framework { @@ -34,6 +35,8 @@ ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor( FeedFetchList ThreadedSSAGraphExecutor::Run( const std::vector &fetch_tensors) { + std::unique_ptr 
event( + new platform::RecordEvent("ThreadedSSAGraphExecutorPrepare", nullptr)); std::unordered_map pending_ops; std::unordered_set pending_vars; BlockingQueue ready_vars; @@ -84,6 +87,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( // Clean run context run_op_futures_.clear(); exception_holder_.Clear(); + event.reset(nullptr); // Step 3. Execution while (!pending_vars.empty()) { diff --git a/paddle/fluid/operators/distributed/send_recv.proto b/paddle/fluid/operators/distributed/send_recv.proto new file mode 100644 index 000000000..2b3536b33 --- /dev/null +++ b/paddle/fluid/operators/distributed/send_recv.proto @@ -0,0 +1,84 @@ + +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. Licensed under +the Apache License, Version 2.0 (the "License"); you may not use this file +except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +syntax = "proto3"; +package sendrecv; + +option cc_generic_services = false; + +service SendRecvService { + // For parameter server round-robin like hashing, do not split tensors. + // Send and recv only one tensor + // TODO(typhoonzero): add streaming API + rpc SendVariable(VariableMessage) returns (VoidMessage) {} + // Argument VariableMessage for GetVariable should only contain varname. + rpc GetVariable(VariableMessage) returns (VariableMessage) {} + // pre-fetch variable by given variable name and Ids + rpc PrefetchVariable(VariableMessage) returns (VariableMessage) {} + + rpc CheckpointNotify(VariableMessage) returns (VoidMessage) {} +} + +// VariableMessage is serialized paddle variable message. 
+// It can be: +// LoDTensor +// SelectedRows +enum VarType { + LOD_TENSOR = 0; + SELECTED_ROWS = 1; + NCCL_ID = 2; +} + +// NOTICE(gongwb):don't modify this proto if you are +// not familiar with how we serialize in sendrecvop_utils.h +// and deserialize it in variable_response.h. +message VariableMessage { + enum Type { + // Pod Types + BOOL = 0; + INT16 = 1; + INT32 = 2; + INT64 = 3; + FP16 = 4; + FP32 = 5; + FP64 = 6; + } + + message LodData { repeated int64 lod_data = 1; } + string varname = 1; + // TODO(Yancey1989): reference framework::proto::VarDesc::VarType + VarType type = 2; + // bool persistable is not needed for sending. + // tensor info: + Type data_type = 3; + repeated int64 dims = 4; + + // lod details: + int64 lod_level = 5; + repeated LodData lod = 6; + // selected_rows height, aka. original dim0 + int64 slr_height = 7; + // tensor data + bytes serialized = 8; + // selected_rows data + bytes rows = 9; + // Look up table block execution output variable name. + string out_varname = 10; + // If 1, the ps server will start profiling, the ps + // server stops profiling and generates a profile to /tmp/profile_ps_* + // when profile switches from 1 to 2. + int64 profile = 11; +} + +message VoidMessage {} diff --git a/paddle/fluid/operators/parallel_do_op.cc b/paddle/fluid/operators/parallel_do_op.cc index c9744db3d..916cdad3f 100644 --- a/paddle/fluid/operators/parallel_do_op.cc +++ b/paddle/fluid/operators/parallel_do_op.cc @@ -18,7 +18,6 @@ limitations under the License. */ #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/threadpool.h" #include "paddle/fluid/operators/detail/safe_ref.h" -#include "paddle/fluid/platform/profiler.h" namespace paddle { namespace operators { @@ -166,8 +165,6 @@ class ParallelDoOp : public framework::OperatorBase { workers.emplace_back( framework::Async([program, cur_scope, place, block, place_idx] { - // Give the thread an id to distinguish parallel block with same id. 
- platform::RecordThread rt(static_cast(place_idx) + 1); framework::Executor executor(place); executor.Run(*program, cur_scope, block->ID(), false /*create_local_scope*/); @@ -244,8 +241,6 @@ class ParallelDoGradOp : public framework::OperatorBase { // execute workers.emplace_back( framework::Async([program, cur_scope, place, block, i] { - // Give the thread an id to distinguish parallel block with same id. - platform::RecordThread rt(static_cast(i) + 1); framework::Executor executor(place); executor.Run(*program, cur_scope, block->ID(), false /*create_local_scope*/); diff --git a/paddle/fluid/platform/device_tracer.cc b/paddle/fluid/platform/device_tracer.cc index d9e2afada..8fa8dbd67 100644 --- a/paddle/fluid/platform/device_tracer.cc +++ b/paddle/fluid/platform/device_tracer.cc @@ -30,9 +30,6 @@ limitations under the License. */ namespace paddle { namespace platform { namespace { -// Current thread's id. Note, we don't distinguish nested threads -// for now. -thread_local int cur_thread_id = 0; // Tracking the nested block stacks of each thread. thread_local std::deque block_id_stack; // Tracking the nested event stacks. 
@@ -413,12 +410,5 @@ void SetCurBlock(int block_id) { block_id_stack.push_back(block_id); } void ClearCurBlock() { block_id_stack.pop_back(); } int BlockDepth() { return block_id_stack.size(); } - -void SetCurThread(int thread_id) { cur_thread_id = thread_id; } - -void ClearCurThread() { cur_thread_id = 0; } - -int CurThread() { return cur_thread_id; } - } // namespace platform } // namespace paddle diff --git a/paddle/fluid/platform/device_tracer.h b/paddle/fluid/platform/device_tracer.h index 0375c7439..d2a571f43 100644 --- a/paddle/fluid/platform/device_tracer.h +++ b/paddle/fluid/platform/device_tracer.h @@ -99,9 +99,5 @@ std::string CurAnnotation(); void SetCurBlock(int block_id); void ClearCurBlock(); int BlockDepth(); - -void SetCurThread(int thread_id); -void ClearCurThread(); -int CurThread(); } // namespace platform } // namespace paddle diff --git a/paddle/fluid/platform/profiler.cc b/paddle/fluid/platform/profiler.cc index 01de9d704..f4a0e2a86 100644 --- a/paddle/fluid/platform/profiler.cc +++ b/paddle/fluid/platform/profiler.cc @@ -190,7 +190,7 @@ RecordEvent::~RecordEvent() { DeviceTracer* tracer = GetDeviceTracer(); if (tracer) { tracer->AddCPURecords(CurAnnotation(), start_ns_, PosixInNsec(), - BlockDepth(), CurThread()); + BlockDepth(), g_thread_id); } ClearCurAnnotation(); PopEvent(name_, dev_ctx_); @@ -211,21 +211,11 @@ RecordBlock::~RecordBlock() { // We try to put all blocks at the same nested depth in the // same timeline lane. and distinguish the using thread_id. 
tracer->AddCPURecords(name_, start_ns_, PosixInNsec(), BlockDepth(), - CurThread()); + g_thread_id); } ClearCurBlock(); } -RecordThread::RecordThread(int thread_id) { - if (g_state == ProfilerState::kDisabled) return; - SetCurThread(thread_id); -} - -RecordThread::~RecordThread() { - if (g_state == ProfilerState::kDisabled) return; - ClearCurThread(); -} - void EnableProfiler(ProfilerState state) { PADDLE_ENFORCE(state != ProfilerState::kDisabled, "Can't enbale profling, since the input state is ", diff --git a/paddle/fluid/platform/profiler.h b/paddle/fluid/platform/profiler.h index bf4392537..c99d9c807 100644 --- a/paddle/fluid/platform/profiler.h +++ b/paddle/fluid/platform/profiler.h @@ -95,11 +95,6 @@ struct RecordBlock { uint64_t start_ns_; }; -struct RecordThread { - explicit RecordThread(int thread_id); - ~RecordThread(); -}; - // Return the event list of all threads. Assumed the returned value calls // event_lists, event_lists[i][j] represents the j-th Event of i-th thread. std::vector> GetAllEvents(); -- GitLab