diff --git a/paddle/fluid/operators/distributed/communicator.cc b/paddle/fluid/operators/distributed/communicator.cc
index a0ac82a6f4a432ee0f0427a90508c88a262799e3..92959cb22ed1b70afd490f03e495c896544a8528 100644
--- a/paddle/fluid/operators/distributed/communicator.cc
+++ b/paddle/fluid/operators/distributed/communicator.cc
@@ -13,13 +13,16 @@ See the License for the specific language governing permissions and
 limitations under the License. */
 
 #include "paddle/fluid/operators/distributed/communicator.h"
+
 #include <gflags/gflags.h>
 #include <paddle/fluid/framework/program_desc.h>
+
 #include <algorithm>
 #include <chrono>  // NOLINT
 #include <map>
 #include <thread>  // NOLINT
 #include <unordered_set>
+
 #include "paddle/fluid/framework/eigen.h"
 #include "paddle/fluid/framework/selected_rows.h"
 #include "paddle/fluid/framework/tensor_util.h"
@@ -428,13 +431,20 @@ void GeoCommunicator::InitImpl(const RpcCtxMap &send_varname_to_ctx,
     } else {
       auto &send_ctx = iter.second;
+      send_var_nums_ += send_ctx.splited_varnames.size();
       if (!send_ctx.is_sparse) {
         continue;
       }
-
-      send_ids_to_queue_[varname] =
-          std::make_shared<BlockingQueue<std::vector<int64_t>>>(
-              send_queue_size_);
+      int pserver_num = static_cast<int>(send_ctx.epmap.size());
+      for (int ep_idx = 0; ep_idx < pserver_num; ep_idx++) {
+        sparse_id_queues_.insert(
+            std::pair<std::string,
+                      std::shared_ptr<BlockingQueue<
+                          std::shared_ptr<std::vector<int64_t>>>>>(
+                send_ctx.splited_varnames[ep_idx],
+                std::make_shared<
+                    BlockingQueue<std::shared_ptr<std::vector<int64_t>>>>(
+                    send_queue_size_)));
+      }
     }
   }
   send_threadpool_.reset(new ::ThreadPool(thread_pool_size_));
@@ -457,95 +467,152 @@ void GeoCommunicator::Send(const std::vector<std::string> &var_names,
                            const framework::Scope &scope) {
   waiting_ = false;
 
-  PADDLE_ENFORCE_EQ(
-      var_tables.size(), 1,
-      platform::errors::InvalidArgument("var_tables.size() == 1 is permitted"));
-
-  auto table_name = var_tables[0];
+  auto before_send = GetCurrentUS();
+  std::unordered_map<std::string, std::unordered_set<int64_t>> ids_table;
 
-  if (table_name == STEP_COUNTER) {
-    auto &queue = send_varname_to_queue_.at(table_name);
-
-    auto tmp_var = std::make_shared<Variable>();
-    auto *tensor = tmp_var->GetMutable<framework::LoDTensor>();
-    tensor->Resize(framework::make_ddim({1}));
-    auto *out_d = tensor->mutable_data<int64_t>(platform::CPUPlace());
-    out_d[0] = 1;
-    VLOG(3) << "send to " << table_name << " with queue size " << queue->Size();
-    queue->Push(tmp_var);
-  } else {
-    auto &queue = send_ids_to_queue_.at(table_name);
-    PADDLE_ENFORCE_EQ(var_names.size(), 1,
-                      platform::errors::InvalidArgument(
-                          "var_names.size() == 1 is permitted"));
+  for (size_t i = 0; i < var_tables.size(); i++) {
+    auto table_name = var_tables[i];
+    if (table_name == STEP_COUNTER) {
+      continue;
+    } else {
+      size_t splited_var_nums =
+          send_varname_to_ctx_[table_name].splited_varnames.size();
+
+      for (size_t j = 0; j < splited_var_nums; j++) {
+        if (ids_table.find(
+                send_varname_to_ctx_[table_name].splited_varnames[j]) ==
+            ids_table.end()) {
+          ids_table.insert(std::pair<std::string, std::unordered_set<int64_t>>(
+              send_varname_to_ctx_[table_name].splited_varnames[j],
+              std::unordered_set<int64_t>()));
+        }
+      }
 
-    auto *var = scope.FindVar(var_names[0]);
+      auto *var = scope.FindVar(var_names[i]);
+      auto var_tensor = var->Get<framework::LoDTensor>();
+      int element_number = var_tensor.numel();
+      const int64_t *var_mutable_data = var_tensor.data<int64_t>();
 
-    PADDLE_ENFORCE_EQ(
-        var->IsInitialized(), true,
-        platform::errors::InvalidArgument("grad var should be inited"));
-
-    if (!var->IsType<framework::SelectedRows>()) {
-      PADDLE_THROW(platform::errors::InvalidArgument(
-          "Only LodTensor can be send in GeoCommunicator::Send"));
+      // insert ids which have not been recorded yet
+      for (int j = 0; j < element_number; j++) {
+        auto ep_idx = var_mutable_data[j] % splited_var_nums;
+        ids_table.at(send_varname_to_ctx_[table_name].splited_varnames[ep_idx])
+            .insert(var_mutable_data[j]);
+      }
     }
-
-    std::vector<int64_t> ids;
-    auto &rows = var->Get<framework::SelectedRows>().rows();
-    ids.assign(rows.begin(), rows.end());
-    queue->Push(ids);
   }
+
+  auto before_push = GetCurrentUS();
+  for (auto &iter : ids_table) {
+    auto &key = iter.first;
+    auto &sparse_ids_set = iter.second;
+    auto sparse_ids_vec = std::make_shared<std::vector<int64_t>>();
+    sparse_ids_vec->assign(sparse_ids_set.begin(), sparse_ids_set.end());
+    sparse_id_queues_.at(key)->Push(sparse_ids_vec);
+    VLOG(3) << "push " << sparse_ids_vec->size() << " ids to " << key
+            << "'s queue";
+  }
+  auto after_send = GetCurrentUS();
+  VLOG(3) << "run send_op finish. using " << (before_push - before_send) << "; "
+          << (after_send - before_push);
 }
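Note: the `% splited_var_nums` arithmetic in Send() above is plain round-robin sharding of row ids across pservers; SendSparse and RecvSparse below invert it with a matching divide and multiply-add. A standalone sketch of the mapping (function names are illustrative, not part of this patch):

    #include <cstdint>

    // id % pserver_num picks the owning endpoint; id / pserver_num is the
    // row index inside that endpoint's shard; the receive side restores the
    // global id as local_row * pserver_num + ep_idx.
    inline int OwnerEndpoint(int64_t id, int64_t pserver_num) {
      return static_cast<int>(id % pserver_num);
    }
    inline int64_t LocalRow(int64_t id, int64_t pserver_num) {
      return id / pserver_num;
    }
    inline int64_t GlobalId(int64_t local_row, int64_t pserver_num, int ep_idx) {
      return local_row * pserver_num + ep_idx;
    }

Round-robin keeps shards balanced for uniformly distributed ids and makes the inverse mapping stateless, so neither side needs a lookup table.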

-void GeoCommunicator::SendByCommunicator(int batches) {
-  std::vector<std::future<void>> tasks;
-  tasks.reserve(send_varname_to_ctx_.size());
+void GeoCommunicator::MainThread() {
+  VLOG(3) << "MainThread start and wait";
 
-  for (auto &iter : send_varname_to_ctx_) {
-    auto &var_name = iter.first;
-    auto &send_ctx = iter.second;
+  while (waiting_ && running_) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    VLOG(3) << "wait for running";
+  }
 
-    auto send_task = [this, batches, &var_name, &send_ctx] {
-      if (var_name == STEP_COUNTER) {
-        return;
-      }
+  while (running_) {
+    std::vector<std::future<void>> tasks;
+    tasks.reserve(send_var_nums_);
 
+    for (auto &iter : send_varname_to_ctx_) {
+      auto &var_name = iter.first;
+      auto &send_ctx = iter.second;
+      int pserver_num = static_cast<int>(send_ctx.epmap.size());
       if (send_ctx.is_sparse) {
-        SendSparse(var_name, batches);
+        for (int ep_idx = 0; ep_idx < pserver_num; ep_idx++) {
+          auto send_recv_task = [this, ep_idx, &var_name] {
+            auto before_send_sparse = GetCurrentUS();
+            if (var_name == STEP_COUNTER) {
+              return;
+            }
+            auto send_varname =
+                send_varname_to_ctx_.at(var_name).splited_varnames[ep_idx];
+            auto sparse_ids = MergeSparseIds(send_varname);
+            if (sparse_ids.size() == 0) {
+              return;
+            }
+            SendSparse(var_name, ep_idx, sparse_ids);
+            auto after_send_sparse = GetCurrentUS();
+            RecvSparse(var_name, ep_idx);
+            auto after_recv_sparse = GetCurrentUS();
+            VLOG(3)
+                << "send recv "
+                << send_varname_to_ctx_.at(var_name).splited_varnames[ep_idx]
+                << " finish, using " << (after_send_sparse - before_send_sparse)
+                << " and " << (after_recv_sparse - after_send_sparse)
+                << "; total = " << (after_recv_sparse - before_send_sparse);
+          };
+          tasks.emplace_back(
+              send_threadpool_->enqueue(std::move(send_recv_task)));
+        }
       } else {
-        VLOG(1) << "send dense " << var_name << " begin";
-        SendDense(var_name);
-        VLOG(1) << "send dense " << var_name << " done";
+        auto send_recv_task = [this, &var_name, &send_ctx] {
+          if (var_name == STEP_COUNTER) {
+            return;
+          }
+          SendDense(var_name);
+          RecvDense(var_name);
+        };
+        tasks.emplace_back(
+            send_threadpool_->enqueue(std::move(send_recv_task)));
       }
-    };
-    tasks.emplace_back(send_threadpool_->enqueue(std::move(send_task)));
-  }
-
-  for (auto &task : tasks) {
-    task.wait();
+    }
+    for (auto &task : tasks) {
+      task.wait();
+    }
   }
 }
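Note: MainThread replaces the old one-task-per-variable SendByCommunicator with one combined send+recv task per (sparse variable, endpoint) pair, joined at the end of each round. A minimal stand-in for that fan-out/join shape, with std::async in place of Paddle's ThreadPool (illustrative only):

    #include <future>
    #include <vector>

    // Fan out one task per (var, endpoint) pair, then join them all before
    // the next round, mirroring the tasks / task.wait() loop above.
    void RunRound(int var_num, int pserver_num) {
      std::vector<std::future<void>> tasks;
      tasks.reserve(static_cast<size_t>(var_num) * pserver_num);
      for (int v = 0; v < var_num; ++v) {
        for (int ep = 0; ep < pserver_num; ++ep) {
          tasks.emplace_back(std::async(std::launch::async, [v, ep] {
            // here: merge ids, send the delta for shard (v, ep), recv params
            (void)v;
            (void)ep;
          }));
        }
      }
      for (auto &t : tasks) t.wait();  // barrier before the next round
    }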
 
-void GeoCommunicator::SendSparse(const std::string &varname, int batches) {
-  std::vector<int64_t> ids;
-  auto &ids_queue = send_ids_to_queue_.at(varname);
-
-  for (int i = 0; i < batches; ++i) {
-    auto pop_ids = ids_queue->Pop();
-    std::copy(pop_ids.begin(), pop_ids.end(), back_inserter(ids));
-  }
-
-  auto size = ids.size();
-
-  std::set<int64_t> st(ids.begin(), ids.end());
-  ids.assign(st.begin(), st.end());
-  VLOG(1) << "SendSparse receive var: " << varname << " unset: " << size
-          << " set: " << ids.size();
-
-  if (ids.empty()) {
-    LOG(WARNING) << "WARNING: GEO has nothing to send, return directly ";
-    return;
+std::vector<int64_t> GeoCommunicator::MergeSparseIds(
+    const std::string &send_varname) {
+  size_t merge_num = 0, wait_times = 0;
+  std::unordered_set<int64_t> sparse_ids;
+  while (merge_num < static_cast<size_t>(max_merge_var_num_)) {
+    VLOG(3) << "Merge Number of " << send_varname << " = " << merge_num;
+    if (sparse_id_queues_.at(send_varname)->Size() > 0) {
+      wait_times = 0;
+      std::shared_ptr<std::vector<int64_t>> pop_ids =
+          sparse_id_queues_.at(send_varname)->Pop();
+      for (size_t j = 0; j < pop_ids->size(); j++) {
+        sparse_ids.insert(pop_ids->at(j));
+      }
+      merge_num += 1;
+      VLOG(3) << "sparse_id_queues_(" << send_varname << ") pushed";
+    } else if (sparse_id_queues_.at(send_varname)->Size() == 0) {
+      VLOG(3) << "wait_times -> " << wait_times;
+      if (wait_times >= static_cast<size_t>(send_wait_times_)) {
+        break;
+      }
+      std::this_thread::sleep_for(std::chrono::milliseconds(10));
+      wait_times++;
+      continue;
+    }
   }
+  std::vector<int64_t> res;
+  res.assign(sparse_ids.begin(), sparse_ids.end());
+  return res;
+}
 
+void GeoCommunicator::SendSparse(const std::string &varname, int ep_idx,
+                                 const std::vector<int64_t> &sparse_ids) {
+  auto &rpc_ctx = send_varname_to_ctx_.at(varname);
+  auto send_varname = rpc_ctx.splited_varnames[ep_idx];
+  auto trainer_id = rpc_ctx.trainer_id;
+  auto endpoint = rpc_ctx.epmap[ep_idx];
+  auto pserver_num = rpc_ctx.epmap.size();
 
   auto *var_latest = recv_scope_->FindVar(varname);
 
@@ -557,33 +624,45 @@ void GeoCommunicator::SendSparse(const std::string &varname, int batches) {
   auto dims1 = t_latest.dims()[1];
   auto cpu_ctx = paddle::platform::CPUDeviceContext();
 
-  auto *var_delta = delta_scope_->Var(varname);
+  auto *var_delta = delta_scope_->Var(send_varname);
   auto *t_delta = var_delta->GetMutable<framework::SelectedRows>();
-  t_delta->set_height(ids.size());
-  t_delta->mutable_rows()->assign(ids.begin(), ids.end());
+
   auto *t_value = t_delta->mutable_value();
   t_value->mutable_data<float>(
-      framework::make_ddim({static_cast<int64_t>(ids.size()), dims1}),
+      framework::make_ddim({static_cast<int64_t>(sparse_ids.size()), dims1}),
      cpu_ctx.GetPlace());
 
   std::vector<std::vector<std::vector<float> *>> values;
   auto *ins = distributed::LargeScaleKV::GetInstance();
-  ins->Get(varname)->Get(ids, {"Param"}, &values);
+  ins->Get(varname)->Get(sparse_ids, {"Param"}, &values);
 
   auto blas = math::GetBlas<platform::CPUDeviceContext, float>(cpu_ctx);
   float coefficient = 1.0 / static_cast<float>(trainers_);
 
-  for (auto j = 0; j < static_cast<int>(ids.size()); ++j) {
-    blas.VSUB(dims1, t_latest.data<float>() + ids[j] * dims1,
+  for (auto j = 0; j < static_cast<int>(sparse_ids.size()); ++j) {
+    blas.VSUB(dims1, t_latest.data<float>() + sparse_ids[j] * dims1,
               values[j][0]->data(), t_value->data<float>() + j * dims1);
     blas.SCAL(dims1, coefficient, t_value->data<float>() + j * dims1);
     blas.VADD(dims1, values[j][0]->data(), t_value->data<float>() + j * dims1,
               values[j][0]->data());
   }
 
-  auto &ctx = send_varname_to_ctx_.at(varname);
-  auto send = distributed::ParameterSend<float>();
-  send(ctx, *delta_scope_, true, 1);
+  std::vector<int64_t> send_rows;
+  send_rows.reserve(sparse_ids.size());
+  for (auto idx : sparse_ids) {
+    send_rows.push_back(idx / pserver_num);
+  }
+  t_delta->set_height(rpc_ctx.height_sections[ep_idx]);
+  t_delta->set_rows(send_rows);
+
+  platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
+  auto &cpu_ctx_send = *pool.Get(platform::CPUPlace());
+  distributed::RPCClient *rpc_client =
+      distributed::RPCClient::GetInstance<RPCCLIENT_T>(trainer_id);
+
+  auto ret = rpc_client->AsyncSendVar(endpoint, cpu_ctx_send,
+                                      *delta_scope_.get(), send_varname);
+  ret->Wait();
 }
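Note: the VSUB/SCAL/VADD triple in SendSparse computes GEO's averaged delta per touched row and folds it back into the local snapshot held in LargeScaleKV. The same update in scalar form (a sketch; `trainers` stands in for `trainers_`):

    // Per element of a selected row:
    //   delta = (latest - old) / trainers;  // what gets sent to the pserver
    //   old  += delta;                      // advance the local snapshot
    void GeoDeltaRow(const float *latest, float *old_param, float *delta,
                     int width, int trainers) {
      const float coeff = 1.0f / static_cast<float>(trainers);
      for (int k = 0; k < width; ++k) {
        delta[k] = (latest[k] - old_param[k]) * coeff;
        old_param[k] += delta[k];
      }
    }

Dividing by the trainer count keeps the aggregate update bounded when every trainer pushes its own delta for the same row.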
 
 void GeoCommunicator::SendDense(const std::string &varname) {
@@ -620,37 +699,27 @@ void GeoCommunicator::SendDense(const std::string &varname) {
   send(ctx, *delta_scope_, true, 1);
 }
 
-void GeoCommunicator::RecvByCommunicator() {
-  std::vector<std::future<void>> tasks;
-  tasks.reserve(recv_varname_to_ctx_.size());
+void GeoCommunicator::RecvByCommunicator() { return; }
 
-  for (auto &iter : recv_varname_to_ctx_) {
-    auto &var_name = iter.first;
-    auto &recv_ctx = iter.second;
+void GeoCommunicator::RecvSparse(const std::string &varname, int ep_idx) {
+  auto train_id = recv_varname_to_ctx_.at(varname).trainer_id;
+  auto endpoint = recv_varname_to_ctx_.at(varname).epmap[ep_idx];
+  auto splited_var_name =
+      recv_varname_to_ctx_.at(varname).splited_varnames[ep_idx];
+  auto pserver_num = recv_varname_to_ctx_.at(varname).epmap.size();
 
-    auto recv_task = [this, &var_name, &recv_ctx] {
-      if (recv_ctx.is_sparse) {
-        RecvSparse(var_name);
-      } else {
-        RecvDense(var_name);
-      }
-    };
-    tasks.emplace_back(send_threadpool_->enqueue(std::move(recv_task)));
-  }
-  for (auto &task : tasks) {
-    task.wait();
-  }
-}
+  platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
+  auto &cpu_ctx_recv = *pool.Get(platform::CPUPlace());
+  distributed::RPCClient *rpc_client =
+      distributed::RPCClient::GetInstance<RPCCLIENT_T>(train_id);
 
-void GeoCommunicator::RecvSparse(const std::string &varname) {
-  VLOG(1) << "RecvSparse receive var: " << varname;
+  auto *var_psrever = pserver_scope_->Var(splited_var_name);
+  auto handle = rpc_client->AsyncGetVar(endpoint, cpu_ctx_recv,
+                                        *pserver_scope_.get(), splited_var_name,
+                                        splited_var_name, splited_var_name);
+  handle->Wait();
 
   auto *var_latest = recv_scope_->FindVar(varname);
 
-  auto *var_psrever = pserver_scope_->Var(varname);
-
-  auto &ctx = recv_varname_to_ctx_.at(varname);
-  auto recv = distributed::ParameterRecv<float>();
-  recv(ctx, *pserver_scope_, true);
-
   PADDLE_ENFORCE_EQ(
       var_psrever->IsInitialized(), true,
@@ -661,7 +730,11 @@ void GeoCommunicator::RecvSparse(const std::string &varname) {
   ids.assign(var_psrever->Get<framework::SelectedRows>().rows().begin(),
              var_psrever->Get<framework::SelectedRows>().rows().end());
 
-  VLOG(1) << "RecvSparse receive var: " << varname
+  for (size_t j = 0; j < ids.size(); j++) {
+    ids[j] = ids[j] * pserver_num + ep_idx;
+  }
+
+  VLOG(3) << "RecvSparse receive var: " << splited_var_name
           << " ids Size: " << ids.size();
 
   auto t_psrever = var_psrever->Get<framework::SelectedRows>().value();
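Note: the header diff below replaces the single per-table queue (`send_ids_to_queue_`) with one queue per split variable, i.e., per (table, pserver) pair, so producers and consumers targeting different endpoints never contend on the same queue. A sketch of the new container shape using only std types (`BoundedQueue` is a placeholder, not Paddle's `BlockingQueue`):

    #include <cstdint>
    #include <memory>
    #include <string>
    #include <unordered_map>
    #include <vector>

    // Placeholder for Paddle's BlockingQueue; declared only to show the shape.
    template <typename T>
    class BoundedQueue;

    // Keyed by split varname (e.g. "emb.block0", "emb.block1", ...); each
    // entry buffers batches of ids destined for exactly one pserver.
    using IdBatch = std::shared_ptr<std::vector<int64_t>>;
    using SparseIdQueues =
        std::unordered_map<std::string, std::shared_ptr<BoundedQueue<IdBatch>>>;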
diff --git a/paddle/fluid/operators/distributed/communicator.h b/paddle/fluid/operators/distributed/communicator.h
index 07fd4ed496000d949e11663b7c2b758f9f23587d..4e3dd1d07bbb383e489f42f35029c25ce46b9a34 100644
--- a/paddle/fluid/operators/distributed/communicator.h
+++ b/paddle/fluid/operators/distributed/communicator.h
@@ -284,7 +284,7 @@ class AsyncCommunicator : public Communicator {
 
   void InitParams();
 
-  void MainThread();
+  virtual void MainThread();
 
   void Send(const std::vector<std::string> &var_names,
             const std::vector<std::string> &var_tables,
@@ -408,7 +408,7 @@ class GeoCommunicator : public AsyncCommunicator {
   void InitImpl(const RpcCtxMap &send_varname_to_ctx,
                 const RpcCtxMap &recv_varname_to_ctx,
                 Scope *recv_scope) override;
-
+  void MainThread() override;
 
   void InitEnvs() {
     min_send_grad_num_before_recv_ = 0;
@@ -426,9 +426,12 @@ class GeoCommunicator : public AsyncCommunicator {
             const std::vector<std::string> &var_tables,
             const framework::Scope &scope) override;
 
-  void SendByCommunicator(int batches) override;
+  void SendByCommunicator(int batches) { return; }
+
+  std::vector<int64_t> MergeSparseIds(const std::string &send_varname);
 
-  void SendSparse(const std::string &varname, int batches);
+  void SendSparse(const std::string &varname, int ep_idx,
+                  const std::vector<int64_t> &sparse_ids);
 
   void SendDense(const std::string &varname);
 
@@ -436,7 +439,7 @@ class GeoCommunicator : public AsyncCommunicator {
 
   void RecvByCommunicator() override;
 
-  void RecvSparse(const std::string &varname);
+  void RecvSparse(const std::string &varname, int ep_idx);
 
   void RecvDense(const std::string &varname);
 
@@ -459,11 +462,13 @@ class GeoCommunicator : public AsyncCommunicator {
   // parameter on pserver
   std::shared_ptr<Scope> pserver_scope_;
 
-  std::unordered_map<std::string,
-                     std::shared_ptr<BlockingQueue<std::vector<int64_t>>>>
-      send_ids_to_queue_;
-
+  int send_var_nums_ = 0;
   std::unordered_map<std::string, std::vector<float>> old_sparses_;
+
+  std::unordered_map<
+      std::string,
+      std::shared_ptr<BlockingQueue<std::shared_ptr<std::vector<int64_t>>>>>
+      sparse_id_queues_;
 };
 
 }  // namespace distributed
diff --git a/paddle/fluid/platform/device_context.cc b/paddle/fluid/platform/device_context.cc
index 34305c404b4df72ed28547e46817be00d6722a42..2e1517aa79ef737d1b96d9f1ec6b1193fe02f707 100644
--- a/paddle/fluid/platform/device_context.cc
+++ b/paddle/fluid/platform/device_context.cc
@@ -144,7 +144,8 @@ CPUDeviceContext::CPUDeviceContext(CPUPlace place) : place_(place) {
 void CPUDeviceContext::InitPoolDevice() {
   using EigenEnv = Eigen::StlThreadEnvironment;
   using EigenThreadPool = Eigen::ThreadPoolTempl<EigenEnv>;
-  int num_threads = std::thread::hardware_concurrency();
+  // int num_threads = std::thread::hardware_concurrency();
+  int num_threads = 1;
   eigen_threadpool_.reset(new EigenThreadPool(num_threads));
   eigen_pool_device_.reset(
       new Eigen::ThreadPoolDevice(eigen_threadpool_.get(), num_threads));
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
index 3f826da3ae2beca51b639a69da4113e6d9580d6c..90bcdee50730f4317da59d778486c20a7fa56090 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
@@ -169,7 +169,7 @@ def append_send_ops_pass(program, config):
     trainer_id = config.get_role_id()
     pserver_endpoints = config.get_ps_endpoints()
 
-    def _append_send_op(union_vars, queue):
+    def _append_grad_send_op(union_vars, queue):
 
         if queue == STEP_COUNTER:
             send_input_vars = []
@@ -198,6 +198,43 @@ def append_send_ops_pass(program, config):
 
         return dummy_output
 
+    def _append_sparse_ids_send_op():
+        sparse_var = []
+        sparse_tables = []
+        unique_sparse_var = {}
+        for op in program.global_block().ops:
+            if "is_sparse" in op.all_attrs():
+                if op.type == "lookup_table":
+                    op._set_attr('remote_prefetch', False)
+                for input_var_name, sparse_var_name in zip(
+                        op.input("Ids"), op.input("W")):
+                    if input_var_name in unique_sparse_var:
+                        if unique_sparse_var[input_var_name] == sparse_var_name:
+                            continue
+                    input_var = program.global_block().var(input_var_name)
+                    sparse_var.append(input_var)
+                    sparse_tables.append(sparse_var_name)
+                    unique_sparse_var[input_var_name] = sparse_var_name
+
+        dummy_output = []
+        if mode in [DistributedMode.SYNC, DistributedMode.HALF_ASYNC]:
+            dummy_output = program.global_block().create_var(
+                name=framework.generate_control_dev_var_name())
+
+        program.global_block().append_op(
+            type="send",
+            inputs={"X": sparse_var},
+            outputs={"Out": dummy_output},
+            attrs={
+                "send_varnames": sparse_tables,
+                "merge_add": True,
+                "use_send_handler": False,
+                "endpoints": pserver_endpoints,
+                RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
+            })
+
+        return dummy_output
+
     def _append_barrier_op(dummys):
         program.global_block().append_op(
             type="send_barrier",
@@ -214,8 +251,12 @@ def append_send_ops_pass(program, config):
 
     sends = config.get_trainer_send_context()
 
-    for merged_name, send in sends.items():
-        dummys.append(_append_send_op(send.origin_varnames(), merged_name))
+    if mode == DistributedMode.GEO:
+        dummys.append(_append_sparse_ids_send_op())
+    else:
+        for merged_name, send in sends.items():
+            dummys.append(
+                _append_grad_send_op(send.origin_varnames(), merged_name))
 
     if mode in [DistributedMode.SYNC, DistributedMode.HALF_ASYNC]:
         _append_barrier_op(dummys)
diff --git a/python/paddle/fluid/tests/unittests/test_communicator_async.py b/python/paddle/fluid/tests/unittests/test_communicator_async.py
index a86b80b2cf98829a683045ae302f72a694809138..13b9d2e3515b1fc50f37fd67e7cb6d0878749f20 100644
--- a/python/paddle/fluid/tests/unittests/test_communicator_async.py
+++ b/python/paddle/fluid/tests/unittests/test_communicator_async.py
@@ -27,6 +27,8 @@ import paddle.fluid.incubate.fleet.base.role_maker as role_maker
 from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
 from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
 
+paddle.enable_static()
+
 
 class TestCommunicator(unittest.TestCase):
     def net(self):