提交 4840c49b 编写于 作者: X Xin Pan

Better timeline

上级 cbfd15f9
......@@ -25,6 +25,7 @@ limitations under the License. */
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/reader.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/profiler.h"
DECLARE_bool(benchmark);
DEFINE_bool(check_nan_inf, false,
......@@ -33,6 +34,11 @@ DEFINE_bool(check_nan_inf, false,
namespace paddle {
namespace framework {
namespace {
// block id starts from 0. This id is used to represent the codeblock
// wrapping the first block 0.
int kProgramId = -1;
} // namespace
struct ExecutorPrepareContext {
ExecutorPrepareContext(const framework::ProgramDesc& prog, size_t block_id)
......@@ -94,6 +100,7 @@ static void CheckTensorNANOrInf(const std::string& name,
void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id,
bool create_local_scope, bool create_vars) {
platform::RecordBlock b(block_id);
auto* ctx = Prepare(pdesc, block_id);
RunPreparedContext(ctx, scope, create_local_scope, create_vars);
delete ctx;
......@@ -184,6 +191,7 @@ void Executor::Run(const ProgramDesc& program, Scope* scope,
std::map<std::string, LoDTensor*>& fetch_targets,
const std::string& feed_holder_name,
const std::string& fetch_holder_name) {
platform::RecordBlock b(kProgramId);
auto* copy_program = new ProgramDesc(program);
auto* global_block = copy_program->MutableBlock(0);
......
......@@ -18,6 +18,7 @@ limitations under the License. */
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/threadpool.h"
#include "paddle/fluid/operators/detail/safe_ref.h"
#include "paddle/fluid/platform/profiler.h"
namespace paddle {
namespace operators {
......@@ -158,7 +159,10 @@ class ParallelDoOp : public framework::OperatorBase {
auto &place = places[place_idx];
auto *cur_scope = sub_scopes[place_idx];
workers.emplace_back(framework::Async([program, cur_scope, place, block] {
workers.emplace_back(
framework::Async([program, cur_scope, place, block, place_idx] {
// Give the thread an id to distinguish parallel block with same id.
platform::RecordThread rt(static_cast<int>(place_idx) + 1);
framework::Executor executor(place);
executor.Run(*program, cur_scope, block->ID(),
false /*create_local_scope*/);
......@@ -234,7 +238,10 @@ class ParallelDoGradOp : public framework::OperatorBase {
auto *cur_scope = sub_scopes[i];
// execute
workers.emplace_back(framework::Async([program, cur_scope, place, block] {
workers.emplace_back(
framework::Async([program, cur_scope, place, block, i] {
// Give the thread an id to distinguish parallel block with same id.
platform::RecordThread rt(static_cast<int>(i) + 1);
framework::Executor executor(place);
executor.Run(*program, cur_scope, block->ID(),
false /*create_local_scope*/);
......
......@@ -26,8 +26,14 @@ limitations under the License. */
namespace paddle {
namespace platform {
namespace {
// Current thread's id. Note, we don't distinguish nested threads
// for now.
thread_local int cur_thread_id = 0;
// Tracking the nested block stacks of each thread.
thread_local std::deque<int> block_id_stack;
// Tracking the nested event stacks.
thread_local std::deque<std::string> annotation_stack;
thread_local const char *cur_annotation = nullptr;
std::once_flag tracer_once_flag;
DeviceTracer *tracer = nullptr;
} // namespace
......@@ -191,19 +197,19 @@ class DeviceTracerImpl : public DeviceTracer {
correlations_[id] = anno;
}
void AddCPURecords(const char *anno, uint64_t start_ns, uint64_t end_ns) {
if (!anno) {
// TODO(panyx0718): Currently, it doesn't support nested situation
// Up-level can be cleared by low-level and therefore get nullptr
// here.
void AddCPURecords(const std::string &anno, uint64_t start_ns,
uint64_t end_ns, int64_t device_id, int64_t thread_id) {
if (anno.empty()) {
VLOG(1) << "Empty timeline annotation.";
return;
}
std::lock_guard<std::mutex> l(trace_mu_);
cpu_records_.push_back(CPURecord{anno, start_ns, end_ns, 0});
cpu_records_.push_back(
CPURecord{anno, start_ns, end_ns, device_id, thread_id});
}
void AddMemRecords(const std::string &name, uint64_t start_ns,
uint64_t end_ns, uint32_t device_id, uint32_t stream_id,
uint64_t end_ns, int64_t device_id, int64_t stream_id,
uint32_t correlation_id, uint64_t bytes) {
// 0 means timestamp information could not be collected for the kernel.
if (start_ns == 0 || end_ns == 0) {
......@@ -215,8 +221,8 @@ class DeviceTracerImpl : public DeviceTracer {
stream_id, correlation_id, bytes});
}
void AddKernelRecords(uint64_t start, uint64_t end, uint32_t device_id,
uint32_t stream_id, uint32_t correlation_id) {
void AddKernelRecords(uint64_t start, uint64_t end, int64_t device_id,
int64_t stream_id, uint32_t correlation_id) {
// 0 means timestamp information could not be collected for the kernel.
if (start == 0 || end == 0) {
VLOG(3) << correlation_id << " cannot be traced";
......@@ -270,27 +276,30 @@ class DeviceTracerImpl : public DeviceTracer {
continue;
}
auto *event = profile_pb.add_events();
event->set_type(proto::Event::GPUKernel);
event->set_name(correlations_.at(r.correlation_id));
event->set_start_ns(r.start_ns);
event->set_end_ns(r.end_ns);
event->set_stream_id(r.stream_id);
event->set_sub_device_id(r.stream_id);
event->set_device_id(r.device_id);
}
for (const CPURecord &r : cpu_records_) {
auto *event = profile_pb.add_events();
event->set_type(proto::Event::CPU);
event->set_name(r.name);
event->set_start_ns(r.start_ns);
event->set_end_ns(r.end_ns);
event->set_stream_id(r.thread_id);
event->set_device_id(-1);
event->set_sub_device_id(r.thread_id);
event->set_device_id(r.device_id);
}
for (const MemRecord &r : mem_records_) {
auto *event = profile_pb.add_events();
event->set_type(proto::Event::GPUKernel);
event->set_name(r.name);
event->set_start_ns(r.start_ns);
event->set_end_ns(r.end_ns);
event->set_stream_id(r.stream_id);
event->set_sub_device_id(r.stream_id);
event->set_device_id(r.device_id);
event->mutable_memcopy()->set_bytes(r.bytes);
}
......@@ -323,8 +332,9 @@ class DeviceTracerImpl : public DeviceTracer {
if ((domain == CUPTI_CB_DOMAIN_DRIVER_API) &&
(cbid == CUPTI_DRIVER_TRACE_CBID_cuLaunchKernel)) {
if (cbInfo->callbackSite == CUPTI_API_ENTER) {
const std::string anno =
cur_annotation ? cur_annotation : cbInfo->symbolName;
const std::string anno = !annotation_stack.empty()
? annotation_stack.back()
: cbInfo->symbolName;
tracer->AddAnnotation(cbInfo->correlationId, anno);
}
} else {
......@@ -351,14 +361,15 @@ class DeviceTracerDummy : public DeviceTracer {
void AddAnnotation(uint64_t id, const std::string &anno) {}
void AddCPURecords(const char *anno, uint64_t start_ns, uint64_t end_ns) {}
void AddCPURecords(const std::string &anno, uint64_t start_ns,
uint64_t end_ns, int64_t device_id, int64_t thread_id) {}
void AddMemRecords(const std::string &name, uint64_t start_ns,
uint64_t end_ns, uint32_t device_id, uint32_t stream_id,
uint64_t end_ns, int64_t device_id, int64_t stream_id,
uint32_t correlation_id, uint64_t bytes) {}
void AddKernelRecords(uint64_t start, uint64_t end, uint32_t device_id,
uint32_t stream_id, uint32_t correlation_id) {}
void AddKernelRecords(uint64_t start, uint64_t end, int64_t device_id,
int64_t stream_id, uint32_t correlation_id) {}
bool IsEnabled() { return false; }
......@@ -384,11 +395,28 @@ DeviceTracer *GetDeviceTracer() {
return tracer;
}
void SetCurAnnotation(const char *anno) { cur_annotation = anno; }
void SetCurAnnotation(const std::string &anno) {
annotation_stack.push_back(anno);
}
void ClearCurAnnotation() { annotation_stack.pop_back(); }
std::string CurAnnotation() {
if (annotation_stack.empty()) return "";
return annotation_stack.back();
}
void SetCurBlock(int block_id) { block_id_stack.push_back(block_id); }
void ClearCurBlock() { block_id_stack.pop_back(); }
int BlockDepth() { return block_id_stack.size(); }
void SetCurThread(int thread_id) { cur_thread_id = thread_id; }
void ClearCurAnnotation() { cur_annotation = nullptr; }
void ClearCurThread() { cur_thread_id = 0; }
const char *CurAnnotation() { return cur_annotation; }
int CurThread() { return cur_thread_id; }
} // namespace platform
} // namespace paddle
......@@ -32,22 +32,23 @@ class DeviceTracer {
struct KernelRecord {
uint64_t start_ns;
uint64_t end_ns;
uint32_t device_id;
uint32_t stream_id;
int64_t device_id;
int64_t stream_id;
uint32_t correlation_id;
};
struct CPURecord {
std::string name;
uint64_t start_ns;
uint64_t end_ns;
uint64_t thread_id;
int64_t device_id;
int64_t thread_id;
};
struct MemRecord {
std::string name;
uint64_t start_ns;
uint64_t end_ns;
uint32_t device_id;
uint32_t stream_id;
int64_t device_id;
int64_t stream_id;
uint32_t correlation_id;
uint64_t bytes;
};
......@@ -64,18 +65,18 @@ class DeviceTracer {
virtual void AddAnnotation(uint64_t id, const std::string& anno) = 0;
virtual void AddMemRecords(const std::string& name, uint64_t start_ns,
uint64_t end_ns, uint32_t device_id,
uint32_t stream_id, uint32_t correlation_id,
uint64_t end_ns, int64_t device_id,
int64_t stream_id, uint32_t correlation_id,
uint64_t bytes) = 0;
virtual void AddCPURecords(const char* anno, uint64_t start_ns,
uint64_t end_ns) = 0;
virtual void AddCPURecords(const std::string& anno, uint64_t start_ns,
uint64_t end_ns, int64_t device_id,
int64_t thread_id) = 0;
// Add a cuda kernel stats. `correlation_id` will be mapped to annotation
// added before for human readability.
virtual void AddKernelRecords(uint64_t start, uint64_t end,
uint32_t device_id, uint32_t stream_id,
uint32_t correlation_id) = 0;
virtual void AddKernelRecords(uint64_t start, uint64_t end, int64_t device_id,
int64_t stream_id, uint32_t correlation_id) = 0;
// Generate a proto after done (Disabled).
virtual proto::Profile GenProfile(const std::string& profile_path) = 0;
......@@ -87,10 +88,18 @@ class DeviceTracer {
DeviceTracer* GetDeviceTracer();
// Set a name for the cuda kernel operation being launched by the thread.
void SetCurAnnotation(const char* anno);
void SetCurAnnotation(const std::string& anno);
// Clear the name after the operation is done.
void ClearCurAnnotation();
// Current name of the operation being run in the thread.
const char* CurAnnotation();
std::string CurAnnotation();
void SetCurBlock(int block_id);
void ClearCurBlock();
int BlockDepth();
void SetCurThread(int thread_id);
void ClearCurThread();
int CurThread();
} // namespace platform
} // namespace paddle
......@@ -147,19 +147,48 @@ RecordEvent::RecordEvent(const std::string& name, const DeviceContext* dev_ctx)
name_ = name;
PushEvent(name_, dev_ctx_);
// Maybe need the same push/pop behavior.
SetCurAnnotation(name_.c_str());
SetCurAnnotation(name_);
}
RecordEvent::~RecordEvent() {
if (g_state == ProfilerState::kDisabled) return;
DeviceTracer* tracer = GetDeviceTracer();
if (tracer) {
tracer->AddCPURecords(CurAnnotation(), start_ns_, PosixInNsec());
tracer->AddCPURecords(CurAnnotation(), start_ns_, PosixInNsec(),
BlockDepth(), CurThread());
}
ClearCurAnnotation();
PopEvent(name_, dev_ctx_);
}
RecordBlock::RecordBlock(int block_id) : start_ns_(PosixInNsec()) {
if (g_state == ProfilerState::kDisabled) return;
SetCurBlock(block_id);
name_ = string::Sprintf("block_%d", block_id);
}
RecordBlock::~RecordBlock() {
if (g_state == ProfilerState::kDisabled) return;
DeviceTracer* tracer = GetDeviceTracer();
if (tracer) {
// We try to put all blocks at the same nested depth in the
// same timeline lane. and distinguish the using thread_id.
tracer->AddCPURecords(name_, start_ns_, PosixInNsec(), BlockDepth(),
CurThread());
}
ClearCurBlock();
}
RecordThread::RecordThread(int thread_id) {
if (g_state == ProfilerState::kDisabled) return;
SetCurThread(thread_id);
}
RecordThread::~RecordThread() {
if (g_state == ProfilerState::kDisabled) return;
ClearCurThread();
}
void EnableProfiler(ProfilerState state) {
PADDLE_ENFORCE(state != ProfilerState::kDisabled,
"Can't enbale profling, since the input state is ",
......
......@@ -118,6 +118,24 @@ struct RecordEvent {
std::string full_name_;
};
struct RecordBlock {
explicit RecordBlock(int block_id);
~RecordBlock();
private:
std::string name_;
uint64_t start_ns_;
int block_id_;
};
struct RecordThread {
explicit RecordThread(int thread_id);
~RecordThread();
private:
uint64_t start_ns_;
};
// Return the event list of all threads. Assumed the returned value calls
// event_lists, event_lists[i][j] represents the j-th Event of i-th thread.
std::vector<std::vector<Event>> GetAllEvents();
......
......@@ -18,12 +18,17 @@ package paddle.platform.proto;
message MemCopy { optional uint64 bytes = 1; }
message Event {
enum EventType {
CPU = 0;
GPUKernel = 1;
}
optional EventType type = 8;
optional string name = 1;
optional uint64 start_ns = 2;
optional uint64 end_ns = 3;
// When positive, it represents gpu id. When -1, it represents CPU.
optional int64 device_id = 5;
optional uint32 stream_id = 6;
optional int64 sub_device_id = 6;
optional MemCopy memcopy = 7;
}
......
......@@ -31,8 +31,22 @@ class TestProfiler(unittest.TestCase):
with fluid.program_guard(main_program, startup_program):
image = fluid.layers.data(name='x', shape=[784], dtype='float32')
hidden1 = fluid.layers.fc(input=image, size=128, act='relu')
hidden2 = fluid.layers.fc(input=hidden1, size=64, act='relu')
hidden1 = fluid.layers.fc(input=image, size=64, act='relu')
i = layers.zeros(shape=[1], dtype='int64')
counter = fluid.layers.zeros(
shape=[1], dtype='int64', force_cpu=True)
until = layers.fill_constant([1], dtype='int64', value=10)
data_arr = layers.array_write(hidden1, i)
cond = fluid.layers.less_than(x=counter, y=until)
while_op = fluid.layers.While(cond=cond)
with while_op.block():
hidden_n = fluid.layers.fc(input=hidden1, size=64, act='relu')
layers.array_write(hidden_n, i, data_arr)
fluid.layers.increment(x=counter, value=1, in_place=True)
layers.less_than(x=counter, y=until, cond=cond)
hidden_n = layers.array_read(data_arr, i)
hidden2 = fluid.layers.fc(input=hidden_n, size=64, act='relu')
predict = fluid.layers.fc(input=hidden2, size=10, act='softmax')
label = fluid.layers.data(name='y', shape=[1], dtype='int64')
cost = fluid.layers.cross_entropy(input=predict, label=label)
......
......@@ -121,27 +121,34 @@ class Timeline(object):
def _allocate_pids(self):
for event in self._profile_pb.events:
if event.device_id not in self._devices:
if event.type == profiler_pb2.Event.CPU:
if (event.device_id, "CPU") not in self._devices:
pid = self._allocate_pid()
self._devices[event.device_id] = pid
if event.device_id >= 0:
self._chrome_trace.emit_pid("gpu:%s:stream:%d" %
(pid, event.stream_id), pid)
elif event.device_id == -1:
self._chrome_trace.emit_pid("cpu:thread_hash:%d" %
event.stream_id, pid)
self._devices[(event.device_id, "CPU")] = pid
self._chrome_trace.emit_pid("cpu:block:%d" %
(event.device_id), pid)
elif event.type == profiler_pb2.Event.GPUKernel:
if (event.device_id, "GPUKernel") not in self._devices:
pid = self._allocate_pid()
self._devices[(event.device_id, "GPUKernel")] = pid
self._chrome_trace.emit_pid("gpu:%d" % (event.device_id),
pid)
def _allocate_events(self):
for event in self._profile_pb.events:
pid = self._devices[event.device_id]
if event.type == profiler_pb2.Event.CPU:
type = "CPU"
elif event.type == profiler_pb2.Event.GPUKernel:
type = "GPUKernel"
pid = self._devices[(event.device_id, type)]
args = {'name': event.name}
if event.memcopy.bytes > 0:
args = {'mem_bytes': event.memcopy.bytes}
# TODO(panyx0718): Chrome tracing only handles ms. However, some
# ops takes micro-seconds. Hence, we keep the ns here.
self._chrome_trace.emit_region(event.start_ns,
(event.end_ns - event.start_ns) /
1.0, pid, 0, 'Op', event.name, args)
self._chrome_trace.emit_region(
event.start_ns, (event.end_ns - event.start_ns) / 1.0, pid,
event.sub_device_id, 'Op', event.name, args)
def generate_chrome_trace(self):
self._allocate_pids()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册