diff --git a/paddle/fluid/operators/distributed/grpc/grpc_serde.cc b/paddle/fluid/operators/distributed/grpc/grpc_serde.cc index eddd89cf20c2eb91e88d666a6ffe4a045df7298b..5c78c88b0183d21552160248972b300843fd1f87 100644 --- a/paddle/fluid/operators/distributed/grpc/grpc_serde.cc +++ b/paddle/fluid/operators/distributed/grpc/grpc_serde.cc @@ -40,6 +40,8 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, const int trainer_id, const std::string& table_name) { platform::RecordRPCEvent record_event("serial"); + platform::RecordEvent record_event_grpc("grpc::SerializeToByteBuffer", + platform::EventRole::kInnerOp); VarMsg request; TensorPayload* payload = nullptr; @@ -147,6 +149,8 @@ void DeserializeFromByteBuffer(const ::grpc::ByteBuffer& msg, const framework::Scope* scope, framework::Variable** var, int* trainer_id) { platform::RecordRPCEvent record_event("deserial"); + platform::RecordEvent record_event_grpc("grpc::DeserializeFromByteBuffer", + platform::EventRole::kInnerOp); operators::distributed::GRPCVariableResponse resp(scope, &ctx); PADDLE_ENFORCE(resp.Parse(msg) == 0, "parse bytebuffer to tensor error!"); *var = resp.GetVar(); diff --git a/paddle/fluid/operators/distributed/grpc/grpc_server.cc b/paddle/fluid/operators/distributed/grpc/grpc_server.cc index 5c0232a50a9066f782be5269b4041958748c2e23..b0c2ccff01c7764ed36d29926bce2dfe110c42de 100644 --- a/paddle/fluid/operators/distributed/grpc/grpc_server.cc +++ b/paddle/fluid/operators/distributed/grpc/grpc_server.cc @@ -103,6 +103,7 @@ class RequestSend final : public RequestBase { std::string GetReqName() override { return request_->Varname(); } void Process() override { + platform::PushEvent("RequestSend::Process", platform::EventRole::kInnerOp); std::string varname = GetReqName(); auto scope = request_->GetMutableLocalScope(); @@ -114,6 +115,7 @@ class RequestSend final : public RequestBase { framework::Variable* outvar = nullptr; request_handler_->Handle(varname, scope, invar, 
&outvar, trainer_id); Finish(reply_, &responder_); + platform::PopEvent("RequestSend::Process", platform::EventRole::kInnerOp); } protected: @@ -139,6 +141,7 @@ class RequestGet final : public RequestBase { std::string GetReqName() override { return request_.varname(); } void Process() override { + platform::PushEvent("RequestGet::Process", platform::EventRole::kInnerOp); // proc request. std::string varname = request_.varname(); std::string out_varname = request_.out_varname(); @@ -162,6 +165,7 @@ class RequestGet final : public RequestBase { } VLOG(1) << "after SerializeToByteBuffer"; Finish(reply_, &responder_); + platform::PopEvent("RequestGet::Process", platform::EventRole::kInnerOp); } protected: @@ -189,6 +193,8 @@ class RequestGetNoBarrier final : public RequestBase { std::string GetReqName() override { return request_.varname(); } void Process() override { + platform::PushEvent("RequestGetNoBarrier::Process", + platform::EventRole::kInnerOp); // proc request. std::string varname = request_.varname(); std::string out_varname = request_.out_varname(); @@ -208,6 +214,8 @@ class RequestGetNoBarrier final : public RequestBase { &reply_); } Finish(reply_, &responder_); + platform::PopEvent("RequestGetNoBarrier::Process", + platform::EventRole::kInnerOp); } protected: @@ -237,6 +245,8 @@ class RequestGetMonomerVariable final : public RequestBase { std::string GetReqName() override { return request_.varname(); } void Process() override { + platform::PushEvent("RequestGetMonomerVariable::Process", + platform::EventRole::kInnerOp); // proc request. 
std::string varname = request_.varname(); @@ -254,6 +264,8 @@ class RequestGetMonomerVariable final : public RequestBase { SerializeToByteBuffer(varname, outvar, *h.dev_ctx_, &reply_); } Finish(reply_, &responder_); + platform::PopEvent("RequestGetMonomerVariable::Process", + platform::EventRole::kInnerOp); } protected: @@ -284,6 +296,8 @@ class RequestGetMonomerBarrier final : public RequestBase { std::string GetReqName() override { return request_.varname(); } void Process() override { + platform::PushEvent("RequestGetMonomerBarrier::Process", + platform::EventRole::kInnerOp); // proc request. std::string varname = request_.varname(); VLOG(4) << "RequestGetMonomerBarrier " << varname; @@ -299,6 +313,8 @@ class RequestGetMonomerBarrier final : public RequestBase { request_.trainer_id()); Finish(reply_, &responder_); + platform::PopEvent("RequestGetMonomerBarrier::Process", + platform::EventRole::kInnerOp); } protected: @@ -330,6 +346,8 @@ class RequestPrefetch final : public RequestBase { std::string GetReqName() override { return request_->Varname(); } void Process() override { + platform::PushEvent("RequestPrefetch::Process", + platform::EventRole::kInnerOp); // prefetch process... 
std::string in_var_name = request_->Varname(); std::string out_var_name = request_->OutVarname(); @@ -350,6 +368,8 @@ class RequestPrefetch final : public RequestBase { SerializeToByteBuffer(out_var_name, outvar, *request_handler_->dev_ctx(), &reply_); Finish(reply_, &responder_); + platform::PopEvent("RequestPrefetch::Process", + platform::EventRole::kInnerOp); } protected: @@ -379,6 +399,8 @@ class RequestCheckpointNotify final : public RequestBase { std::string GetReqName() override { return request_->Varname(); } void Process() override { + platform::PushEvent("RequestCheckpointNotify::Process", + platform::EventRole::kInnerOp); auto scope = request_->GetMutableLocalScope(); std::string checkpoint_notify = request_->Varname(); @@ -391,6 +413,8 @@ class RequestCheckpointNotify final : public RequestBase { request_handler_->Handle(checkpoint_notify, scope, nullptr, nullptr, trainer_id, checkpoint_dir); Finish(reply_, &responder_); + platform::PopEvent("RequestCheckpointNotify::Process", + platform::EventRole::kInnerOp); } protected: @@ -417,6 +441,8 @@ class RequestNotify final : public RequestBase { std::string GetReqName() override { return request_->Varname(); } void Process() override { + platform::PushEvent("RequestNotify::Process", + platform::EventRole::kInnerOp); std::string varname = GetReqName(); VLOG(4) << "RequestNotify var_name:" << varname; @@ -426,6 +452,7 @@ class RequestNotify final : public RequestBase { framework::Variable* outvar = nullptr; request_handler_->Handle(varname, scope, invar, &outvar, trainer_id); Finish(reply_, &responder_); + platform::PopEvent("RequestNotify::Process", platform::EventRole::kInnerOp); } protected: @@ -456,6 +483,8 @@ class RequestSendAndRecv final : public RequestBase { std::string GetReqName() override { return request_->Varname(); } void Process() override { + platform::PushEvent("RequestSendAndRecv::Process", + platform::EventRole::kInnerOp); std::string in_var_name = request_->Varname(); std::string 
out_var_name = request_->OutVarname(); std::string table_name = request_->TableName(); @@ -471,6 +500,8 @@ class RequestSendAndRecv final : public RequestBase { SerializeToByteBuffer(out_var_name, outvar, *request_handler_->dev_ctx(), &reply_); Finish(reply_, &responder_); + platform::PopEvent("RequestSendAndRecv::Process", + platform::EventRole::kInnerOp); } protected: diff --git a/paddle/fluid/operators/distributed/grpc/grpc_variable_response.cc b/paddle/fluid/operators/distributed/grpc/grpc_variable_response.cc index e46d2fbe01c16a8b0cbf402f6ffff4907adf7356..0c2edecb652478795796a8fb36763a21011ac511 100644 --- a/paddle/fluid/operators/distributed/grpc/grpc_variable_response.cc +++ b/paddle/fluid/operators/distributed/grpc/grpc_variable_response.cc @@ -26,6 +26,10 @@ namespace paddle { namespace operators { namespace distributed { +static std::mutex grpc_profile_mutex; +static bool grpc_profile_begin = false; +static bool grpc_profile_end = false; + enum WireType { WIRETYPE_VARINT = 0, WIRETYPE_LENGTH_DELIMITED = 2, @@ -283,13 +287,25 @@ int GRPCVariableResponse::Parse(Source* source) { } if (profiling == platform::kEnableProfiler && !platform::IsProfileEnabled()) { - platform::EnableProfiler(platform::ProfilerState::kCPU); + if (grpc_profile_mutex.try_lock()) { + if (!grpc_profile_begin && !grpc_profile_end) { + platform::EnableProfiler(platform::ProfilerState::kAll); + grpc_profile_begin = true; + } + grpc_profile_mutex.unlock(); + } } else if (profiling == platform::kDisableProfiler && platform::IsProfileEnabled()) { - platform::DisableProfiler( - platform::EventSortingKey::kDefault, - string::Sprintf("%s_%lld", FLAGS_rpc_server_profile_path, - listener_id)); + if (grpc_profile_mutex.try_lock()) { + if (grpc_profile_begin && !grpc_profile_end) { + platform::DisableProfiler(platform::EventSortingKey::kTotal, + string::Sprintf("./%s_%s_profile.log", + getenv("TRAINING_ROLE"), + getenv("PADDLE_PORT"))); + grpc_profile_end = true; + } + grpc_profile_mutex.unlock();
+ } } break; } diff --git a/paddle/fluid/operators/distributed/parameter_prefetch.cc b/paddle/fluid/operators/distributed/parameter_prefetch.cc index a9378d61c3ca39bd43b558633cc4d04c40175cac..84828c7cd0c3b07ed0ac70c06f4b1821d75e301e 100644 --- a/paddle/fluid/operators/distributed/parameter_prefetch.cc +++ b/paddle/fluid/operators/distributed/parameter_prefetch.cc @@ -26,11 +26,11 @@ #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/selected_rows.h" #include "paddle/fluid/framework/tensor.h" - #include "paddle/fluid/operators/distributed/distributed.h" #include "paddle/fluid/operators/distributed/rpc_client.h" #include "paddle/fluid/operators/distributed/variable_response.h" #include "paddle/fluid/operators/distributed_ops/send_recv_util.h" +#include "paddle/fluid/platform/profiler.h" namespace paddle { namespace operators { @@ -195,6 +195,8 @@ void prefetchs(const std::vector &id_var_names, const std::vector &endpoints, const framework::ExecutionContext &context, const framework::Scope &scope) { + platform::RecordEvent record_event("Distributed_lookup_table::prefetchs", + platform::EventRole::kInnerOp); auto vec_dim_1 = 0; auto vec_dim_0 = 0; framework::Variable *var = scope.FindVar(persistable_var_name); diff --git a/paddle/fluid/operators/distributed/parameter_recv.cc b/paddle/fluid/operators/distributed/parameter_recv.cc index 5409ec54987fbb7ad89f61cc1655a4c3ef302ac0..ce256cb8b23702b233565d4d22887f076e858f49 100644 --- a/paddle/fluid/operators/distributed/parameter_recv.cc +++ b/paddle/fluid/operators/distributed/parameter_recv.cc @@ -24,12 +24,12 @@ #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/selected_rows.h" #include "paddle/fluid/framework/tensor.h" - #include "paddle/fluid/operators/distributed/distributed.h" #include "paddle/fluid/operators/distributed/rpc_client.h" #include "paddle/fluid/operators/distributed/variable_response.h" #include "paddle/fluid/operators/distributed_ops/send_recv_util.h" 
#include "paddle/fluid/operators/strided_memcpy.h" +#include "paddle/fluid/platform/profiler.h" namespace paddle { namespace operators { @@ -43,6 +43,8 @@ using DDim = framework::DDim; template void RecvSelectedRows(const CommContext &rpc_ctx, const framework::Scope &scope) { + platform::RecordEvent record_event("ParameterRecv::RecvSelectedRows", + platform::EventRole::kInnerOp); platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); auto cpu_place = platform::CPUPlace(); auto &cpu_ctx = *pool.Get(cpu_place); @@ -112,6 +114,8 @@ void RecvSelectedRows(const CommContext &rpc_ctx, template void RecvLodTensor(const CommContext &rpc_ctx, const framework::Scope &scope) { + platform::RecordEvent record_event("ParameterRecv::RecvLodTensor", + platform::EventRole::kInnerOp); platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); auto cpu_place = platform::CPUPlace(); auto &cpu_ctx = *pool.Get(cpu_place); diff --git a/paddle/fluid/operators/distributed/parameter_send.cc b/paddle/fluid/operators/distributed/parameter_send.cc index 545b1f5e803c60f8c68005849336e1d3e4893df7..e95420d4279dcd142b30051360ee3fe4fd41a716 100644 --- a/paddle/fluid/operators/distributed/parameter_send.cc +++ b/paddle/fluid/operators/distributed/parameter_send.cc @@ -28,6 +28,7 @@ #include "paddle/fluid/operators/distributed/rpc_client.h" #include "paddle/fluid/operators/distributed/variable_response.h" #include "paddle/fluid/operators/distributed_ops/send_recv_util.h" +#include "paddle/fluid/platform/profiler.h" #include "paddle/fluid/string/printf.h" namespace paddle { @@ -97,6 +98,8 @@ template void ParameterSend::operator()(const CommContext &rpc_ctx, const framework::Scope &scope, bool sync, int multi_parts) { + platform::RecordEvent record_event("ParameterSend::operator", + platform::EventRole::kInnerOp); if (rpc_ctx.var_name == STEP_COUNTER) { SendByNotifyRPC(rpc_ctx, scope); return; @@ -114,6 +117,8 @@ void ParameterSend::operator()(const CommContext 
&rpc_ctx, auto *send_var = scope.FindVar(rpc_ctx.var_name); if (send_var->IsType()) { + platform::RecordEvent record_event("ParameterSend::LoDTensor", + platform::EventRole::kInnerOp); size_t out_num = rpc_ctx.splited_varnames.size(); if (out_num > 1) { auto &send_tensor = send_var->Get(); @@ -162,6 +167,8 @@ void ParameterSend::operator()(const CommContext &rpc_ctx, } } } else if (send_var->IsType()) { + platform::RecordEvent record_event("ParameterSend::SelectedRows", + platform::EventRole::kInnerOp); auto &send_slr = send_var->Get(); auto &send_rows = send_slr.rows(); diff --git a/paddle/fluid/operators/distributed/variable_response.cc b/paddle/fluid/operators/distributed/variable_response.cc index de77121ee3990366771723e3c43e53362c832ef7..b188ccf5228e0f0fe5cc54fc7619b3babc1f1aef 100644 --- a/paddle/fluid/operators/distributed/variable_response.cc +++ b/paddle/fluid/operators/distributed/variable_response.cc @@ -15,6 +15,7 @@ #include "paddle/fluid/operators/distributed/variable_response.h" #include #include "paddle/fluid/operators/distributed/sendrecvop_utils.h" +#include "paddle/fluid/platform/profiler.h" DEFINE_string(rpc_server_profile_path, "./profile_ps", "the profile log file path"); @@ -27,6 +28,8 @@ bool VariableResponse::ReadRaw(::google::protobuf::io::CodedInputStream* input, const platform::DeviceContext& dev_ctx, platform::Place place, void* dest, int64_t size) { + platform::RecordEvent record_event("VariableResponse::ReadRaw", + platform::EventRole::kInnerOp); const void* data = NULL; int size_to_write = 0; int64_t length = size; @@ -123,6 +126,8 @@ bool VariableResponse::CopyLodTensorData( ::google::protobuf::io::CodedInputStream* input, const platform::DeviceContext& ctx, const framework::DDim& dims, int length) { + platform::RecordEvent record_event("VariableResponse::CopyLodTensorData", + platform::EventRole::kInnerOp); auto server_var = GetVar(); if (!server_var) { LOG(ERROR) << "recved var should not on current server: " @@ -164,6 +169,9 @@ 
bool VariableResponse::CopySelectRowsTensorData( ::google::protobuf::io::CodedInputStream* input, const platform::DeviceContext& ctx, const framework::DDim& dims, int length) { + platform::RecordEvent record_event( + "VariableResponse::CopySelectRowsTensorData", + platform::EventRole::kInnerOp); auto* slr = GetVar()->GetMutable(); slr->set_height(meta_.slr_height()); auto* tensor = slr->mutable_value(); @@ -186,6 +194,8 @@ bool VariableResponse::CopySelectRowsTensorData( bool VariableResponse::CopySelectRowsData( ::google::protobuf::io::CodedInputStream* input, const platform::DeviceContext& ctx, int length) { + platform::RecordEvent record_event("VariableResponse::CopySelectRowsData", + platform::EventRole::kInnerOp); auto* slr = GetVar()->GetMutable(); slr->mutable_rows()->clear(); slr->mutable_rows()->resize(length / sizeof(int64_t)); // int64