未验证 提交 eec6ef81 编写于 作者: C chenjian 提交者: GitHub

Update record interface usage, part 1 (#39693)

* fix RecordEvent interface

* modify default level to 4

* update interface use

* add const default trace level

* update record event interface usage

* update operator.cc

* update part1

* fix include profiler.h header in ps server

* fix include profiler.h header in ps server
上级 77625d7d
......@@ -188,7 +188,8 @@ void BrpcPsService::service(google::protobuf::RpcController *cntl_base,
int32_t BrpcPsService::pull_dense(Table *table, const PsRequestMessage &request,
PsResponseMessage &response,
brpc::Controller *cntl) {
platform::RecordEvent record_event("PsService->pull_dense");
platform::RecordEvent record_event(
"PsService->pull_dense", platform::TracerEventType::Communication, 1);
CHECK_TABLE_EXIST(table, request, response)
if (request.params_size() < 1) {
set_response_code(
......@@ -219,7 +220,9 @@ int32_t BrpcPsService::push_dense_param(Table *table,
const PsRequestMessage &request,
PsResponseMessage &response,
brpc::Controller *cntl) {
platform::RecordEvent record_event("PsService->push_dense_param");
platform::RecordEvent record_event("PsService->push_dense_param",
platform::TracerEventType::Communication,
1);
CHECK_TABLE_EXIST(table, request, response)
thread_local std::string push_buffer;
auto &req_io_buffer = cntl->request_attachment();
......@@ -245,7 +248,8 @@ int32_t BrpcPsService::push_dense_param(Table *table,
int32_t BrpcPsService::push_dense(Table *table, const PsRequestMessage &request,
PsResponseMessage &response,
brpc::Controller *cntl) {
platform::RecordEvent record_event("PsService->push_dense");
platform::RecordEvent record_event(
"PsService->push_dense", platform::TracerEventType::Communication, 1);
CHECK_TABLE_EXIST(table, request, response)
auto req_buffer_size = request.data().size();
if (req_buffer_size < 1) {
......@@ -291,7 +295,9 @@ int32_t BrpcPsService::push_sparse_param(Table *table,
const PsRequestMessage &request,
PsResponseMessage &response,
brpc::Controller *cntl) {
platform::RecordEvent record_event("PsService->push_sparse_param");
platform::RecordEvent record_event("PsService->push_sparse_param",
platform::TracerEventType::Communication,
1);
CHECK_TABLE_EXIST(table, request, response)
auto &push_data = request.data();
if (push_data.size() < 1) {
......@@ -323,7 +329,8 @@ int32_t BrpcPsService::pull_geo_param(Table *table,
const PsRequestMessage &request,
PsResponseMessage &response,
brpc::Controller *cntl) {
platform::RecordEvent record_event("PsService->pull_geo_param");
platform::RecordEvent record_event(
"PsService->pull_geo_param", platform::TracerEventType::Communication, 1);
CHECK_TABLE_EXIST(table, request, response)
thread_local std::string push_sparse_request_buffer;
......@@ -346,7 +353,8 @@ int32_t BrpcPsService::pull_sparse(Table *table,
const PsRequestMessage &request,
PsResponseMessage &response,
brpc::Controller *cntl) {
platform::RecordEvent record_event("PsService->pull_sparse");
platform::RecordEvent record_event(
"PsService->pull_sparse", platform::TracerEventType::Communication, 1);
CHECK_TABLE_EXIST(table, request, response)
auto &req_io_buffer = cntl->request_attachment();
......@@ -392,7 +400,8 @@ int32_t BrpcPsService::push_sparse(Table *table,
const PsRequestMessage &request,
PsResponseMessage &response,
brpc::Controller *cntl) {
platform::RecordEvent record_event("PsService->push_sparse");
platform::RecordEvent record_event(
"PsService->push_sparse", platform::TracerEventType::Communication, 1);
CHECK_TABLE_EXIST(table, request, response)
auto &push_data = request.data();
if (push_data.size() < 1) {
......
......@@ -113,7 +113,9 @@ int Communicator::SetClients(std::vector<uint64_t> &host_sign_list) {
void Communicator::RpcRecvDense(const std::vector<std::string> &varnames,
int table_id, Scope *scope) {
platform::RecordEvent record_event("Communicator->RpcRecvDense");
platform::RecordEvent record_event("Communicator->RpcRecvDense",
platform::TracerEventType::Communication,
1);
std::vector<paddle::distributed::Region> regions;
regions.reserve(varnames.size());
for (auto &t : varnames) {
......@@ -169,7 +171,9 @@ void Communicator::RpcRecvDense(const std::vector<std::string> &varnames,
void Communicator::RpcSendDenseParam(const std::vector<std::string> &varnames,
int table_id, const Scope &scope) {
platform::RecordEvent record_event("Communicator->RpcSendDenseParam");
platform::RecordEvent record_event("Communicator->RpcSendDenseParam",
platform::TracerEventType::Communication,
1);
auto place = platform::CPUPlace();
std::vector<paddle::distributed::Region> regions;
for (auto &t : varnames) {
......@@ -206,7 +210,9 @@ void Communicator::RpcSendDenseParam(const std::vector<std::string> &varnames,
}
void Communicator::RpcSendDense(const CommContext &ctx, const Scope &scope) {
platform::RecordEvent record_event("Communicator->RpcSendDense");
platform::RecordEvent record_event("Communicator->RpcSendDense",
platform::TracerEventType::Communication,
1);
auto &var_names = ctx.origin_varnames;
auto &table_id = ctx.table_id;
auto dense_data = std::make_shared<std::vector<float>>();
......@@ -250,7 +256,9 @@ void Communicator::RpcSendDense(const CommContext &ctx, const Scope &scope) {
void Communicator::RpcSendSparseParam(const std::string &varname, int table_id,
const Scope &scope) {
platform::RecordEvent record_event("Communicator->RpcSendSparseParam");
platform::RecordEvent record_event("Communicator->RpcSendSparseParam",
platform::TracerEventType::Communication,
1);
size_t request_call_num = _worker_ptr->get_server_nums();
std::vector<float *> push_g_vec;
......@@ -287,7 +295,9 @@ void Communicator::RpcSendSparseParam(const std::string &varname, int table_id,
void Communicator::RpcSendSparse(const std::string &var_name, int table_id,
const Scope &scope) {
platform::RecordEvent record_event("Communicator->RpcSendSparse");
platform::RecordEvent record_event("Communicator->RpcSendSparse",
platform::TracerEventType::Communication,
1);
size_t request_call_num = _worker_ptr->get_server_nums();
std::vector<uint64_t> sparse_push_keys;
std::vector<float *> push_g_vec;
......@@ -338,7 +348,9 @@ void Communicator::RpcSendSparse(const std::string &var_name, int table_id,
void Communicator::RpcRecvSparse(const std::string &varname, int table_id,
Scope *scope) {
platform::RecordEvent record_event("Communicator->RpcRecvSparse");
platform::RecordEvent record_event("Communicator->RpcRecvSparse",
platform::TracerEventType::Communication,
1);
auto *send_var = scope->Var(varname);
auto *tensor = send_var->GetMutable<framework::LoDTensor>();
auto dim = tensor->dims()[1];
......@@ -406,7 +418,9 @@ void Communicator::SendGlobalStep(const CommContext &ctx, int batches,
if (batches == 0) {
return;
}
platform::RecordEvent record_event("Communicator->SendGlobalStep");
platform::RecordEvent record_event("Communicator->SendGlobalStep",
platform::TracerEventType::Communication,
1);
auto &table_id = ctx.table_id;
size_t request_call_num = _worker_ptr->get_server_nums();
......@@ -994,7 +1008,8 @@ void SyncCommunicator::BarrierRecv() {
void GeoCommunicator::Send(const std::vector<std::string> &var_names,
const framework::Scope &scope) {
platform::RecordEvent record_event("GeoCommunicator->Send");
platform::RecordEvent record_event(
"GeoCommunicator->Send", platform::TracerEventType::Communication, 1);
waiting_ = false;
auto before_send = GetCurrentUS();
auto table_name = var_names[0];
......@@ -1137,7 +1152,9 @@ void GeoCommunicator::InitDense(std::vector<std::string> &varnames,
}
void GeoCommunicator::SendDense(const CommContext &send_ctx) {
platform::RecordEvent record_event("GeoCommunicator->SendDense");
platform::RecordEvent record_event("GeoCommunicator->SendDense",
platform::TracerEventType::Communication,
1);
auto &var_names = send_ctx.origin_varnames;
auto &table_id = send_ctx.table_id;
for (auto &varname : var_names) {
......@@ -1177,7 +1194,9 @@ void GeoCommunicator::SendDense(const CommContext &send_ctx) {
}
void GeoCommunicator::RecvDense(const CommContext &send_ctx) {
platform::RecordEvent record_event("GeoCommunicator->RecvDense");
platform::RecordEvent record_event("GeoCommunicator->RecvDense",
platform::TracerEventType::Communication,
1);
auto &table_id = send_ctx.table_id;
auto &varnames = recv_varname_to_ctx_.at(table_id);
// 1. recv from pserver
......@@ -1235,7 +1254,9 @@ void GeoCommunicator::InitSparse(const std::string &var_name, int table_id) {
std::vector<int64_t> GeoCommunicator::MergeSparseIds(
const std::string &send_varname) {
platform::RecordEvent record_event("GeoCommunicator->MergeSparseIds");
platform::RecordEvent record_event("GeoCommunicator->MergeSparseIds",
platform::TracerEventType::Communication,
1);
size_t merge_num = 0, wait_times = 0;
std::unordered_set<int64_t> sparse_ids;
while (merge_num < static_cast<size_t>(max_merge_var_num_)) {
......@@ -1267,7 +1288,9 @@ std::vector<int64_t> GeoCommunicator::MergeSparseIds(
void GeoCommunicator::SendSparse(const std::string &varname,
std::vector<int64_t> &sparse_ids, int table_id,
int ep_idx) {
platform::RecordEvent record_event("GeoCommunicator->SendSparse");
platform::RecordEvent record_event("GeoCommunicator->SendSparse",
platform::TracerEventType::Communication,
1);
if (sparse_ids.size() == 0) {
return;
}
......@@ -1342,7 +1365,9 @@ void GeoCommunicator::SendSparse(const std::string &varname,
void GeoCommunicator::RecvSparse(const std::string &varname, int table_id,
int ep_idx) {
platform::RecordEvent record_event("GeoCommunicator->RecvSparse");
platform::RecordEvent record_event("GeoCommunicator->RecvSparse",
platform::TracerEventType::Communication,
1);
// 1. recv from pserver
std::vector<uint64_t> keys;
std::vector<float> values;
......
......@@ -13,7 +13,6 @@
// limitations under the License.
#include "paddle/fluid/distributed/ps/service/heter_client.h"
#include "paddle/fluid/framework/convert_utils.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/string/split.h"
......@@ -152,7 +151,9 @@ void HeterClient::SendAndRecvAsync(
const std::string& message_name,
const std::vector<std::string>& send_var_name,
const std::vector<std::string>& recv_var_name, const std::string& mode) {
platform::RecordEvent record_event("HeterClient->SendAndRecvAsync");
platform::RecordEvent record_event("HeterClient->SendAndRecvAsync",
platform::TracerEventType::Communication,
1);
const platform::DeviceContext* p_ctx = &ctx;
const framework::Scope* p_scope = &scope;
const std::string message_name_val = message_name;
......
......@@ -213,7 +213,9 @@ class RequestSendAndRecvHandler final : public HeterRequestHandler {
int Handle(const MultiVarMsg* request, MultiVarMsg* response,
brpc::Controller* cntl) override {
platform::RecordEvent record_event("RequestSendAndRecvHandler->Handle");
platform::RecordEvent record_event("RequestSendAndRecvHandler->Handle",
platform::TracerEventType::Communication,
1);
FLAGS_eager_delete_tensor_gb = -1;
// get microID from request
......
......@@ -18,7 +18,7 @@
#include "iomanip"
#include "paddle/fluid/distributed/ps/table/table.h"
#include "paddle/fluid/framework/archive.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
namespace paddle {
namespace distributed {
std::vector<std::string> GraphPyService::split(std::string& str,
......
......@@ -17,7 +17,7 @@
#include "paddle/fluid/framework/details/container_cast.h"
#include "paddle/fluid/framework/details/reduce_and_gather.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
DECLARE_bool(sync_nccl_allreduce);
......@@ -68,8 +68,8 @@ AllReduceOpHandle::AllReduceOpHandle(ir::Node *node,
#endif
void AllReduceOpHandle::RunImpl() {
platform::RecordEvent record_event(Name());
platform::RecordEvent record_event(
Name(), platform::TracerEventType::Communication, 1);
WaitInputVarGenerated();
std::vector<VarHandleBase *> inputs = this->Inputs();
std::vector<VarHandleBase *> outputs = this->Outputs();
......
......@@ -23,7 +23,7 @@
#include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/ir/graph_helper.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
#if defined(PADDLE_WITH_XPU)
namespace paddle {
......
......@@ -18,15 +18,15 @@
#include "paddle/fluid/framework/details/container_cast.h"
#include "paddle/fluid/framework/details/variable_visitor.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
namespace paddle {
namespace framework {
namespace details {
void BroadcastOpHandle::RunImpl() {
platform::RecordEvent record_event(Name());
platform::RecordEvent record_event(
Name(), platform::TracerEventType::Communication, 1);
if (places_.size() == 1) return;
// The input and output may have dummy vars.
......
......@@ -15,7 +15,7 @@
#include "paddle/fluid/framework/details/eager_deletion_op_handle.h"
#include "paddle/fluid/framework/ir/memory_optimize_pass/memory_optimization_var_info.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
#include "paddle/fluid/platform/cuda_device_guard.h"
#endif
......@@ -128,7 +128,8 @@ void EagerDeletionOpHandle::RunImpl() {
CallOnce();
}
platform::RecordEvent record_event(Name());
platform::RecordEvent record_event(Name(),
platform::TracerEventType::UserDefined, 2);
std::deque<std::shared_ptr<memory::Allocation>> garbages;
for (size_t i = 0; i < var_infos_.size(); ++i) {
auto *var_info = var_infos_[i];
......
......@@ -24,7 +24,7 @@
#include "paddle/fluid/framework/details/fetch_async_op_handle.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/ir/graph_helper.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
namespace paddle {
namespace framework {
......@@ -65,7 +65,8 @@ FetchResultType FastThreadedSSAGraphExecutor::Run(
const std::vector<std::string> &fetch_tensors, bool return_merged) {
VLOG(3) << "enter FastThreadedSSAGraphExecutor Run";
std::unique_ptr<platform::RecordEvent> event(
new platform::RecordEvent("FastThreadedSSAGraphExecutorPrepare"));
new platform::RecordEvent("FastThreadedSSAGraphExecutorPrepare",
platform::TracerEventType::UserDefined, 2));
std::unique_ptr<std::unordered_map<OpHandleBase *, std::atomic<int>>>
op_deps = atomic_op_deps_.get();
PrepareAtomicOpDeps();
......
......@@ -18,6 +18,7 @@
#include "paddle/fluid/framework/convert_utils.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
namespace paddle {
namespace framework {
......@@ -190,7 +191,8 @@ void FetchAsyncOpHandle::FetchMergedLodTensor(
}
void FetchAsyncOpHandle::RunImpl() {
platform::RecordEvent record_event(Name());
platform::RecordEvent record_event(Name(),
platform::TracerEventType::Operator, 1);
WaitInputVarGenerated(true);
// get src vars
......
......@@ -16,7 +16,7 @@
#include <string>
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
namespace paddle {
namespace framework {
......@@ -128,7 +128,8 @@ static void TransData(const framework::LoDTensor &src_item,
}
void FetchOpHandle::RunImpl() {
platform::RecordEvent record_event(Name());
platform::RecordEvent record_event(Name(),
platform::TracerEventType::Operator, 1);
WaitInputVarGenerated(platform::CPUPlace());
tensors_.resize(inputs_.size());
......
......@@ -18,7 +18,7 @@
#include "paddle/fluid/framework/details/variable_visitor.h"
#include "paddle/fluid/platform/device_memory_aligment.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
DEFINE_bool(skip_fused_all_reduce_check, false, "");
DECLARE_bool(allreduce_record_one_event);
......@@ -68,7 +68,8 @@ FusedAllReduceOpHandle::~FusedAllReduceOpHandle() {
}
void FusedAllReduceOpHandle::RunImpl() {
platform::RecordEvent record_event(Name());
platform::RecordEvent record_event(
Name(), platform::TracerEventType::Communication, 1);
VLOG(4) << this->DebugString();
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
......
......@@ -15,14 +15,15 @@
#include "paddle/fluid/framework/details/fused_broadcast_op_handle.h"
#include "paddle/fluid/framework/details/container_cast.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
namespace paddle {
namespace framework {
namespace details {
void FusedBroadcastOpHandle::RunImpl() {
platform::RecordEvent record_event(Name());
platform::RecordEvent record_event(
Name(), platform::TracerEventType::Communication, 1);
if (places_.size() == 1UL) return;
......
......@@ -19,7 +19,7 @@
#include "paddle/fluid/framework/details/reduce_and_gather.h"
#include "paddle/fluid/framework/details/variable_visitor.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
PADDLE_DEFINE_EXPORTED_bool(
cpu_deterministic, false,
......@@ -46,7 +46,8 @@ void ReduceOpHandle::Wait(
}
void ReduceOpHandle::RunImpl() {
platform::RecordEvent record_event(Name());
platform::RecordEvent record_event(
Name(), platform::TracerEventType::Communication, 1);
if (places_.size() == 1) return;
// the input and output may have dummy var.
......
......@@ -14,7 +14,7 @@
#include "paddle/fluid/framework/details/rpc_op_handle.h"
#include "paddle/fluid/framework/ir/graph.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
namespace paddle {
namespace framework {
......@@ -30,7 +30,8 @@ RPCOpHandle::RPCOpHandle(ir::Node *node, const framework::OpDesc &op_desc,
place_(place) {}
void RPCOpHandle::RunImpl() {
platform::RecordEvent record_event(Name());
platform::RecordEvent record_event(
Name(), platform::TracerEventType::Communication, 1);
for (auto *in : inputs_) {
auto &p = static_cast<VarHandle *>(in)->place();
......
......@@ -16,7 +16,7 @@
#include <string>
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
namespace pten {
class DenseTensor;
......@@ -88,7 +88,8 @@ std::string ScaleLossGradOpHandle::LossGradName() const {
}
void ScaleLossGradOpHandle::RunImpl() {
platform::RecordEvent record_event(Name());
platform::RecordEvent record_event(Name(),
platform::TracerEventType::UserDefined, 2);
RunOnVar(local_exec_scopes_[0]->FindVar(LossGradName()), true);
}
......
......@@ -35,7 +35,7 @@ limitations under the License. */
#include "paddle/fluid/framework/convert_utils.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/float16.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/profiler/event_tracing.h"
#include "pybind11/numpy.h"
#include "pybind11/pybind11.h"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册