Unverified commit a83e4704, authored by: D Dun, committed by: GitHub

Profiler refine and add CUDA runtime api tracer (#15301)

* refine profiler && add runtime tracer

* test=develop

* test=develop

* test=develop

* test=develop

* test=develop

* test=develop

* test=develop

* test=develop

* fix bug && test=develop

* add thread id map && test=develop

* test=develop

* testing

* bug fix

* remove cuda event && refine code && test=develop

* test=develop

* test=develop

* test=develop

* fix windows temp file && test=develop

* test=develop

* fix windows bug && test=develop

* fix start up issue && test=develop

* code polish &&  test=develop

* remove unused code && test=develop

* add some cupti cbid && test=develop

* add FLAGS_multiple_of_cupti_buffer_size && test=develop

* fix compile error && test=develop

* add keyword && test=develop

* fix && test=develop

* code polish && test=develop
Parent c0d33f13
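
The central change in this diff is that profiling events no longer carry a DeviceContext*: RecordEvent, Mark, PushEvent, PopEvent, and RecordRPCEvent now take only a name, and GPU time is attributed afterwards from CUPTI activity records. Below is a minimal C++ sketch of the new call-site pattern, assuming the profiler header from this PR; the function name is hypothetical and the snippet is not part of the diff.

// Editorial sketch, not part of the diff: the simplified RAII instrumentation
// pattern used at every call site below. SomeOpHandleRunImpl is hypothetical.
#include <string>
#include "paddle/fluid/platform/profiler.h"

void SomeOpHandleRunImpl(const std::string& op_name) {
  // RecordEvent pushes an event on construction and pops it on destruction,
  // so the whole scope is attributed to op_name.
  paddle::platform::RecordEvent record_event(op_name);
  // ... op body ...
}

In each hunk that follows, the old two-argument call (with a DeviceContext*) is shown immediately before its one-argument replacement.
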
......@@ -53,7 +53,7 @@ AllReduceOpHandle::AllReduceOpHandle(ir::Node *node,
#endif
void AllReduceOpHandle::RunImpl() {
platform::RecordEvent record_event(Name(), dev_ctxes_.cbegin()->second);
platform::RecordEvent record_event(Name());
WaitInputVarGenerated();
auto in_var_handles = DynamicCast<VarHandle>(this->Inputs());
......
......@@ -22,7 +22,7 @@ namespace framework {
namespace details {
void BroadcastOpHandle::RunImpl() {
platform::RecordEvent record_event(Name(), dev_ctxes_.begin()->second);
platform::RecordEvent record_event(Name());
if (places_.size() == 1) return;
......
......@@ -22,7 +22,7 @@ namespace framework {
namespace details {
void FusedBroadcastOpHandle::RunImpl() {
platform::RecordEvent record_event(Name(), dev_ctxes_.begin()->second);
platform::RecordEvent record_event(Name());
if (places_.size() == 1UL) return;
......
......@@ -139,7 +139,7 @@ void ReduceOpHandle::GatherSelectedRows(
#endif
void ReduceOpHandle::RunImpl() {
platform::RecordEvent record_event(Name(), dev_ctxes_.cbegin()->second);
platform::RecordEvent record_event(Name());
if (places_.size() == 1) return;
// the input and output may have dummy var.
......
......@@ -63,7 +63,7 @@ FeedFetchList ScopeBufferedSSAGraphExecutor::Run(
eptr = std::current_exception();
}
platform::RecordEvent e("ScopeBufferedSSAGraphExecutorAfterRun", nullptr);
platform::RecordEvent e("ScopeBufferedSSAGraphExecutorAfterRun");
++drop_scope_counter_;
bool stream_end = false;
......
......@@ -37,7 +37,7 @@ ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor(
FeedFetchList ThreadedSSAGraphExecutor::Run(
const std::vector<std::string> &fetch_tensors) {
std::unique_ptr<platform::RecordEvent> event(
new platform::RecordEvent("ThreadedSSAGraphExecutorPrepare", nullptr));
new platform::RecordEvent("ThreadedSSAGraphExecutorPrepare"));
std::unordered_map<OpHandleBase *, size_t> pending_ops;
std::unordered_set<VarHandleBase *> pending_vars;
auto ready_vars = std::make_shared<BlockingQueue<VarHandleBase *>>();
......
......@@ -177,9 +177,7 @@ void OperatorBase::Run(const Scope& scope, const platform::Place& place) {
// in concurrency scenario. Here use an `if` to fix this issue.
// Please do not remove the `if`; ask @Superjomn if there are any concerns.
if (platform::IsProfileEnabled()) {
platform::DeviceContextPool& pool =
platform::DeviceContextPool::Instance();
platform::RecordEvent record_event(Type(), pool.Get(place));
platform::RecordEvent record_event(Type());
RunImpl(scope, place);
} else {
RunImpl(scope, place);
......
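
OperatorBase::Run above keeps the event construction behind IsProfileEnabled(), so non-profiled runs create no Event at all. A hedged sketch of that gating pattern follows; the wrapper function and op-type string are placeholders, while IsProfileEnabled and RecordEvent are the names used in the hunk.

// Editorial sketch of the gating used in OperatorBase::Run; not paddle code.
#include <functional>
#include <string>
#include "paddle/fluid/platform/profiler.h"

void RunMaybeProfiled(const std::string& op_type,
                      const std::function<void()>& run_impl) {
  if (paddle::platform::IsProfileEnabled()) {
    // The event (and its name copy) only exists when profiling is on.
    paddle::platform::RecordEvent record_event(op_type);
    run_impl();
  } else {
    run_impl();
  }
}
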
......@@ -171,9 +171,7 @@ void TestInference(const std::string& dirname,
// Enable the profiler
paddle::platform::EnableProfiler(state);
{
paddle::platform::RecordEvent record_event(
"init_program",
paddle::platform::DeviceContextPool::Instance().Get(place));
paddle::platform::RecordEvent record_event("init_program");
inference_program = InitProgram(&executor, scope, dirname, is_combined);
}
......@@ -230,9 +228,7 @@ void TestInference(const std::string& dirname,
// Run repeat times to profile the performance
for (int i = 0; i < repeat; ++i) {
paddle::platform::RecordEvent record_event(
"run_inference",
paddle::platform::DeviceContextPool::Instance().Get(place));
paddle::platform::RecordEvent record_event("run_inference");
if (PrepareContext) {
// Note: if you change the inference_program, you need to call
......
......@@ -80,7 +80,7 @@ VarHandlePtr BRPCClient::AsyncSendVar(const std::string& ep,
google::protobuf::Closure* done = brpc::NewCallback(
&HandleSendResponse, cntl, response, var_h, ch_ptr, ch_ctx, this);
platform::RecordRPCEvent record_event(method, p_ctx);
platform::RecordRPCEvent record_event(method);
ch_ctx->stub->SendVariable(cntl, &request, response, done);
......@@ -184,7 +184,7 @@ VarHandlePtr BRPCClient::_AsyncGetVar(const std::string& ep,
google::protobuf::Closure* done = brpc::NewCallback(
&HandleGetResponse, cntl, response, var_h, ch_ptr, ch_ctx, this);
platform::RecordRPCEvent record_event(method, p_ctx);
platform::RecordRPCEvent record_event(method);
if (method_name == kGetMonomerRPC) {
ch_ctx->stub->GetMonomerVariable(cntl, &req, response, done);
......@@ -272,7 +272,7 @@ VarHandlePtr BRPCClient::AsyncPrefetchVar(const std::string& ep,
&cntl->request_attachment(), out_var_name_val,
false, 0, table_name_val);
platform::RecordRPCEvent record_event(method, p_ctx);
platform::RecordRPCEvent record_event(method);
google::protobuf::Closure* done = brpc::NewCallback(
&HandleGetResponse, cntl, response, var_h, ch_ptr, ch_ctx, this);
......@@ -311,7 +311,7 @@ VarHandlePtr BRPCClient::AsyncSendFetchBarrier(const std::string& ep,
VarHandlePtr var_h(
new VarHandle(ep, method, FETCH_BARRIER_MESSAGE, nullptr, nullptr));
platform::RecordRPCEvent record_event(method, nullptr);
platform::RecordRPCEvent record_event(method);
google::protobuf::Closure* done = brpc::NewCallback(
&HandleFetchBarrierResponse, cntl, response, var_h, ch_ptr, ch_ctx, this);
......@@ -406,7 +406,7 @@ VarHandlePtr BRPCClient::AsyncSendVarMessage(
sendrecv::VoidMessage* response = new sendrecv::VoidMessage();
cntl->set_timeout_ms(time_out);
platform::RecordRPCEvent record_event(method_name, nullptr);
platform::RecordRPCEvent record_event(method_name);
VarHandlePtr var_h(
new VarHandle(ep, method_name, req.varname(), nullptr, nullptr));
......
......@@ -89,7 +89,7 @@ VarHandlePtr GRPCClient::AsyncSendVar(const std::string& ep,
// stub context
s->response_call_back_ = nullptr;
platform::RecordRPCEvent record_event(method, p_ctx);
platform::RecordRPCEvent record_event(method);
auto call = s->stub_g_.PrepareUnaryCall(
s->context_.get(), "/sendrecv.SendRecvService/SendVariable", req, &cq_);
......@@ -184,7 +184,7 @@ VarHandlePtr GRPCClient::_AsyncGetVar(
// stub context
s->response_call_back_ = ProcGetResponse;
platform::RecordRPCEvent record_event(method, p_ctx);
platform::RecordRPCEvent record_event(method);
auto call =
s->stub_g_.PrepareUnaryCall(s->context_.get(), rpc_path, buf, &cq_);
......@@ -235,7 +235,7 @@ VarHandlePtr GRPCClient::AsyncPrefetchVar(const std::string& ep,
// stub context
s->response_call_back_ = ProcGetResponse;
platform::RecordRPCEvent record_event(method, p_ctx);
platform::RecordRPCEvent record_event(method);
auto call = s->stub_g_.PrepareUnaryCall(
s->context_.get(), "/sendrecv.SendRecvService/PrefetchVariable", req,
......@@ -265,7 +265,7 @@ VarHandlePtr GRPCClient::AsyncSendBatchBarrier(const std::string& ep,
sendrecv::VariableMessage req;
req.set_varname(BATCH_BARRIER_MESSAGE);
platform::RecordRPCEvent record_event(method, nullptr);
platform::RecordRPCEvent record_event(method);
auto rpc = s->stub_->AsyncSendVariable(s->context_.get(), req, &cq_);
rpc->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s));
......@@ -290,7 +290,7 @@ VarHandlePtr GRPCClient::AsyncSendFetchBarrier(const std::string& ep,
sendrecv::VariableMessage req;
req.set_varname(FETCH_BARRIER_MESSAGE);
platform::RecordRPCEvent record_event(method, nullptr);
platform::RecordRPCEvent record_event(method);
auto rpc = s->stub_->AsyncGetVariable(s->context_.get(), req, &cq_);
rpc->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s));
......@@ -317,7 +317,7 @@ VarHandlePtr GRPCClient::AsyncGetMonomerBarrier(const std::string& ep,
sendrecv::VariableMessage req;
req.set_varname(var_name);
platform::RecordRPCEvent record_event(method, nullptr);
platform::RecordRPCEvent record_event(method);
auto rpc = s->stub_->AsyncGetMonomerBarrier(s->context_.get(), req, &cq_);
rpc->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s));
......@@ -342,7 +342,7 @@ VarHandlePtr GRPCClient::AsyncSendComplete(const std::string& ep,
sendrecv::VariableMessage req;
req.set_varname(COMPLETE_MESSAGE);
platform::RecordRPCEvent record_event(method, nullptr);
platform::RecordRPCEvent record_event(method);
auto rpc = s->stub_->AsyncSendVariable(s->context_.get(), req, &cq_);
rpc->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s));
......@@ -372,7 +372,7 @@ VarHandlePtr GRPCClient::AsyncCheckpointNotify(const std::string& ep,
req.set_varname(CHECKPOINT_SAVE_MESSAGE);
req.set_out_varname(dir);
platform::RecordRPCEvent record_event(method, nullptr);
platform::RecordRPCEvent record_event(method);
auto rpc = s->stub_->AsyncCheckpointNotify(s->context_.get(), req, &cq_);
rpc->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s));
......
......@@ -38,7 +38,7 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
::grpc::ByteBuffer* msg, const std::string& out_name,
const int trainer_id,
const std::string& table_name) {
platform::RecordRPCEvent record_event("serial", &ctx);
platform::RecordRPCEvent record_event("serial");
VarMsg request;
TensorPayload* payload = nullptr;
......@@ -147,7 +147,7 @@ void DeserializeFromByteBuffer(const ::grpc::ByteBuffer& msg,
const platform::DeviceContext& ctx,
const framework::Scope* scope,
framework::Variable** var, int* trainer_id) {
platform::RecordRPCEvent record_event("deserial", &ctx);
platform::RecordRPCEvent record_event("deserial");
operators::distributed::GRPCVariableResponse resp(scope, &ctx);
PADDLE_ENFORCE(resp.Parse(msg) == 0, "parse bytebuffer to tensor error!");
*var = resp.GetVar();
......
......@@ -85,9 +85,7 @@ class ReadOp : public framework::OperatorBase {
std::vector<framework::LoDTensor> ins;
// For profiling
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto& ctx = *pool.Get(dev_place);
platform::RecordEvent record_event(Type(), &ctx);
platform::RecordEvent record_event(Type());
reader->ReadNext(&ins);
if (ins.empty()) {
......
......@@ -88,7 +88,11 @@ cc_library(timer SRCS timer.cc)
cc_test(timer_test SRCS timer_test.cc DEPS timer)
cc_library(device_tracer SRCS device_tracer.cc DEPS boost profiler_proto framework_proto ${GPU_CTX_DEPS})
cc_library(profiler SRCS profiler.cc DEPS device_context device_tracer)
if(WITH_GPU)
nv_library(profiler SRCS profiler.cc profiler.cu DEPS device_context device_tracer)
else()
cc_library(profiler SRCS profiler.cc DEPS device_context device_tracer)
endif()
cc_test(profiler_test SRCS profiler_test.cc DEPS profiler)
nv_test(float16_gpu_test SRCS float16_test.cu DEPS lod_tensor)
......
......@@ -32,6 +32,8 @@ inline uint64_t PosixInNsec() {
return 1000 * (static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec);
}
class Event;
// DeviceTracer performs the following tasks:
// 1. Register cuda callbacks for various events: kernel, memcpy, etc.
// 2. Collect cuda statistics: start/end ts, memory, etc.
......@@ -68,11 +70,13 @@ class DeviceTracer {
virtual void Enable() = 0;
// Needs to be called once after use.
virtual void Disable() = 0;
// Needs to be called once before reuse.
virtual void Reset() = 0;
// Add a pair to correlate internal cuda id with high level
// annotation (string). So cuda statistics can be represented by
// annotation event(with string). So cuda statistics can be represented by
// human-readable annotations.
virtual void AddAnnotation(uint64_t id, const std::string& anno) = 0;
virtual void AddAnnotation(uint32_t id, Event* event) = 0;
virtual void AddMemRecords(const std::string& name, uint64_t start_ns,
uint64_t end_ns, int64_t device_id,
......@@ -92,6 +96,9 @@ class DeviceTracer {
// Generate a proto after done (Disabled).
virtual proto::Profile GenProfile(const std::string& profile_path) = 0;
// generate kernel elapsed time into Event
virtual void GenEventKernelCudaElapsedTime() = 0;
virtual bool IsEnabled() = 0;
};
......@@ -99,14 +106,19 @@ class DeviceTracer {
DeviceTracer* GetDeviceTracer();
// Set a name for the cuda kernel operation being launched by the thread.
void SetCurAnnotation(const std::string& anno);
void SetCurAnnotation(Event* event);
// Clear the name after the operation is done.
void ClearCurAnnotation();
// Current name of the operation being run in the thread.
std::string CurAnnotation();
std::string CurAnnotationName();
Event* CurAnnotation();
void SetCurBlock(int block_id);
void ClearCurBlock();
int BlockDepth();
// Set current thread id, so we can map the system thread id to thread id.
void RecoreCurThreadId(int32_t id);
int32_t GetThreadIdFromSystemThreadId(uint32_t id);
} // namespace platform
} // namespace paddle
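
DeviceTracer now correlates CUPTI correlation ids with Event* annotations instead of plain strings, and GenEventKernelCudaElapsedTime() later folds recorded kernel durations back into those events. The standalone sketch below is an assumption about how such a correlation table can work; ToyTracer and ToyEvent are illustrative stand-ins, not Paddle classes.

// Editorial sketch, not part of the diff.
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Stand-in for platform::Event: accumulates GPU time in nanoseconds.
struct ToyEvent {
  std::string name;
  int64_t gpu_ns = 0;
  void AddCudaElapsedTime(int64_t start_ns, int64_t end_ns) {
    gpu_ns += end_ns - start_ns;
  }
};

// Stand-in for the tracer's annotation bookkeeping.
class ToyTracer {
 public:
  // On kernel launch: tie the CUPTI correlation id to the event that is
  // current on the launching thread.
  void AddAnnotation(uint32_t correlation_id, ToyEvent* e) {
    correlation_[correlation_id] = e;
  }
  // For every collected kernel activity record (same correlation id).
  void AddKernelRecord(uint32_t correlation_id, int64_t start_ns,
                       int64_t end_ns) {
    kernels_.push_back({correlation_id, start_ns, end_ns});
  }
  // Mirrors GenEventKernelCudaElapsedTime(): push kernel time into events.
  void GenEventKernelCudaElapsedTime() {
    for (const auto& k : kernels_) {
      auto it = correlation_.find(k.correlation_id);
      if (it != correlation_.end()) {
        it->second->AddCudaElapsedTime(k.start_ns, k.end_ns);
      }
    }
  }

 private:
  struct KernelRecord {
    uint32_t correlation_id;
    int64_t start_ns;
    int64_t end_ns;
  };
  std::unordered_map<uint32_t, ToyEvent*> correlation_;
  std::vector<KernelRecord> kernels_;
};
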
......@@ -22,6 +22,7 @@ limitations under the License. */
#include "paddle/fluid/string/split.h"
#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/platform/cuda_device_guard.h"
#include "paddle/fluid/platform/dynload/cupti.h"
#endif
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/init.h"
......@@ -30,6 +31,9 @@ limitations under the License. */
DEFINE_int32(paddle_num_threads, 1,
"Number of threads for each paddle instance.");
DEFINE_int32(multiple_of_cupti_buffer_size, 1,
"Multiple of the CUPTI device buffer size. If the timestamps have "
"been dropped when you are profiling, try increasing this value.");
namespace paddle {
namespace framework {
......@@ -78,7 +82,32 @@ void InitP2P(std::vector<int> devices) {
#endif
}
void InitCupti() {
#ifdef PADDLE_WITH_CUPTI
if (FLAGS_multiple_of_cupti_buffer_size == 1) return;
size_t attrValue = 0, attrValueSize = sizeof(size_t);
#define MULTIPLY_ATTR_VALUE(attr) \
{ \
PADDLE_ENFORCE(!platform::dynload::cuptiActivityGetAttribute( \
attr, &attrValueSize, &attrValue)); \
attrValue *= FLAGS_multiple_of_cupti_buffer_size; \
LOG(WARNING) << "Set " #attr " " << attrValue << " byte"; \
PADDLE_ENFORCE(!platform::dynload::cuptiActivitySetAttribute( \
attr, &attrValueSize, &attrValue)); \
}
MULTIPLY_ATTR_VALUE(CUPTI_ACTIVITY_ATTR_DEVICE_BUFFER_SIZE);
MULTIPLY_ATTR_VALUE(CUPTI_ACTIVITY_ATTR_DEVICE_BUFFER_SIZE_CDP);
#if CUDA_VERSION >= 9000
MULTIPLY_ATTR_VALUE(CUPTI_ACTIVITY_ATTR_PROFILING_SEMAPHORE_POOL_SIZE);
#endif
#undef MULTIPLY_ATTR_VALUE
#endif
}
void InitDevices(bool init_p2p) {
// CUPTI attribute should be set before any CUDA context is created (see CUPTI
// documentation about CUpti_ActivityAttribute).
InitCupti();
/*Init all available devices by default */
std::vector<int> devices;
#ifdef PADDLE_WITH_CUDA
......
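
InitCupti() reads each CUPTI activity buffer attribute, multiplies it by FLAGS_multiple_of_cupti_buffer_size, and writes it back before any CUDA context exists, so dropped timestamps can be cured by enlarging the buffers. Below is a standalone sketch of the same get-scale-set pattern against the raw CUPTI API (error handling simplified; Paddle calls the same functions through its dynload wrappers).

// Editorial sketch, not part of the diff. Must run before any CUDA context
// is created (see the CUPTI docs on CUpti_ActivityAttribute).
#include <cupti.h>
#include <cstdio>

static void MultiplyCuptiDeviceBufferSize(size_t multiple) {
  size_t value = 0;
  size_t value_size = sizeof(value);
  if (cuptiActivityGetAttribute(CUPTI_ACTIVITY_ATTR_DEVICE_BUFFER_SIZE,
                                &value_size, &value) != CUPTI_SUCCESS) {
    return;  // CUPTI unavailable; keep the defaults.
  }
  value *= multiple;
  if (cuptiActivitySetAttribute(CUPTI_ACTIVITY_ATTR_DEVICE_BUFFER_SIZE,
                                &value_size, &value) == CUPTI_SUCCESS) {
    std::printf("CUPTI device buffer size set to %zu bytes\n", value);
  }
}

The scale factor comes from the new multiple_of_cupti_buffer_size flag, which this diff also exposes to Python through read_env_flags further down.
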
......@@ -12,6 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/platform/profiler.h"
#include <algorithm>
#include <iomanip>
#include <limits>
......@@ -27,7 +29,6 @@ limitations under the License. */
#include "paddle/fluid/framework/block_desc.h"
#include "paddle/fluid/platform/device_tracer.h"
#include "paddle/fluid/platform/port.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/string/printf.h"
DEFINE_bool(enable_rpc_profiler, false, "Enable rpc profiler or not.");
......@@ -66,12 +67,13 @@ struct EventList {
((kEventSize + kEventAlign - 1) / kEventAlign * kEventAlign);
template <typename... Args>
void Record(Args&&... args) {
Event* Record(Args&&... args) {
if (event_blocks.empty() || event_blocks.front().size() == kNumBlock) {
event_blocks.emplace_front();
event_blocks.front().reserve(kNumBlock);
}
event_blocks.front().emplace_back(std::forward<Args>(args)...);
return &event_blocks.front().back();
}
std::vector<Event> Reduce() {
......@@ -98,21 +100,8 @@ inline uint64_t GetTimeInNsec() {
.count();
}
Event::Event(EventType type, std::string name, uint32_t thread_id,
const DeviceContext* dev_ctx)
: type_(type), name_(name), thread_id_(thread_id), has_cuda_(false) {
#ifdef PADDLE_WITH_CUDA
has_cuda_ = dev_ctx ? platform::is_gpu_place(dev_ctx->GetPlace()) : false;
if (has_cuda_) {
auto* cuda_dev_ctx = static_cast<const CUDADeviceContext*>(dev_ctx);
PADDLE_ENFORCE(cudaSetDevice(
boost::get<platform::CUDAPlace>(cuda_dev_ctx->GetPlace()).device));
PADDLE_ENFORCE(cudaGetDevice(&device_));
PADDLE_ENFORCE(cudaEventCreate(&event_));
auto stream = cuda_dev_ctx->stream();
PADDLE_ENFORCE(cudaEventRecord(event_, stream));
}
#endif
Event::Event(EventType type, std::string name, uint32_t thread_id)
: type_(type), name_(name), thread_id_(thread_id) {
cpu_ns_ = GetTimeInNsec();
}
......@@ -124,88 +113,70 @@ double Event::CpuElapsedMs(const Event& e) const {
double Event::CudaElapsedMs(const Event& e) const {
#ifdef PADDLE_WITH_CUDA
if (!has_cuda_) return 0.0;
PADDLE_ENFORCE(e.has_cuda() && has_cuda());
PADDLE_ENFORCE(e.device() == device());
PADDLE_ENFORCE(cudaEventSynchronize(event_));
PADDLE_ENFORCE(cudaEventSynchronize(e.event()));
float ms;
PADDLE_ENFORCE(cudaEventElapsedTime(&ms, event_, e.event()));
return ms;
#ifdef PADDLE_WITH_CUPTI
return gpu_ns_ / 1000000.0;
#endif
#else
PADDLE_THROW("CUDA is not enabled");
#endif
}
#ifdef PADDLE_WITH_CUDA
static void ForEachDevice(std::function<void(int)> func) {
auto original_device = GetCurrentDeviceId();
int count = GetCUDADeviceCount();
for (int i = 0; i < count; i++) {
SetDeviceId(i);
func(i);
}
SetDeviceId(original_device);
}
#endif
inline EventList& GetEventList() {
if (!g_event_list) {
std::lock_guard<std::mutex> guard(g_all_event_lists_mutex);
g_event_list = std::make_shared<EventList>();
g_thread_id = g_next_thread_id++;
g_all_event_lists.emplace_front(g_event_list);
RecoreCurThreadId(g_thread_id);
}
return *g_event_list;
}
void Mark(const std::string& name, const DeviceContext* dev_ctx) {
GetEventList().Record(EventType::kMark, name, g_thread_id, dev_ctx);
void Mark(const std::string& name) {
GetEventList().Record(EventType::kMark, name, g_thread_id);
}
void PushEvent(const std::string& name, const DeviceContext* dev_ctx) {
GetEventList().Record(EventType::kPushRange, name, g_thread_id, dev_ctx);
Event* PushEvent(const std::string& name) {
return GetEventList().Record(EventType::kPushRange, name, g_thread_id);
}
void PopEvent(const std::string& name, const DeviceContext* dev_ctx) {
GetEventList().Record(EventType::kPopRange, name, g_thread_id, dev_ctx);
void PopEvent(const std::string& name) {
GetEventList().Record(EventType::kPopRange, name, g_thread_id);
}
RecordEvent::RecordEvent(const std::string& name, const DeviceContext* dev_ctx)
RecordEvent::RecordEvent(const std::string& name)
: is_enabled_(false), start_ns_(PosixInNsec()) {
if (g_state == ProfilerState::kDisabled) return;
std::lock_guard<std::mutex> l(profiler_mu);
// lock is not needed, the code below is thread-safe
is_enabled_ = true;
dev_ctx_ = dev_ctx;
name_ = name;
PushEvent(name_, dev_ctx_);
Event* e = PushEvent(name_);
// Maybe need the same push/pop behavior.
SetCurAnnotation(name_);
SetCurAnnotation(e);
}
RecordEvent::~RecordEvent() {
if (g_state == ProfilerState::kDisabled || !is_enabled_) return;
std::lock_guard<std::mutex> l(profiler_mu);
// lock is not needed, the code below is thread-safe
DeviceTracer* tracer = GetDeviceTracer();
if (tracer) {
tracer->AddCPURecords(CurAnnotation(), start_ns_, PosixInNsec(),
tracer->AddCPURecords(CurAnnotationName(), start_ns_, PosixInNsec(),
BlockDepth(), g_thread_id);
}
ClearCurAnnotation();
PopEvent(name_, dev_ctx_);
PopEvent(name_);
}
RecordRPCEvent::RecordRPCEvent(const std::string& name,
const DeviceContext* dev_ctx) {
RecordRPCEvent::RecordRPCEvent(const std::string& name) {
if (FLAGS_enable_rpc_profiler) {
event_.reset(new platform::RecordEvent(name, dev_ctx));
event_.reset(new platform::RecordEvent(name));
}
}
RecordBlock::RecordBlock(int block_id)
: is_enabled_(false), start_ns_(PosixInNsec()) {
std::lock_guard<std::mutex> l(profiler_mu);
// lock is not needed, the code below is thread-safe
if (g_state == ProfilerState::kDisabled) return;
is_enabled_ = true;
SetCurBlock(block_id);
......@@ -213,7 +184,7 @@ RecordBlock::RecordBlock(int block_id)
}
RecordBlock::~RecordBlock() {
std::lock_guard<std::mutex> l(profiler_mu);
// lock is not needed, the code below is thread-safe
if (g_state == ProfilerState::kDisabled || !is_enabled_) return;
DeviceTracer* tracer = GetDeviceTracer();
if (tracer) {
......@@ -225,11 +196,21 @@ RecordBlock::~RecordBlock() {
ClearCurBlock();
}
void SynchronizeAllDevice() {
#ifdef PADDLE_WITH_CUDA
int count = GetCUDADeviceCount();
for (int i = 0; i < count; i++) {
SetDeviceId(i);
PADDLE_ENFORCE(cudaDeviceSynchronize());
}
#endif
}
void EnableProfiler(ProfilerState state) {
PADDLE_ENFORCE(state != ProfilerState::kDisabled,
"Can't enable profiling, since the input state is ",
"ProfilerState::kDisabled");
SynchronizeAllDevice();
std::lock_guard<std::mutex> l(profiler_mu);
if (state == g_state) {
return;
......@@ -238,23 +219,20 @@ void EnableProfiler(ProfilerState state) {
should_send_profile_state = true;
GetDeviceTracer()->Enable();
#ifdef PADDLE_WITH_CUDA
if (g_state == ProfilerState::kCUDA) {
if (g_state == ProfilerState::kCUDA || g_state == ProfilerState::kAll ||
g_state == ProfilerState::kCPU) {
// Generate some dummy events first to reduce the startup overhead.
for (int i = 0; i < 5; i++) {
ForEachDevice([](int d) {
DeviceContext* dev_ctx = new CUDADeviceContext(CUDAPlace(d));
Mark("_cuda_startup_", dev_ctx);
dev_ctx->Wait();
delete dev_ctx;
});
}
DummyKernelAndEvent();
GetDeviceTracer()->Reset();
}
#endif
// Mark the profiling start.
Mark("_start_profiler_", nullptr);
Mark("_start_profiler_");
}
void ResetProfiler() {
SynchronizeAllDevice();
GetDeviceTracer()->Reset();
std::lock_guard<std::mutex> guard(g_all_event_lists_mutex);
for (auto it = g_all_event_lists.begin(); it != g_all_event_lists.end();
++it) {
......@@ -481,20 +459,23 @@ void ParseEvents(const std::vector<std::vector<Event>>& events,
void DisableProfiler(EventSortingKey sorted_key,
const std::string& profile_path) {
SynchronizeAllDevice();
std::lock_guard<std::mutex> l(profiler_mu);
if (g_state == ProfilerState::kDisabled) return;
// Mark the profiling stop.
Mark("_stop_profiler_", nullptr);
Mark("_stop_profiler_");
std::vector<std::vector<Event>> all_events = GetAllEvents();
ParseEvents(all_events, true, sorted_key);
ParseEvents(all_events, false, sorted_key);
ResetProfiler();
DeviceTracer* tracer = GetDeviceTracer();
if (tracer->IsEnabled()) {
tracer->Disable();
tracer->GenProfile(profile_path);
tracer->GenEventKernelCudaElapsedTime();
}
std::vector<std::vector<Event>> all_events = GetAllEvents();
ParseEvents(all_events, true, sorted_key);
ParseEvents(all_events, false, sorted_key);
ResetProfiler();
g_state = ProfilerState::kDisabled;
should_send_profile_state = true;
}
......
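
Putting the pieces together: DisableProfiler now synchronizes all devices, stops the tracer, attributes CUDA kernel time to the CPU events via GenEventKernelCudaElapsedTime(), and only then parses and prints them. A hedged usage sketch of the refined flow follows; the sorting key and output path are assumptions, the other names match the diff.

// Editorial usage sketch, not part of the diff.
#include "paddle/fluid/platform/profiler.h"

void ProfileWorkload() {
  namespace pf = paddle::platform;
  // kAll profiles CPU ranges and, through CUPTI, GPU kernels and memcpys.
  pf::EnableProfiler(pf::ProfilerState::kAll);
  {
    pf::RecordEvent ev("my_region");  // scoped annotation around the workload
    // ... run the workload ...
  }
  // Synchronizes devices, disables the tracer, folds kernel time back into
  // the matching events, prints the summary, and writes the profile proto.
  pf::DisableProfiler(pf::EventSortingKey::kTotal, "/tmp/profile");
}
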
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/platform/profiler.h"
#include <cuda.h>
namespace paddle {
namespace platform {
__global__ void DummyKernel(int *a) { a[0] = 0; }
static void ForEachDevice(std::function<void(int)> func) {
auto original_device = GetCurrentDeviceId();
int count = GetCUDADeviceCount();
for (int i = 0; i < count; i++) {
SetDeviceId(i);
func(i);
}
SetDeviceId(original_device);
}
void DummyKernelAndEvent() {
for (int i = 0; i < 5; i++) {
ForEachDevice([](int d) {
CUDADeviceContext *dev_ctx = new CUDADeviceContext(CUDAPlace(d));
Mark("_cuda_startup_");
int *ptr;
PADDLE_ENFORCE(cudaMalloc(&ptr, sizeof(int)));
DummyKernel<<<1, 1, 0, dev_ctx->stream()>>>(ptr);
dev_ctx->Wait();
PADDLE_ENFORCE(cudaFree(ptr));
delete dev_ctx;
});
}
}
} // namespace platform
} // namespace paddle
......@@ -28,17 +28,17 @@ class Event {
public:
// The DeviceContext is used to get the cuda stream.
// If CPU profiling mode, can pass nullptr.
Event(EventType type, std::string name, uint32_t thread_id,
const DeviceContext* dev_ctx);
Event(EventType type, std::string name, uint32_t thread_id);
const EventType& type() const;
std::string name() const { return name_; }
uint32_t thread_id() const { return thread_id_; }
bool has_cuda() const { return has_cuda_; }
#ifdef PADDLE_WITH_CUDA
#ifndef PADDLE_WITH_CUPTI
cudaEvent_t event() const { return event_; }
int device() const { return device_; }
#endif
#endif
double CpuElapsedMs(const Event& e) const;
......@@ -49,11 +49,21 @@ class Event {
std::string name_;
uint32_t thread_id_;
int64_t cpu_ns_;
bool has_cuda_;
#ifdef PADDLE_WITH_CUDA
#ifdef PADDLE_WITH_CUPTI
int64_t gpu_ns_ = 0;
public:
void AddCudaElapsedTime(int64_t start_ns, int64_t end_ns) {
gpu_ns_ += end_ns - start_ns;
}
private:
#else
cudaEvent_t event_ = nullptr;
int device_ = -1;
#endif
#endif
};
enum ProfilerState {
......@@ -63,22 +73,19 @@ enum ProfilerState {
kAll, // Profile both CPU and GPU. (Currently experimental).
};
void Mark(const std::string& name, const DeviceContext* dev_ctx);
void Mark(const std::string& name);
void PushEvent(const std::string& name, const DeviceContext* dev_ctx);
Event* PushEvent(const std::string& name);
void PopEvent(const std::string& name, const DeviceContext* dev_ctx);
void PopEvent(const std::string& name);
struct RecordEvent {
// dev_ctx can be set to nullptr if device is cpu.
RecordEvent(const std::string& name, const DeviceContext* dev_ctx);
explicit RecordEvent(const std::string& name);
~RecordEvent();
bool is_enabled_;
uint64_t start_ns_;
// The device context is used by Event to get the current cuda stream.
const DeviceContext* dev_ctx_;
// Event name
std::string name_;
// Need to distinguish name by op type, block_id, program_id and perhaps
......@@ -88,8 +95,7 @@ struct RecordEvent {
class RecordRPCEvent {
public:
// dev_ctx can be set to nullptr if device is cpu.
RecordRPCEvent(const std::string& name, const DeviceContext* dev_ctx);
explicit RecordRPCEvent(const std::string& name);
~RecordRPCEvent() {}
private:
......@@ -132,5 +138,9 @@ bool ShouldSendProfileState();
void SetProfileListener();
int64_t ListenerId();
#ifdef PADDLE_WITH_CUDA
void DummyKernelAndEvent();
#endif
} // namespace platform
} // namespace paddle
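
With CUPTI available, Event drops its cudaEvent_t and device members and instead accumulates kernel time in gpu_ns_, so CudaElapsedMs reduces to a nanosecond-to-millisecond conversion over the accumulated intervals. A small standalone illustration of that accounting follows (ToyEvent is a stand-in, not the Paddle class).

// Editorial sketch, not part of the diff.
#include <cassert>
#include <cstdint>

struct ToyEvent {
  int64_t gpu_ns = 0;
  void AddCudaElapsedTime(int64_t start_ns, int64_t end_ns) {
    gpu_ns += end_ns - start_ns;
  }
  double CudaElapsedMs() const { return gpu_ns / 1000000.0; }
};

int main() {
  ToyEvent e;
  e.AddCudaElapsedTime(1000000, 3500000);  // a 2.5 ms kernel
  e.AddCudaElapsedTime(4000000, 4500000);  // a 0.5 ms kernel
  assert(e.CudaElapsedMs() == 3.0);        // total attributed GPU time
  return 0;
}
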
......@@ -31,6 +31,7 @@ message Event {
optional int64 sub_device_id = 6;
optional MemCopy memcopy = 7;
optional string detail_info = 9;
}
message Profile {
......
......@@ -23,76 +23,49 @@ TEST(Event, CpuElapsedTime) {
using paddle::platform::Event;
using paddle::platform::EventType;
Event start_event(EventType::kPushRange, "test", 0, nullptr);
EXPECT_TRUE(start_event.has_cuda() == false);
Event start_event(EventType::kPushRange, "test", 0);
int counter = 0;
while (counter != 1000) {
counter++;
}
Event stop_event(EventType::kPopRange, "test", 0, nullptr);
Event stop_event(EventType::kPopRange, "test", 0);
EXPECT_GT(start_event.CpuElapsedMs(stop_event), 0);
}
#ifdef PADDLE_WITH_CUDA
TEST(Event, CudaElapsedTime) {
using paddle::platform::DeviceContext;
using paddle::platform::CUDADeviceContext;
using paddle::platform::CUDAPlace;
using paddle::platform::Event;
using paddle::platform::EventType;
DeviceContext* dev_ctx = new CUDADeviceContext(CUDAPlace(0));
Event start_event(EventType::kPushRange, "test", 0, dev_ctx);
EXPECT_TRUE(start_event.has_cuda() == true);
int counter = 0;
while (counter != 1000) {
counter++;
}
Event stop_event(EventType::kPopRange, "test", 0, dev_ctx);
EXPECT_GT(start_event.CudaElapsedMs(stop_event), 0);
}
#endif
TEST(RecordEvent, RecordEvent) {
using paddle::platform::DeviceContext;
using paddle::platform::Event;
using paddle::platform::EventType;
using paddle::platform::RecordEvent;
using paddle::platform::PushEvent;
using paddle::platform::PopEvent;
using paddle::platform::ProfilerState;
using paddle::platform::EventSortingKey;
ProfilerState state = ProfilerState::kCPU;
DeviceContext* dev_ctx = nullptr;
#ifdef PADDLE_WITH_CUDA
using paddle::platform::CUDADeviceContext;
using paddle::platform::CUDAPlace;
state = ProfilerState::kCUDA;
dev_ctx =
new paddle::platform::CUDADeviceContext(paddle::platform::CUDAPlace(0));
#endif
EnableProfiler(state);
/* Usage 1:
* PushEvent(evt_name, dev_ctx);
* PushEvent(evt_name);
* ...
* code to be analyzed
* ...
* PopEvent(evt_name, dev_ctx);
* PopEvent(evt_name);
*/
LOG(INFO) << "Usage 1: PushEvent & PopEvent";
for (int loop = 0; loop < 3; ++loop) {
for (int i = 1; i < 5; ++i) {
std::string name = "op_" + std::to_string(i);
PushEvent(name, dev_ctx);
PushEvent(name);
int counter = 1;
while (counter != i * 1000) counter++;
PopEvent(name, dev_ctx);
PopEvent(name);
}
}
/* Usage 2:
* {
* RecordEvent record_event(name, dev_ctx);
* RecordEvent record_event(name);
* ...
* code to be analyzed
* ...
......@@ -101,7 +74,7 @@ TEST(RecordEvent, RecordEvent) {
LOG(INFO) << "Usage 2: RecordEvent";
for (int i = 1; i < 5; ++i) {
std::string name = "evs_op_" + std::to_string(i);
RecordEvent record_event(name, dev_ctx);
RecordEvent record_event(name);
int counter = 1;
while (counter != i * 1000) counter++;
}
......@@ -123,20 +96,20 @@ TEST(RecordEvent, RecordEvent) {
LOG(INFO) << "Usage 3: nested RecordEvent";
for (int i = 1; i < 5; ++i) {
std::string name = "ano_evs_op_" + std::to_string(i);
RecordEvent record_event(name, dev_ctx);
RecordEvent record_event(name);
int counter = 1;
while (counter != i * 100) counter++;
{
std::string nested_name = "nested_ano_evs_op_" + std::to_string(i);
RecordEvent nested_record_event(nested_name, dev_ctx);
RecordEvent nested_record_event(nested_name);
int nested_counter = 1;
while (nested_counter != i * 100) nested_counter++;
}
}
// Bad Usage:
PushEvent("event_without_pop", dev_ctx);
PopEvent("event_without_push", dev_ctx);
PushEvent("event_without_pop");
PopEvent("event_without_push");
std::vector<std::vector<Event>> events = paddle::platform::GetAllEvents();
int cuda_startup_count = 0;
......
......@@ -131,7 +131,8 @@ def __bootstrap__():
'eager_delete_tensor_gb', 'fast_eager_deletion_mode',
'allocator_strategy', 'reader_queue_speed_test_mode',
'print_sub_graph_dir', 'pe_profile_fname', 'warpctc_dir',
'inner_op_parallelism', 'enable_parallel_graph'
'inner_op_parallelism', 'enable_parallel_graph',
'multiple_of_cupti_buffer_size'
]
if 'Darwin' not in sysstr:
read_env_flags.append('use_pinned_memory')
......
......@@ -16,15 +16,19 @@ from __future__ import print_function
import unittest
import os
import tempfile
import numpy as np
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
import paddle.fluid.layers as layers
import paddle.fluid.core as core
import paddle.fluid.proto.profiler.profiler_pb2 as profiler_pb2
class TestProfiler(unittest.TestCase):
def net_profiler(self, state, profile_path='/tmp/profile'):
def net_profiler(self, state, use_parallel_executor=False):
profile_path = os.path.join(tempfile.gettempdir(), "profile")
open(profile_path, "w").write("")
startup_program = fluid.Program()
main_program = fluid.Program()
......@@ -60,6 +64,11 @@ class TestProfiler(unittest.TestCase):
place = fluid.CPUPlace() if state == 'CPU' else fluid.CUDAPlace(0)
exe = fluid.Executor(place)
exe.run(startup_program)
if use_parallel_executor:
pe = fluid.ParallelExecutor(
state != 'CPU',
loss_name=avg_cost.name,
main_program=main_program)
pass_acc_calculator = fluid.average.WeightedAverage()
with profiler.profiler(state, 'total', profile_path) as prof:
......@@ -69,6 +78,9 @@ class TestProfiler(unittest.TestCase):
x = np.random.random((32, 784)).astype("float32")
y = np.random.randint(0, 10, (32, 1)).astype("int64")
if use_parallel_executor:
pe.run(feed={'x': x, 'y': y}, fetch_list=[avg_cost.name])
continue
outs = exe.run(main_program,
feed={'x': x,
'y': y},
......@@ -77,21 +89,37 @@ class TestProfiler(unittest.TestCase):
b_size = np.array(outs[2])
pass_acc_calculator.add(value=acc, weight=b_size)
pass_acc = pass_acc_calculator.eval()
data = open(profile_path, 'rb').read()
self.assertGreater(len(data), 0)
profile_pb = profiler_pb2.Profile()
profile_pb.ParseFromString(data)
self.assertGreater(len(profile_pb.events), 0)
for event in profile_pb.events:
if event.type == profiler_pb2.Event.GPUKernel:
if not event.detail_info and not event.name.startswith("MEM"):
raise Exception(
"Kernel %s missing event. Has this kernel been recorded by RecordEvent?"
% event.name)
elif event.type == profiler_pb2.Event.CPU and (
event.name.startswith("Driver API") or
event.name.startswith("Runtime API")):
print("Warning: unregister", event.name)
def test_cpu_profiler(self):
self.net_profiler('CPU')
self.net_profiler('CPU', use_parallel_executor=True)
@unittest.skipIf(not core.is_compiled_with_cuda(),
"profiler is enabled only with GPU")
def test_cuda_profiler(self):
self.net_profiler('GPU')
self.net_profiler('GPU', use_parallel_executor=True)
@unittest.skipIf(not core.is_compiled_with_cuda(),
"profiler is enabled only with GPU")
def test_all_profiler(self):
self.net_profiler('All', '/tmp/profile_out')
with open('/tmp/profile_out', 'rb') as f:
self.assertGreater(len(f.read()), 0)
self.net_profiler('All')
self.net_profiler('All', use_parallel_executor=True)
if __name__ == '__main__':
......
......@@ -131,8 +131,12 @@ class Timeline(object):
if (k, event.device_id, "CPU") not in self._devices:
pid = self._allocate_pid()
self._devices[(k, event.device_id, "CPU")] = pid
self._chrome_trace.emit_pid("%s:cpu:block:%d" %
(k, event.device_id), pid)
# -1 device id represents CUDA api call
if event.device_id == -1:
self._chrome_trace.emit_pid("%s:cuda_api" % k, pid)
else:
self._chrome_trace.emit_pid(
"%s:cpu:block:%d" % (k, event.device_id), pid)
elif event.type == profiler_pb2.Event.GPUKernel:
if (k, event.device_id, "GPUKernel") not in self._devices:
pid = self._allocate_pid()
......@@ -150,7 +154,9 @@ class Timeline(object):
pid = self._devices[(k, event.device_id, type)]
args = {'name': event.name}
if event.memcopy.bytes > 0:
args = {'mem_bytes': event.memcopy.bytes}
args['mem_bytes'] = event.memcopy.bytes
if event.detail_info:
args['detail_info'] = event.detail_info
# TODO(panyx0718): Chrome tracing only handles ms. However, some
# ops takes micro-seconds. Hence, we keep the ns here.
self._chrome_trace.emit_region(
......@@ -173,7 +179,7 @@ if args.timeline_path:
profile_paths = profile_path.split(',')
profile_dict = dict()
if len(profile_paths) == 1:
with open(profile_path, 'r') as f:
with open(profile_path, 'rb') as f:
profile_s = f.read()
profile_pb = profiler_pb2.Profile()
profile_pb.ParseFromString(profile_s)
......@@ -181,7 +187,7 @@ if len(profile_paths) == 1:
else:
for profile_path in profile_paths:
k, v = profile_path.split('=')
with open(v, 'r') as f:
with open(v, 'rb') as f:
profile_s = f.read()
profile_pb = profiler_pb2.Profile()
profile_pb.ParseFromString(profile_s)
......