未验证 提交 188bcbb7 · 作者: tangwei12 · 提交者: GitHub

loglevel adjustment for distributed training (#31236)

Change-Id: I6210ce9c60bed48f3323c47b16500302b66cedf2
上级 b8a593e7
...@@ -508,7 +508,7 @@ void FleetWrapper::ShrinkDenseTable(int table_id, Scope* scope, ...@@ -508,7 +508,7 @@ void FleetWrapper::ShrinkDenseTable(int table_id, Scope* scope,
if (name.find("batch_sum") != std::string::npos) { if (name.find("batch_sum") != std::string::npos) {
Variable* var = scope->FindVar(name); Variable* var = scope->FindVar(name);
CHECK(var != nullptr) << "var[" << name << "] not found"; CHECK(var != nullptr) << "var[" << name << "] not found";
VLOG(0) << "prepare shrink dense batch_sum"; VLOG(3) << "prepare shrink dense batch_sum";
LoDTensor* tensor = var->GetMutable<LoDTensor>(); LoDTensor* tensor = var->GetMutable<LoDTensor>();
float* g = tensor->data<float>(); float* g = tensor->data<float>();
......
...@@ -76,16 +76,13 @@ uint64_t BrpcPsServer::start(const std::string &ip, uint32_t port) { ...@@ -76,16 +76,13 @@ uint64_t BrpcPsServer::start(const std::string &ip, uint32_t port) {
} }
} }
VLOG(0) << "BrpcPsServer::start registe_ps_server";
_environment->registe_ps_server(ip, port, _rank); _environment->registe_ps_server(ip, port, _rank);
VLOG(0) << "BrpcPsServer::start wait";
cv_.wait(lock, [&] { return stoped_; }); cv_.wait(lock, [&] { return stoped_; });
PSHost host; PSHost host;
host.ip = ip; host.ip = ip;
host.port = port; host.port = port;
host.rank = _rank; host.rank = _rank;
VLOG(0) << "BrpcPsServer::start return host.rank";
return host.rank; return host.rank;
} }
...@@ -461,7 +458,7 @@ int32_t BrpcPsService::save_one_table(Table *table, ...@@ -461,7 +458,7 @@ int32_t BrpcPsService::save_one_table(Table *table,
int32_t feasign_size = 0; int32_t feasign_size = 0;
VLOG(0) << "save one table " << request.params(0) << " " << request.params(1); VLOG(3) << "save table " << request.params(0) << " " << request.params(1);
feasign_size = table->save(request.params(0), request.params(1)); feasign_size = table->save(request.params(0), request.params(1));
if (feasign_size < 0) { if (feasign_size < 0) {
set_response_code(response, -1, "table save failed"); set_response_code(response, -1, "table save failed");
...@@ -504,7 +501,7 @@ int32_t BrpcPsService::shrink_table(Table *table, ...@@ -504,7 +501,7 @@ int32_t BrpcPsService::shrink_table(Table *table,
set_response_code(response, -1, "table shrink failed"); set_response_code(response, -1, "table shrink failed");
return -1; return -1;
} }
VLOG(0) << "Pserver Shrink Finished"; VLOG(3) << "Pserver Shrink Finished";
return 0; return 0;
} }
......
...@@ -52,7 +52,7 @@ inline double GetCurrentUS() { ...@@ -52,7 +52,7 @@ inline double GetCurrentUS() {
Communicator::Communicator() {} Communicator::Communicator() {}
void Communicator::init_gflag(const std::string &gflags) { void Communicator::init_gflag(const std::string &gflags) {
VLOG(0) << "Init With Gflags:" << gflags; VLOG(3) << "Init With Gflags:" << gflags;
std::vector<std::string> flags = paddle::string::split_string(gflags); std::vector<std::string> flags = paddle::string::split_string(gflags);
if (flags.size() < 1) { if (flags.size() < 1) {
flags.push_back("-max_body_size=314217728"); flags.push_back("-max_body_size=314217728");
......
...@@ -194,10 +194,10 @@ class Communicator { ...@@ -194,10 +194,10 @@ class Communicator {
Communicator(); Communicator();
explicit Communicator(const std::map<std::string, std::string> &envs_) { explicit Communicator(const std::map<std::string, std::string> &envs_) {
VLOG(0) << "Communicator Init Envs"; VLOG(3) << "Communicator Init Envs";
for (auto &iter : envs_) { for (auto &iter : envs_) {
envs[iter.first] = iter.second; envs[iter.first] = iter.second;
VLOG(0) << iter.first << ": " << iter.second; VLOG(3) << iter.first << ": " << iter.second;
} }
barrier_table_id_ = std::stoi(envs.at("barrier_table_id")); barrier_table_id_ = std::stoi(envs.at("barrier_table_id"));
trainer_id_ = std::stoi(envs.at("trainer_id")); trainer_id_ = std::stoi(envs.at("trainer_id"));
...@@ -431,7 +431,7 @@ class HalfAsyncCommunicator : public AsyncCommunicator { ...@@ -431,7 +431,7 @@ class HalfAsyncCommunicator : public AsyncCommunicator {
need_global_step_ = need_global_step_ =
static_cast<bool>(std::stoi(envs.at("need_global_step"))); static_cast<bool>(std::stoi(envs.at("need_global_step")));
VLOG(0) << "HalfAsyncCommunicator Initialized"; VLOG(1) << "HalfAsyncCommunicator Initialized";
} }
void MainThread() override; void MainThread() override;
...@@ -476,7 +476,7 @@ class SyncCommunicator : public HalfAsyncCommunicator { ...@@ -476,7 +476,7 @@ class SyncCommunicator : public HalfAsyncCommunicator {
need_global_step_ = need_global_step_ =
static_cast<bool>(std::stoi(envs.at("need_global_step"))); static_cast<bool>(std::stoi(envs.at("need_global_step")));
VLOG(0) << "SyncCommunicator Initialized"; VLOG(1) << "SyncCommunicator Initialized";
} }
void BarrierSend(); void BarrierSend();
...@@ -520,7 +520,7 @@ class GeoCommunicator : public AsyncCommunicator { ...@@ -520,7 +520,7 @@ class GeoCommunicator : public AsyncCommunicator {
// id_queue's size // id_queue's size
max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num")); max_merge_var_num_ = std::stoi(envs.at("communicator_max_merge_var_num"));
send_queue_size_ = max_merge_var_num_; send_queue_size_ = max_merge_var_num_;
VLOG(0) << "GeoCommunicator Initialized"; VLOG(1) << "GeoCommunicator Initialized";
} }
void Send(const std::vector<std::string> &var_names, void Send(const std::vector<std::string> &var_names,
......
...@@ -42,7 +42,7 @@ void HeterClient::MainThread() { ...@@ -42,7 +42,7 @@ void HeterClient::MainThread() {
void HeterClient::Stop() { void HeterClient::Stop() {
running_ = false; running_ = false;
if (!is_initialized_) { if (!is_initialized_) {
VLOG(0) << "HeterClient is not inited, do nothing"; VLOG(3) << "HeterClient is not inited, do nothing";
} else { } else {
if (main_thread_) { if (main_thread_) {
auto status = StopHeterWorker(); auto status = StopHeterWorker();
...@@ -50,20 +50,20 @@ void HeterClient::Stop() { ...@@ -50,20 +50,20 @@ void HeterClient::Stop() {
main_thread_->join(); main_thread_->join();
main_thread_.reset(nullptr); main_thread_.reset(nullptr);
} }
VLOG(1) << "HeterClient Stop Done"; VLOG(3) << "HeterClient Stop Done";
} }
} }
void HeterClient::FinalizeWorker() { void HeterClient::FinalizeWorker() {
running_ = false; running_ = false;
if (!is_initialized_) { if (!is_initialized_) {
VLOG(0) << "HeterClient is not inited, do nothing"; VLOG(3) << "HeterClient is not inited, do nothing";
} else { } else {
if (main_thread_) { if (main_thread_) {
main_thread_->join(); main_thread_->join();
main_thread_.reset(nullptr); main_thread_.reset(nullptr);
} }
VLOG(1) << "HeterClient Stop Done"; VLOG(3) << "HeterClient Stop Done";
} }
} }
......
...@@ -95,7 +95,7 @@ int32_t HeterService::stop_heter_worker(const PsRequestMessage& request, ...@@ -95,7 +95,7 @@ int32_t HeterService::stop_heter_worker(const PsRequestMessage& request,
stop_cpu_worker_set_.insert(client_id); stop_cpu_worker_set_.insert(client_id);
if (stop_cpu_worker_set_.size() == fan_in_) { if (stop_cpu_worker_set_.size() == fan_in_) {
is_exit_ = true; is_exit_ = true;
VLOG(0) << "Stop heter Service done."; VLOG(3) << "Stop heter Service done.";
} }
return 0; return 0;
} }
......
...@@ -136,7 +136,7 @@ class HeterServer { ...@@ -136,7 +136,7 @@ class HeterServer {
virtual ~HeterServer() {} virtual ~HeterServer() {}
void Stop() { void Stop() {
VLOG(0) << "HeterServer Stop()"; VLOG(3) << "HeterServer Stop()";
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
stoped_ = true; stoped_ = true;
cv_.notify_all(); cv_.notify_all();
......
...@@ -91,7 +91,7 @@ int32_t CommonDenseTable::initialize_optimizer() { ...@@ -91,7 +91,7 @@ int32_t CommonDenseTable::initialize_optimizer() {
} else { } else {
VLOG(0) << "init optimizer failed"; VLOG(0) << "init optimizer failed";
} }
VLOG(0) << "init optimizer " << name << " done"; VLOG(3) << "init optimizer " << name << " done";
return 0; return 0;
} }
......
...@@ -45,15 +45,12 @@ class CommonDenseTable : public DenseTable { ...@@ -45,15 +45,12 @@ class CommonDenseTable : public DenseTable {
virtual int32_t set_global_lr(float* lr) override; virtual int32_t set_global_lr(float* lr) override;
int32_t load(const std::string& path, const std::string& param) override { int32_t load(const std::string& path, const std::string& param) override {
VLOG(0) << "Dense table may load by " VLOG(0) << "WARNING: dense variables will load on No.0 trainer";
"paddle.distributed.fleet.init_server";
return 0; return 0;
} }
int32_t save(const std::string& path, const std::string& param) override { int32_t save(const std::string& path, const std::string& param) override {
VLOG(0) VLOG(0) << "WARNING: dense variables will save on No.0 trainer";
<< "Dense table may be saved by "
"paddle.distributed.fleet.save_persistables/save_inference_model";
return 0; return 0;
} }
......
...@@ -166,7 +166,7 @@ int64_t LoadFromText(const std::string& valuepath, const std::string& metapath, ...@@ -166,7 +166,7 @@ int64_t LoadFromText(const std::string& valuepath, const std::string& metapath,
auto id = std::stoull(values[0]); auto id = std::stoull(values[0]);
if (id % pserver_num != pserver_id) { if (id % pserver_num != pserver_id) {
VLOG(0) << "will not load " << values[0] << " from " << valuepath VLOG(3) << "will not load " << values[0] << " from " << valuepath
<< ", please check id distribution"; << ", please check id distribution";
continue; continue;
} }
...@@ -259,7 +259,7 @@ int32_t CommonSparseTable::initialize_value() { ...@@ -259,7 +259,7 @@ int32_t CommonSparseTable::initialize_value() {
} }
} }
VLOG(0) << "has " << feasigns.size() << " ids need to be pre inited"; VLOG(3) << "has " << feasigns.size() << " ids need to be pre inited";
auto buckets = bucket(feasigns.size(), 10); auto buckets = bucket(feasigns.size(), 10);
for (int x = 0; x < 10; ++x) { for (int x = 0; x < 10; ++x) {
...@@ -291,10 +291,10 @@ int32_t CommonSparseTable::initialize_optimizer() { ...@@ -291,10 +291,10 @@ int32_t CommonSparseTable::initialize_optimizer() {
optimizer_ = std::make_shared<SSUM>(value_names_, value_dims_, optimizer_ = std::make_shared<SSUM>(value_names_, value_dims_,
value_offsets_, value_idx_); value_offsets_, value_idx_);
} else { } else {
VLOG(0) << "init optimizer failed"; VLOG(3) << "init optimizer failed";
} }
VLOG(0) << "init optimizer " << name << " done"; VLOG(3) << "init optimizer " << name << " done";
return 0; return 0;
} }
...@@ -307,7 +307,7 @@ int32_t CommonSparseTable::set_global_lr(float* lr) { ...@@ -307,7 +307,7 @@ int32_t CommonSparseTable::set_global_lr(float* lr) {
int32_t CommonSparseTable::load(const std::string& path, int32_t CommonSparseTable::load(const std::string& path,
const std::string& param) { const std::string& param) {
rwlock_->WRLock(); rwlock_->WRLock();
VLOG(0) << "sparse table load with " << path << " with meta " << param; VLOG(3) << "sparse table load with " << path << " with meta " << param;
LoadFromText(path, param, _shard_idx, _shard_num, task_pool_size_, LoadFromText(path, param, _shard_idx, _shard_num, task_pool_size_,
&shard_values_); &shard_values_);
rwlock_->UNLock(); rwlock_->UNLock();
...@@ -318,7 +318,7 @@ int32_t CommonSparseTable::save(const std::string& dirname, ...@@ -318,7 +318,7 @@ int32_t CommonSparseTable::save(const std::string& dirname,
const std::string& param) { const std::string& param) {
rwlock_->WRLock(); rwlock_->WRLock();
int mode = std::stoi(param); int mode = std::stoi(param);
VLOG(0) << "sparse table save: " << dirname << " mode: " << mode; VLOG(3) << "sparse table save: " << dirname << " mode: " << mode;
auto varname = _config.common().table_name(); auto varname = _config.common().table_name();
std::string var_store = std::string var_store =
...@@ -534,11 +534,11 @@ int32_t CommonSparseTable::flush() { return 0; } ...@@ -534,11 +534,11 @@ int32_t CommonSparseTable::flush() { return 0; }
int32_t CommonSparseTable::shrink(const std::string& param) { int32_t CommonSparseTable::shrink(const std::string& param) {
rwlock_->WRLock(); rwlock_->WRLock();
int threshold = std::stoi(param); int threshold = std::stoi(param);
VLOG(0) << "sparse table shrink: " << threshold; VLOG(3) << "sparse table shrink: " << threshold;
for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) { for (int shard_id = 0; shard_id < task_pool_size_; ++shard_id) {
// shrink // shrink
VLOG(0) << shard_id << " " << task_pool_size_ << " begin shrink"; VLOG(4) << shard_id << " " << task_pool_size_ << " begin shrink";
shard_values_[shard_id]->Shrink(threshold); shard_values_[shard_id]->Shrink(threshold);
} }
rwlock_->UNLock(); rwlock_->UNLock();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册