diff --git a/paddle/fluid/operators/distributed/communicator.cc b/paddle/fluid/operators/distributed/communicator.cc index 910c6bdafda6c68d59fcfa8fdcdce6b4256d74ed..97e7266c8e53ee216882784c92bc77c80e57edbc 100644 --- a/paddle/fluid/operators/distributed/communicator.cc +++ b/paddle/fluid/operators/distributed/communicator.cc @@ -528,9 +528,13 @@ void GeoCommunicator::SendByCommunicator(int batches) { tasks.reserve(send_varname_to_ctx_.size()); for (auto &iter : send_varname_to_ctx_) { + VLOG(1) << "debug " << iter.first; auto &var_name = iter.first; auto &send_ctx = iter.second; int pserver_num = static_cast(send_ctx.epmap.size()); + if (var_name == STEP_COUNTER) { + continue; + } auto &ids_queue = send_ids_to_queue_.at(var_name); splited_ids_vec_[var_name].clear(); @@ -549,7 +553,7 @@ void GeoCommunicator::SendByCommunicator(int batches) { }; tasks.emplace_back( send_threadpool_->enqueue(std::move(send_recv_task))); - tasks[tasks.size() - 1].wait(); + // tasks[tasks.size() - 1].wait(); } } else { auto send_recv_task = [this, &var_name, &send_ctx] { @@ -567,9 +571,10 @@ void GeoCommunicator::SendByCommunicator(int batches) { } } - // for (auto &task : tasks) { - // task.wait(); - // } + for (auto &task : tasks) { + task.wait(); + } + VLOG(1) << "Finish SendByCommunicator"; } void GeoCommunicator::SendSparse(const std::string &varname, int ep_idx) {