Commit c0890988 authored by qiaolongfei

add RPCServerProfiler, replace listen_and_serv profiler code

Parent 29fac3c0
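This commit lifts the periodic-profiling logic out of `listen_and_serv_op.cc` into a reusable `RPCServerProfiler` owned by `RPCServer`, and renames the controlling flags to `rpc_server_profile_period` and `rpc_server_profile_path`. As a minimal sketch of how the new flags would be driven (the flag names come from this diff; the `main` and gflags wiring are illustrative assumptions, not part of the commit):

```cpp
// Illustrative only: configure the new profiler flags before the RPC
// server starts. gflags::SetCommandLineOption is standard gflags API;
// the surrounding bootstrap is a placeholder.
#include <gflags/gflags.h>

int main(int argc, char* argv[]) {
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  // Profile in windows of 100 server steps; dump reports to /tmp/rpc_profile.
  gflags::SetCommandLineOption("rpc_server_profile_period", "100");
  gflags::SetCommandLineOption("rpc_server_profile_path", "/tmp/rpc_profile");
  // ... construct and run the RPC server as usual ...
  return 0;
}
```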
paddle/fluid/operators/distributed/request_handler_impl.cc

@@ -41,6 +41,7 @@ bool RequestSendHandler::Handle(const std::string& varname,
   // Async
   if (!sync_mode_) {
+    rpc_server_->Profiler().OneStep();
     try {
       executor_->RunPreparedContext((*grad_to_prepared_ctx_)[varname].get(),
                                     scope);
...
paddle/fluid/operators/distributed/rpc_server.cc

@@ -18,11 +18,44 @@
 #include <string>

 #include "paddle/fluid/operators/distributed/rpc_server.h"
+#include "paddle/fluid/platform/profiler.h"
+
+DEFINE_int32(rpc_server_profile_period, 0,
+             "the period of listen_and_serv to do profile");
+DEFINE_string(rpc_server_profile_path, "/dev/null",
+              "the profile log file path");

 namespace paddle {
 namespace operators {
 namespace distributed {
+
+RPCServerProfiler::RPCServerProfiler(int profile_period,
+                                     const std::string& profile_log_path)
+    : profile_period_(profile_period), profile_log_path_(profile_log_path) {
+  step_ = 0;
+}
+
+void RPCServerProfiler::OneStep() {
+  PADDLE_ENFORCE_LE(step_, profile_period_,
+                    "step_ should not be larger than profile_period_");
+  if (profile_period_ <= 0) {
+    return;
+  }
+
+  if (step_ == 0) {
+    auto pf_state = paddle::platform::ProfilerState::kCPU;
+    paddle::platform::EnableProfiler(pf_state);
+  }
+  if (step_ == profile_period_) {
+    paddle::platform::DisableProfiler(paddle::platform::EventSortingKey::kTotal,
+                                      profile_log_path_);
+    step_ = 0;
+  } else {
+    step_++;
+  }
+}
+
 void RPCServer::ShutDown() {
   LOG(INFO) << "RPCServer ShutDown ";
   ShutDownImpl();
...
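The cycle implemented by `OneStep()` above: with `rpc_server_profile_period = N`, the CPU profiler is enabled on step 0, left running while the step counter climbs, then dumped and disabled on step N, after which the counter resets and the next call opens a new window. A minimal standalone sketch of that state machine (a hypothetical stand-in class that logs instead of calling the real `EnableProfiler`/`DisableProfiler`, so it compiles without Paddle):

```cpp
#include <iostream>

// Hypothetical stand-in mirroring RPCServerProfiler's step logic.
class StepProfiler {
 public:
  explicit StepProfiler(int period) : period_(period), step_(0) {}

  void OneStep() {
    if (period_ <= 0) return;  // period 0 (the default) disables profiling
    if (step_ == 0) std::cout << "enable profiler\n";
    if (step_ == period_) {    // N steps recorded: dump and start over
      std::cout << "disable profiler, dump report\n";
      step_ = 0;
    } else {
      ++step_;
    }
  }

 private:
  const int period_;
  int step_;
};

int main() {
  StepProfiler p(3);  // profile in windows of 3 steps
  for (int i = 0; i < 8; ++i) p.OneStep();
  return 0;
}
```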
paddle/fluid/operators/distributed/rpc_server.h

@@ -19,16 +19,33 @@
 #include <thread>  // NOLINT
 #include <utility>
 #include <vector>

 #include "paddle/fluid/operators/distributed/request_handler.h"

+DECLARE_int32(rpc_server_profile_period);
+DECLARE_string(rpc_server_profile_path);
+
 namespace paddle {
 namespace operators {
 namespace distributed {

+class RPCServerProfiler {
+ public:
+  RPCServerProfiler(int profile_period, const std::string& profile_log_path);
+  void OneStep();
+
+ private:
+  const int profile_period_;
+  std::string profile_log_path_;
+  int step_;
+};
+
 class RPCServer {
  public:
   explicit RPCServer(const std::string& address, int client_num)
       : cur_cond_(0),
+        profiler_(FLAGS_rpc_server_profile_period,
+                  FLAGS_rpc_server_profile_path),
         bind_address_(address),
         exit_flag_(false),
         selected_port_(0),
@@ -67,6 +84,7 @@ class RPCServer {
   void Complete();

   void ResetBarrierCounter();
+  RPCServerProfiler& Profiler() { return profiler_; }

  protected:
   virtual void ShutDownImpl() = 0;
@@ -79,6 +97,7 @@ class RPCServer {
   std::unordered_map<std::string, int> rpc_cond_map_;
   std::atomic<int> cur_cond_;
   std::condition_variable rpc_cond_;
+  RPCServerProfiler profiler_;

  protected:
   std::string bind_address_;
...
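Design note: the profiler is constructed in `RPCServer`'s initializer list directly from the two global flags, so every server instance shares the same profiling configuration, and the `Profiler()` accessor lets both call sites in this diff, the sync loop in `RunSyncLoop` and the async branch of `RequestSendHandler::Handle`, drive the same per-step counter.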
paddle/fluid/operators/listen_and_serv_op.cc

@@ -25,10 +25,6 @@ limitations under the License. */
 #include "paddle/fluid/operators/distributed/request_handler_impl.h"
 #include "paddle/fluid/operators/listen_and_serv_op.h"
-#include "paddle/fluid/platform/profiler.h"
-
-DEFINE_int32(listen_and_serv_profile_period, 0,
-             "the period of listen_and_serv to do profile");

 namespace paddle {
 namespace operators {
@@ -108,6 +104,7 @@ void ListenAndServOp::RunSyncLoop(
     framework::Scope *recv_scope,
     const std::vector<int> &prefetch_block_id_list,
     const int checkpoint_point_block_id) const {
+  VLOG(2) << "RunSyncLoop";
   size_t num_blocks = program->Size();
   auto optimize_blocks =
       Attr<std::vector<framework::BlockDesc *>>(kOptimizeBlocks);
@@ -128,17 +125,8 @@ void ListenAndServOp::RunSyncLoop(
   rpc_service_->ResetBarrierCounter();
-  int32_t profile_step = 0;
   while (true) {
-    PADDLE_ENFORCE_LE(profile_step, FLAGS_listen_and_serv_profile_period,
-                      "profile_step should not be larger then "
-                      "FLAGS_listen_and_serv_profile_period");
-    if (FLAGS_listen_and_serv_profile_period > 0) {
-      if (profile_step == 0) {
-        auto pf_state = paddle::platform::ProfilerState::kCPU;
-        paddle::platform::EnableProfiler(pf_state);
-      }
-    }
+    rpc_service_->Profiler().OneStep();
     // Get from multiple trainers, we don't care about the order in which
     // the gradients arrives, just add suffix 0~n and merge the gradient.
     rpc_service_->SetCond(distributed::kRequestSend);
@@ -180,21 +168,13 @@ void ListenAndServOp::RunSyncLoop(
     // reset received sparse vars to avoid reuse it in the next mini-batch
     dynamic_cast<distributed::RequestSendHandler *>(request_send_handler_.get())
         ->ResetSparseVarRecorder();
-    if (FLAGS_listen_and_serv_profile_period > 0) {
-      if (profile_step == FLAGS_listen_and_serv_profile_period) {
-        paddle::platform::DisableProfiler(
-            paddle::platform::EventSortingKey::kTotal, "/dev/null");
-        profile_step = 0;
-      } else {
-        profile_step++;
-      }
-    }
   }  // while(true)
 }

 void ListenAndServOp::RunAsyncLoop(framework::Executor *executor,
                                    framework::ProgramDesc *program,
                                    framework::Scope *recv_scope) const {
+  VLOG(2) << "RunAsyncLoop";
   // grad name to block id
   std::unordered_map<std::string, int32_t> grad_to_block_id;
   std::unordered_map<int32_t, std::string> id_to_grad;
...
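Usage note (standard glog behavior, not stated in the diff): the two new `VLOG(2)` markers only appear when verbose logging is at level 2 or higher, e.g. by exporting `GLOG_v=2` in the server process's environment before launch.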