diff --git a/paddle/fluid/operators/distributed/communicator.cc b/paddle/fluid/operators/distributed/communicator.cc
index d3b77a758cc76e42f4a985bafd9ac7a9562d1cd3..91e2417d0ccf164fde89fb8a1e3c7b3f44dcfa73 100644
--- a/paddle/fluid/operators/distributed/communicator.cc
+++ b/paddle/fluid/operators/distributed/communicator.cc
@@ -18,12 +18,15 @@ limitations under the License. */
 #include <chrono>  // NOLINT
 #include <thread>  // NOLINT
 
+#include "paddle/fluid/framework/eigen.h"
 #include "paddle/fluid/framework/selected_rows.h"
 #include "paddle/fluid/framework/tensor_util.h"
 #include "paddle/fluid/framework/variable_helper.h"
 #include "paddle/fluid/operators/distributed/parameter_recv.h"
 #include "paddle/fluid/operators/distributed/parameter_send.h"
+#include "paddle/fluid/operators/math/math_function.h"
 #include "paddle/fluid/operators/math/selected_rows_functor.h"
+#include "paddle/fluid/platform/device_context.h"
 
 DEFINE_bool(communicator_independent_recv_thread, true,
             "use an independent to recv vars from parameter server");
@@ -40,28 +43,54 @@ namespace paddle {
 namespace operators {
 namespace distributed {
 
+template <typename T, int MajorType = Eigen::RowMajor,
+          typename IndexType = Eigen::DenseIndex>
+using EigenVector = framework::EigenVector<T, MajorType, IndexType>;
+
+inline double GetCurrentUS() {
+  struct timeval time;
+  gettimeofday(&time, NULL);
+  return 1e+6 * time.tv_sec + time.tv_usec;
+}
+
 static inline void MergeVars(const std::string &var_name,
                              const std::vector<std::shared_ptr<Variable>> &vars,
                              Scope *scope) {
-  VLOG(3) << "merge " << vars.size() << " vars " << var_name << " to 1";
   PADDLE_ENFORCE(!vars.empty(), "should have value to merge!");
   auto cpu_place = platform::CPUPlace();
   auto &var0 = vars[0];
   auto *out_var = scope->Var(var_name);
   if (var0->IsType<framework::LoDTensor>()) {
+    VLOG(3) << "merge " << var_name << " LoDTensor"
+            << var0->Get<framework::LoDTensor>().dims();
+
+    // init output tensor
     auto *out_t = out_var->GetMutable<framework::LoDTensor>();
     auto *out_ptr = out_t->mutable_data<float>(
         var0->Get<framework::LoDTensor>().dims(), cpu_place);
     auto numel = out_t->numel();
-    for (auto i = 0; i < numel; ++i) {
-      out_ptr[i] = 0;
-      for (auto &var : vars) {
-        auto &var_t = var->Get<framework::LoDTensor>();
-        PADDLE_ENFORCE_EQ(var_t.numel(), numel, "should have the same dims");
-        out_ptr[i] += var_t.data<float>()[i];
-      }
+
+    // check the input dims
+    for (auto &var : vars) {
+      auto &var_t = var->Get<framework::LoDTensor>();
+      PADDLE_ENFORCE_EQ(var_t.numel(), numel, "should have the same dims");
+    }
+
+    // set output tensor to 0.
+    auto cpu_ctx = paddle::platform::CPUDeviceContext();
+    math::SetConstant<paddle::platform::CPUDeviceContext, float>
+        constant_functor;
+    constant_functor(cpu_ctx, out_t, static_cast<float>(0));
+
+    // sum all vars to out
+    auto result = EigenVector<float>::Flatten(*out_t);
+    for (auto &var : vars) {
+      auto &in_t = var->Get<framework::LoDTensor>();
+      auto in = EigenVector<float>::Flatten(in_t);
+      result.device(*cpu_ctx.eigen_device()) = result + in;
     }
   } else if (var0->IsType<framework::SelectedRows>()) {
+    auto &slr0 = var0->Get<framework::SelectedRows>();
     auto *out_slr = out_var->GetMutable<framework::SelectedRows>();
     out_slr->mutable_rows()->clear();
     out_slr->mutable_value()->mutable_data<float>({{}}, cpu_place);
@@ -74,6 +103,8 @@ static inline void MergeVars(const std::string &var_name,
         merge_add;
     auto dev_ctx = paddle::platform::CPUDeviceContext();
     merge_add(dev_ctx, inputs, out_slr, false);
+    VLOG(3) << "merge " << var_name << " SelectedRows height: " << slr0.height()
+            << " dims: " << slr0.value().dims();
   } else {
     PADDLE_THROW("unsupported var type!");
   }
@@ -123,12 +154,13 @@ void Communicator::SendThread() {
   std::vector<std::future<void>> task_futures;
   task_futures.reserve(send_varname_to_ctx_.size());
   VLOG(3) << "run send graph";
+  auto before_run_send_graph = GetCurrentUS();
   for (auto &iter : send_varname_to_queue_) {
     auto &var_name = iter.first;
     auto &var_queue = iter.second;
     if (var_queue->Size() > 0) {
       auto send_task = [this, &var_name, &var_queue] {
-        VLOG(3) << "merge var " << var_name << " and send";
+        VLOG(3) << var_name << " merge and send";
         std::vector<std::shared_ptr<Variable>> vars;
         size_t merged_var_num = 0;
         while (var_queue->Size() > 0 &&
@@ -136,12 +168,19 @@ void Communicator::SendThread() {
           vars.push_back(var_queue->Pop());
           merged_var_num++;
         }
+        auto before_merge = GetCurrentUS();
         MergeVars(var_name, vars, send_scope_.get());
+        auto after_merge = GetCurrentUS();
+        VLOG(3) << "merge " << var_name << " use time "
+                << after_merge - before_merge;
         auto send_functor = distributed::ParameterSend<float>();
         auto &ctx = send_varname_to_ctx_.at(var_name);
         if (!FLAGS_communicator_fake_rpc) {
           send_functor(ctx, *send_scope_, true);
         }
+        auto after_send = GetCurrentUS();
+        VLOG(3) << "send " << var_name << " use time "
+                << after_send - after_merge;
       };
       task_futures.emplace_back(
           send_threadpool_->enqueue(std::move(send_task)));
@@ -152,7 +191,9 @@ void Communicator::SendThread() {
     for (auto &task_f : task_futures) {
       task_f.wait();
     }
-    VLOG(3) << "run send graph done";
+    auto after_run_send_graph = GetCurrentUS();
+    VLOG(3) << "run send graph use time "
+            << after_run_send_graph - before_run_send_graph;
     if (!FLAGS_communicator_independent_recv_thread) {
       RecvAll();
     }
@@ -161,6 +202,7 @@ void Communicator::SendThread() {
 
 void Communicator::RecvAll() {
   VLOG(3) << "parallel run recv graph";
+  auto before_send = GetCurrentUS();
   std::vector<std::future<void>> task_futures;
   task_futures.reserve(recv_varname_to_ctx_.size());
   for (auto &iter : recv_varname_to_ctx_) {
@@ -177,7 +219,8 @@ void Communicator::RecvAll() {
   for (auto &task : task_futures) {
     task.wait();
   }
-  VLOG(3) << "run recv graph done";
+  auto after_recv = GetCurrentUS();
+  VLOG(3) << "run recv graph use time " << after_recv - before_send;
 }
 
 void Communicator::RecvThread() {
@@ -191,17 +234,15 @@ void Communicator::RecvThread() {
 
 void Communicator::Send(const std::string &var_name,
                         const framework::Scope &scope) {
-  if (!FLAGS_communicator_fake_rpc) {
-    VLOG(3) << "communicator send " << var_name;
-    // push var into send queue by var_name
-    auto *grad_var = scope.FindVar(var_name);
-    PADDLE_ENFORCE(grad_var->IsInitialized(), "grad var should be inited");
-    auto tmp_grad_var = std::make_shared<Variable>();
-    framework::CopyVariable(*grad_var, tmp_grad_var.get());
-    auto &queue = send_varname_to_queue_.at(var_name);
-    VLOG(3) << "send " << var_name << " queue size " << queue->Size();
-    queue->Push(tmp_grad_var);
-  }
+  VLOG(3) << "communicator send " << var_name;
+  // push var into send queue by var_name
+  auto *grad_var = scope.FindVar(var_name);
+  PADDLE_ENFORCE(grad_var->IsInitialized(), "grad var should be inited");
+  auto tmp_grad_var = std::make_shared<Variable>();
+  framework::CopyVariable(*grad_var, tmp_grad_var.get());
+  auto &queue = send_varname_to_queue_.at(var_name);
+  VLOG(3) << "send " << var_name << " queue size " << queue->Size();
+  queue->Push(tmp_grad_var);
 }
 
 Communicator *Communicator::GetInstance() { return communicator_.get(); }
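
Note: the core of the MergeVars change above is replacing the hand-written per-element accumulation loop with a zero-fill (math::SetConstant) followed by Eigen expression sums over flattened tensors. The standalone sketch below is not part of the patch; it uses plain Eigen tensors instead of Paddle's LoDTensor, and the input count, size, and values are made up, but it illustrates the same zero-then-accumulate pattern under those assumptions.

// merge_sketch.cc -- illustrative only; assumes Eigen's unsupported Tensor
// module is available. Names and sizes here are hypothetical.
#include <unsupported/Eigen/CXX11/Tensor>

#include <iostream>
#include <vector>

int main() {
  const int numel = 4;

  // Three "gradient" inputs, analogous to the vars queued per variable name.
  std::vector<Eigen::Tensor<float, 1>> inputs(3, Eigen::Tensor<float, 1>(numel));
  for (auto &in : inputs) in.setConstant(1.0f);

  // Zero the output first (the role math::SetConstant plays in the patch) ...
  Eigen::Tensor<float, 1> out(numel);
  out.setZero();

  // ... then accumulate every input with an element-wise Eigen expression,
  // as the new EigenVector<float>::Flatten-based loop does instead of
  // indexing each element by hand.
  for (const auto &in : inputs) {
    out = out + in;
  }

  std::cout << out(0) << std::endl;  // prints 3
  return 0;
}

Because the sum is purely element-wise, assigning `out = out + in` (or, in the patch, `result.device(...) = result + in`) is safe even though the output appears on both sides of the expression.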