Commit b3526fb4 authored by malin10

tmp

Parent 613df707
@@ -458,7 +458,7 @@ void GeoCommunicator::Send(const std::vector<std::string> &var_names,
   //     var_tables.size(), 1,
   //     platform::errors::InvalidArgument("var_tables.size() == 1 is
   //     permitted"));
+  auto before_send = GetCurrentUS();
   std::shared_ptr<SparseIdsMap> ids_table = std::make_shared<SparseIdsMap>();
   for (size_t i = 0; i < var_tables.size(); i++) {
     auto table_name = var_tables[i];
@@ -494,7 +494,8 @@ void GeoCommunicator::Send(const std::vector<std::string> &var_names,
     }
   }
   need_push_queue_->Push(ids_table);
-  VLOG(1) << "run send_op finish.";
+  auto after_send = GetCurrentUS();
+  VLOG(0) << "run send_op finish. using " << (after_send - before_send);
 }
 
 void GeoCommunicator::MainThread() {
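The two hunks above add timing around `Send()`: `GetCurrentUS()` is sampled before and after the id-merging work and the delta is logged at `VLOG(0)`. A minimal sketch of such a microsecond timestamp helper, assuming a `std::chrono`-based implementation (Paddle's actual `GetCurrentUS` may be defined differently):

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>

// A microsecond timestamp helper in the spirit of the GetCurrentUS() used
// in the diff. Assumption: the real helper may use another clock source.
inline int64_t GetCurrentUS() {
  auto elapsed = std::chrono::steady_clock::now().time_since_epoch();
  return std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count();
}

int main() {
  auto before_send = GetCurrentUS();
  // ... the work being measured, e.g. merging sparse ids into a table ...
  auto after_send = GetCurrentUS();
  std::cout << "run send_op finish. using " << (after_send - before_send)
            << " us\n";
  return 0;
}
```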
@@ -506,106 +507,98 @@ void GeoCommunicator::MainThread() {
   }
   while (running_) {
-    // int meet = Meet();
-
-    // VLOG(1) << "async_meet: " << meet;
-
-    // SendGlobalStep(meet);
-    auto before = GetCurrentUS();
-    SendByCommunicator(0);
-    auto after = GetCurrentUS();
-    VLOG(0) << "finish one SendByCommunicator using " << (after - before);
+    std::vector<std::future<void>> tasks;
+    tasks.reserve(send_var_nums_);
+
+    auto before_send_by_communicator = GetCurrentUS();
+    size_t wait_times = 0;
+    while (ids_send_vec_.size() < static_cast<size_t>(max_merge_var_num_)) {
+      VLOG(1) << "ids_send_vec_ Size: " << ids_send_vec_.size();
+      if (need_push_queue_->Size() > 0) {
+        wait_times = 0;
+        ids_send_vec_.push_back(*(need_push_queue_->Pop()));
+        VLOG(1) << "ids_send_vec_ pushed";
+      } else if (need_push_queue_->Size() == 0) {
+        VLOG(1) << "wait_times -> " << wait_times;
+        if (wait_times >= static_cast<size_t>(send_wait_times_)) {
+          break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        wait_times++;
+        continue;
+      }
+    }
+    if (ids_send_vec_.size() >= static_cast<size_t>(max_merge_var_num_)) {
+      auto before_send_global_step = GetCurrentUS();
+      VLOG(0) << "finish ins_send_vec using time "
+              << (before_send_global_step - before_send_by_communicator)
+              << "; send_var_nums_ = " << send_var_nums_;
+      SendGlobalStep(max_merge_var_num_);
+      auto after_send_global_step = GetCurrentUS();
+      VLOG(0) << "finish send global_step using "
+              << (after_send_global_step - before_send_global_step);
+      for (auto &iter : send_varname_to_ctx_) {
+        VLOG(1) << "debug " << iter.first;
+        auto &var_name = iter.first;
+        auto &send_ctx = iter.second;
+        int pserver_num = static_cast<int>(send_ctx.epmap.size());
+        if (send_ctx.is_sparse) {
+          if (var_name == STEP_COUNTER) {
+            continue;
+          }
+          for (int ep_idx = 0; ep_idx < pserver_num; ep_idx++) {
+            auto send_recv_task = [this, ep_idx, &var_name] {
+              auto before_send_sparse = GetCurrentUS();
+              if (var_name == STEP_COUNTER) {
+                return;
+              }
+              SendSparse(var_name, ep_idx);
+              auto after_send_sparse = GetCurrentUS();
+              RecvSparse(var_name, ep_idx);
+              auto after_recv_sparse = GetCurrentUS();
+              VLOG(0)
+                  << "send recv "
+                  << send_varname_to_ctx_.at(var_name).splited_varnames[ep_idx]
+                  << " finish, using "
+                  << (after_send_sparse - before_send_sparse) << " and "
+                  << (after_recv_sparse - after_send_sparse)
+                  << "; total = " << (after_recv_sparse - before_send_sparse);
+            };
+            tasks.emplace_back(
+                send_threadpool_->enqueue(std::move(send_recv_task)));
+            // tasks[tasks.size() - 1].wait();
+          }
+        } else {
+          auto send_recv_task = [this, &var_name, &send_ctx] {
+            return;
+            if (var_name == STEP_COUNTER) {
+              return;
+            }
+            VLOG(1) << "send dense " << var_name << " begin";
+            SendDense(var_name);
+            VLOG(1) << "send dense " << var_name << " done";
+            VLOG(1) << "recv dense " << var_name << " begin";
+            RecvDense(var_name);
+            VLOG(1) << "recv dense " << var_name << " done";
+          };
+          tasks.emplace_back(
+              send_threadpool_->enqueue(std::move(send_recv_task)));
+        }
+      }
+      for (auto &task : tasks) {
+        task.wait();
+      }
+      ids_send_vec_.clear();
+      auto finish_one_comm = GetCurrentUS();
+      VLOG(0) << "Finish SendByCommunicator "
+              << (finish_one_comm - after_send_global_step);
+    }
   }
   VLOG(1) << "geo-communicator stopped, send thread exit";
 }
-
-void GeoCommunicator::SendByCommunicator(int batches) {
-  std::vector<std::future<void>> tasks;
-  tasks.reserve(send_var_nums_);
-
-  auto before_send_by_communicator = GetCurrentUS();
-  size_t wait_times = 0;
-  while (ids_send_vec_.size() < static_cast<size_t>(max_merge_var_num_)) {
-    VLOG(1) << "ids_send_vec_ Size: " << ids_send_vec_.size();
-    if (need_push_queue_->Size() > 0) {
-      wait_times = 0;
-      ids_send_vec_.push_back(*(need_push_queue_->Pop()));
-      VLOG(1) << "ids_send_vec_ pushed";
-    } else if (need_push_queue_->Size() == 0) {
-      VLOG(1) << "wait_times -> " << wait_times;
-      if (wait_times >= static_cast<size_t>(send_wait_times_)) {
-        break;
-      }
-      std::this_thread::sleep_for(std::chrono::milliseconds(10));
-      wait_times++;
-      continue;
-    }
-  }
-  if (ids_send_vec_.size() >= static_cast<size_t>(max_merge_var_num_)) {
-    auto before_send_global_step = GetCurrentUS();
-    VLOG(0) << "finish ins_send_vec using time "
-            << (before_send_global_step - before_send_by_communicator);
-    SendGlobalStep(max_merge_var_num_);
-    auto after_send_global_step = GetCurrentUS();
-    VLOG(0) << "finish send global_step using "
-            << (after_send_global_step - before_send_global_step);
-    for (auto &iter : send_varname_to_ctx_) {
-      VLOG(1) << "debug " << iter.first;
-      auto &var_name = iter.first;
-      auto &send_ctx = iter.second;
-      int pserver_num = static_cast<int>(send_ctx.epmap.size());
-      if (send_ctx.is_sparse) {
-        if (var_name == STEP_COUNTER) {
-          continue;
-        }
-        for (int ep_idx = 0; ep_idx < pserver_num; ep_idx++) {
-          auto send_recv_task = [this, ep_idx, &var_name] {
-            auto before_send_sparse = GetCurrentUS();
-            if (var_name == STEP_COUNTER) {
-              return;
-            }
-            SendSparse(var_name, ep_idx);
-            auto after_send_sparse = GetCurrentUS();
-            RecvSparse(var_name, ep_idx);
-            auto after_recv_sparse = GetCurrentUS();
-            VLOG(0)
-                << "send recv "
-                << send_varname_to_ctx_.at(var_name).splited_varnames[ep_idx]
-                << " finish, using " << (after_send_sparse - before_send_sparse)
-                << " and " << (after_recv_sparse - after_send_sparse)
-                << "; total = " << (after_recv_sparse - before_send_sparse);
-          };
-          tasks.emplace_back(
-              send_threadpool_->enqueue(std::move(send_recv_task)));
-          // tasks[tasks.size() - 1].wait();
-        }
-      } else {
-        auto send_recv_task = [this, &var_name, &send_ctx] {
-          return;
-          if (var_name == STEP_COUNTER) {
-            return;
-          }
-          VLOG(1) << "send dense " << var_name << " begin";
-          SendDense(var_name);
-          VLOG(1) << "send dense " << var_name << " done";
-          VLOG(1) << "recv dense " << var_name << " begin";
-          RecvDense(var_name);
-          VLOG(1) << "recv dense " << var_name << " done";
-        };
-        tasks.emplace_back(
-            send_threadpool_->enqueue(std::move(send_recv_task)));
-      }
-    }
-    for (auto &task : tasks) {
-      task.wait();
-    }
-    ids_send_vec_.clear();
-    VLOG(1) << "Finish SendByCommunicator";
-  }
-}
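The hunk above inlines the former `SendByCommunicator` body into `MainThread`'s loop. Its first stage drains `need_push_queue_` into `ids_send_vec_` until `max_merge_var_num_` batches are buffered, giving up after `send_wait_times_` consecutive empty polls spaced 10 ms apart; only a full batch triggers a send round. A self-contained sketch of this bounded-wait batching pattern; `SimpleQueue` and `CollectBatches` are illustrative stand-ins, not Paddle APIs:

```cpp
#include <chrono>
#include <deque>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// Illustrative stand-in for the communicator's concurrent queue.
template <typename T>
class SimpleQueue {
 public:
  void Push(T v) {
    std::lock_guard<std::mutex> lk(mu_);
    q_.push_back(std::move(v));
  }
  bool TryPop(T *out) {
    std::lock_guard<std::mutex> lk(mu_);
    if (q_.empty()) return false;
    *out = std::move(q_.front());
    q_.pop_front();
    return true;
  }

 private:
  std::mutex mu_;
  std::deque<T> q_;
};

// Collect up to max_merge batches, giving up after max_wait empty polls.
std::vector<int> CollectBatches(SimpleQueue<int> *queue, size_t max_merge,
                                size_t max_wait) {
  std::vector<int> batches;
  size_t wait_times = 0;
  while (batches.size() < max_merge) {
    int item;
    if (queue->TryPop(&item)) {
      wait_times = 0;  // progress resets the idle counter
      batches.push_back(item);
    } else {
      if (wait_times >= max_wait) break;  // queue stayed empty too long
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      wait_times++;
    }
  }
  return batches;
}

int main() {
  SimpleQueue<int> q;
  for (int i = 0; i < 3; i++) q.Push(i);
  auto got = CollectBatches(&q, /*max_merge=*/4, /*max_wait=*/5);
  std::cout << "collected " << got.size() << " batches\n";
  return 0;
}
```

Resetting `wait_times` on every successful pop means the limit bounds consecutive idle polls, not total collection time.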
......
@@ -424,7 +424,7 @@ class GeoCommunicator : public AsyncCommunicator {
             const std::vector<std::string> &var_tables,
             const framework::Scope &scope) override;
 
-  void SendByCommunicator(int batches) override;
+  // void SendByCommunicator(int batches) override;
 
   void SendSparse(const std::string &varname, int ep_idx);
......
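The override is commented out in the header because the method's body now lives directly in `MainThread` (see the .cc hunk above). What replaced it is a fan-out/join round: one `send_recv_task` is enqueued on `send_threadpool_` per dense variable and per sparse-variable/pserver-endpoint pair, and every returned `std::future<void>` is waited on before `ids_send_vec_` is cleared. A sketch of that shape, with `std::async` standing in for the Paddle-internal thread pool; every name below is illustrative:

```cpp
#include <future>
#include <iostream>
#include <string>
#include <vector>

// Illustrative stand-in for the per-endpoint sparse send/recv pair
// (SendSparse + RecvSparse) enqueued in the diff.
void SendRecvSparse(const std::string &var_name, int ep_idx) {
  std::cout << "send+recv " << var_name << " shard " << ep_idx << "\n";
}

int main() {
  // Hypothetical variables and endpoint count; in the diff these come from
  // send_varname_to_ctx_ and send_ctx.epmap.size().
  std::vector<std::string> sparse_vars = {"embedding_0", "embedding_1"};
  const int pserver_num = 2;

  std::vector<std::future<void>> tasks;
  tasks.reserve(sparse_vars.size() * pserver_num);
  for (const auto &var_name : sparse_vars) {
    for (int ep_idx = 0; ep_idx < pserver_num; ep_idx++) {
      // std::async stands in for send_threadpool_->enqueue(...).
      tasks.emplace_back(
          std::async(std::launch::async, SendRecvSparse, var_name, ep_idx));
    }
  }
  for (auto &task : tasks) {
    task.wait();  // join every send/recv before starting the next round
  }
  return 0;
}
```

The diff's lambdas capture `var_name` by reference, which is safe there because the referenced map keys outlive the joined futures; the sketch copies by value, which sidesteps the lifetime question altogether.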