Unverified commit cc780b19, authored by 123malin, committed by GitHub

test=develop, optimize geo communicator (#26857)

* test=develop, optimize geo communicator 
Parent 5132f512
......@@ -13,13 +13,16 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/distributed/communicator.h"
#include <gflags/gflags.h>
#include <paddle/fluid/framework/program_desc.h>
#include <algorithm>
#include <chrono> // NOLINT
#include <map>
#include <thread> // NOLINT
#include <unordered_set>
#include "paddle/fluid/framework/eigen.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/tensor_util.h"
......@@ -428,13 +431,20 @@ void GeoCommunicator::InitImpl(const RpcCtxMap &send_varname_to_ctx,
} else {
auto &send_ctx = iter.second;
send_var_nums_ += send_ctx.splited_varnames.size();
if (!send_ctx.is_sparse) {
continue;
}
send_ids_to_queue_[varname] =
std::make_shared<BlockingQueue<std::vector<int64_t>>>(
send_queue_size_);
int pserver_num = static_cast<int>(send_ctx.epmap.size());
for (int ep_idx = 0; ep_idx < pserver_num; ep_idx++) {
sparse_id_queues_.insert(
std::pair<std::string, std::shared_ptr<BlockingQueue<
std::shared_ptr<std::vector<int64_t>>>>>(
send_ctx.splited_varnames[ep_idx],
std::make_shared<
BlockingQueue<std::shared_ptr<std::vector<int64_t>>>>(
send_queue_size_)));
}
}
}
send_threadpool_.reset(new ::ThreadPool(thread_pool_size_));
......@@ -457,95 +467,152 @@ void GeoCommunicator::Send(const std::vector<std::string> &var_names,
const framework::Scope &scope) {
waiting_ = false;
PADDLE_ENFORCE_EQ(
var_tables.size(), 1,
platform::errors::InvalidArgument("var_tables.size() == 1 is permitted"));
auto table_name = var_tables[0];
auto before_send = GetCurrentUS();
std::unordered_map<std::string, std::unordered_set<int64_t>> ids_table;
for (size_t i = 0; i < var_tables.size(); i++) {
auto table_name = var_tables[i];
if (table_name == STEP_COUNTER) {
auto &queue = send_varname_to_queue_.at(table_name);
auto tmp_var = std::make_shared<Variable>();
auto *tensor = tmp_var->GetMutable<framework::LoDTensor>();
tensor->Resize(framework::make_ddim({1}));
auto *out_d = tensor->mutable_data<int64_t>(platform::CPUPlace());
out_d[0] = 1;
VLOG(3) << "send to " << table_name << " with queue size " << queue->Size();
queue->Push(tmp_var);
continue;
} else {
auto &queue = send_ids_to_queue_.at(table_name);
PADDLE_ENFORCE_EQ(var_names.size(), 1,
platform::errors::InvalidArgument(
"var_names.size() == 1 is permitted"));
size_t splited_var_nums =
send_varname_to_ctx_[table_name].splited_varnames.size();
auto *var = scope.FindVar(var_names[0]);
for (size_t j = 0; j < splited_var_nums; j++) {
if (ids_table.find(
send_varname_to_ctx_[table_name].splited_varnames[j]) ==
ids_table.end()) {
ids_table.insert(std::pair<std::string, std::unordered_set<int64_t>>(
send_varname_to_ctx_[table_name].splited_varnames[j],
std::unordered_set<int64_t>()));
}
}
PADDLE_ENFORCE_EQ(
var->IsInitialized(), true,
platform::errors::InvalidArgument("grad var should be inited"));
auto *var = scope.FindVar(var_names[i]);
auto var_tensor = var->Get<framework::LoDTensor>();
int element_number = var_tensor.numel();
const int64_t *var_mutable_data = var_tensor.data<int64_t>();
if (!var->IsType<framework::SelectedRows>()) {
PADDLE_THROW(platform::errors::InvalidArgument(
"Only LodTensor can be send in GeoCommunicator::Send"));
// insert ids which have not been recorded
for (int j = 0; j < element_number; j++) {
auto ep_idx = var_mutable_data[j] % splited_var_nums;
ids_table.at(send_varname_to_ctx_[table_name].splited_varnames[ep_idx])
.insert(var_mutable_data[j]);
}
std::vector<int64_t> ids;
auto &rows = var->Get<framework::SelectedRows>().rows();
ids.assign(rows.begin(), rows.end());
queue->Push(ids);
}
}
auto before_push = GetCurrentUS();
for (auto &iter : ids_table) {
auto &key = iter.first;
auto &sparse_ids_set = iter.second;
auto sparse_ids_vec = std::make_shared<std::vector<int64_t>>();
sparse_ids_vec->assign(sparse_ids_set.begin(), sparse_ids_set.end());
sparse_id_queues_.at(key)->Push(sparse_ids_vec);
VLOG(3) << "push " << sparse_ids_vec->size() << " ids to " << key
<< "'s queue";
}
auto after_send = GetCurrentUS();
VLOG(3) << "run send_op finish. using " << (before_push - before_send) << "; "
<< (after_send - before_push);
}
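Note: the rewritten Send() no longer pushes raw SelectedRows rows. It reads the id tensor, deduplicates the ids per destination pserver (the endpoint is chosen by id % pserver_num), and pushes one id vector into each split variable's queue. A minimal sketch of that bucketing step, using a hypothetical BucketIds helper that is not part of the Paddle code:

#include <cstdint>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

// Hypothetical helper: bucket raw ids by destination pserver, mirroring the
// ids_table construction above. The endpoint for an id is id % pserver_num,
// and duplicates within one send are dropped by the set.
std::unordered_map<std::string, std::unordered_set<int64_t>> BucketIds(
    const std::vector<std::string> &splited_varnames,
    const std::vector<int64_t> &ids) {
  std::unordered_map<std::string, std::unordered_set<int64_t>> buckets;
  const int64_t pserver_num = static_cast<int64_t>(splited_varnames.size());
  for (const auto &name : splited_varnames) {
    buckets[name];  // make sure every endpoint gets a (possibly empty) bucket
  }
  for (const int64_t id : ids) {
    buckets[splited_varnames[static_cast<size_t>(id % pserver_num)]].insert(id);
  }
  return buckets;
}

// Example: splited_varnames = {"emb.block0", "emb.block1"}, ids = {0, 1, 2, 3, 5, 5}
// yields emb.block0 -> {0, 2} and emb.block1 -> {1, 3, 5}.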
void GeoCommunicator::SendByCommunicator(int batches) {
void GeoCommunicator::MainThread() {
VLOG(3) << "MainThread start and wait";
while (waiting_ && running_) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
VLOG(3) << "wait for running";
}
while (running_) {
std::vector<std::future<void>> tasks;
tasks.reserve(send_varname_to_ctx_.size());
tasks.reserve(send_var_nums_);
for (auto &iter : send_varname_to_ctx_) {
auto &var_name = iter.first;
auto &send_ctx = iter.second;
auto send_task = [this, batches, &var_name, &send_ctx] {
int pserver_num = static_cast<int>(send_ctx.epmap.size());
if (send_ctx.is_sparse) {
for (int ep_idx = 0; ep_idx < pserver_num; ep_idx++) {
auto send_recv_task = [this, ep_idx, &var_name] {
auto before_send_sparse = GetCurrentUS();
if (var_name == STEP_COUNTER) {
return;
}
if (send_ctx.is_sparse) {
SendSparse(var_name, batches);
auto send_varname =
send_varname_to_ctx_.at(var_name).splited_varnames[ep_idx];
auto sparse_ids = MergeSparseIds(send_varname);
if (sparse_ids.size() == 0) {
return;
}
SendSparse(var_name, ep_idx, sparse_ids);
auto after_send_sparse = GetCurrentUS();
RecvSparse(var_name, ep_idx);
auto after_recv_sparse = GetCurrentUS();
VLOG(3)
<< "send recv "
<< send_varname_to_ctx_.at(var_name).splited_varnames[ep_idx]
<< " finish, using " << (after_send_sparse - before_send_sparse)
<< " and " << (after_recv_sparse - after_send_sparse)
<< "; total = " << (after_recv_sparse - before_send_sparse);
};
tasks.emplace_back(
send_threadpool_->enqueue(std::move(send_recv_task)));
}
} else {
VLOG(1) << "send dense " << var_name << " begin";
SendDense(var_name);
VLOG(1) << "send dense " << var_name << " done";
auto send_recv_task = [this, &var_name, &send_ctx] {
if (var_name == STEP_COUNTER) {
return;
}
SendDense(var_name);
RecvDense(var_name);
};
tasks.emplace_back(send_threadpool_->enqueue(std::move(send_task)));
tasks.emplace_back(
send_threadpool_->enqueue(std::move(send_recv_task)));
}
}
for (auto &task : tasks) {
task.wait();
}
}
}
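Note: each round of the new MainThread enqueues one combined send+recv task per (sparse variable, endpoint) pair and one task per dense variable, which is why tasks is reserved with send_var_nums_; the loop then blocks on all futures before the next round. A minimal sketch of this fan-out, with std::async standing in for the ::ThreadPool (an assumption made only to keep the example self-contained):

#include <functional>
#include <future>
#include <vector>

// Sketch of the per-endpoint fan-out; std::async replaces the thread pool.
void RunOneRound(int pserver_num,
                 const std::function<void(int)> &send_recv_sparse,
                 const std::function<void()> &send_recv_dense) {
  std::vector<std::future<void>> tasks;
  tasks.reserve(pserver_num + 1);
  for (int ep_idx = 0; ep_idx < pserver_num; ++ep_idx) {
    // one combined send + recv task per parameter-server endpoint
    tasks.emplace_back(std::async(std::launch::async, send_recv_sparse, ep_idx));
  }
  tasks.emplace_back(std::async(std::launch::async, send_recv_dense));
  for (auto &task : tasks) {
    task.wait();  // a round finishes only when every endpoint has been synced
  }
}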
void GeoCommunicator::SendSparse(const std::string &varname, int batches) {
std::vector<int64_t> ids;
auto &ids_queue = send_ids_to_queue_.at(varname);
for (int i = 0; i < batches; ++i) {
auto pop_ids = ids_queue->Pop();
std::copy(pop_ids.begin(), pop_ids.end(), back_inserter(ids));
std::vector<int64_t> GeoCommunicator::MergeSparseIds(
const std::string &send_varname) {
size_t merge_num = 0, wait_times = 0;
std::unordered_set<int64_t> sparse_ids;
while (merge_num < static_cast<size_t>(max_merge_var_num_)) {
VLOG(3) << "Merge Number of " << send_varname << " = " << merge_num;
if (sparse_id_queues_.at(send_varname)->Size() > 0) {
wait_times = 0;
std::shared_ptr<std::vector<int64_t>> pop_ids =
sparse_id_queues_.at(send_varname)->Pop();
for (size_t j = 0; j < pop_ids->size(); j++) {
sparse_ids.insert(pop_ids->at(j));
}
merge_num += 1;
VLOG(3) << "sparse_id_queues_(" << send_varname << ") pushed";
} else if (sparse_id_queues_.at(send_varname)->Size() == 0) {
VLOG(3) << "wait_times -> " << wait_times;
if (wait_times >= static_cast<size_t>(send_wait_times_)) {
break;
}
auto size = ids.size();
std::set<int64_t> st(ids.begin(), ids.end());
ids.assign(st.begin(), st.end());
VLOG(1) << "SendSparse receive var: " << varname << " unset: " << size
<< " set: " << ids.size();
if (ids.empty()) {
LOG(WARNING) << "WARNING: GEO has nothing to send, return directly ";
return;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
wait_times++;
continue;
}
}
std::vector<int64_t> res;
res.assign(sparse_ids.begin(), sparse_ids.end());
return res;
}
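Note: MergeSparseIds drains at most max_merge_var_num_ batches from the split variable's queue and deduplicates the ids into a set; when the queue is empty it sleeps 10 ms and gives up after send_wait_times_ retries, so an idle producer cannot stall the send thread. A standalone sketch of the same bounded-wait drain, with a mutex-guarded deque standing in for the framework's BlockingQueue (names are illustrative only):

#include <chrono>
#include <cstdint>
#include <deque>
#include <mutex>
#include <thread>
#include <unordered_set>
#include <vector>

// Sketch only: a mutex-guarded deque stands in for the framework BlockingQueue.
struct IdQueue {
  std::mutex mu;
  std::deque<std::vector<int64_t>> batches;
};

std::vector<int64_t> DrainAndDedup(IdQueue *queue, int max_merge, int max_waits) {
  std::unordered_set<int64_t> merged;
  int merge_num = 0;
  int wait_times = 0;
  while (merge_num < max_merge) {
    std::vector<int64_t> batch;
    {
      std::lock_guard<std::mutex> lock(queue->mu);
      if (!queue->batches.empty()) {
        batch = std::move(queue->batches.front());
        queue->batches.pop_front();
      }
    }
    if (!batch.empty()) {
      merged.insert(batch.begin(), batch.end());
      ++merge_num;
      wait_times = 0;  // progress was made, reset the back-off counter
    } else {
      if (++wait_times > max_waits) {
        break;  // give up instead of blocking the send thread forever
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
  }
  return std::vector<int64_t>(merged.begin(), merged.end());
}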
void GeoCommunicator::SendSparse(const std::string &varname, int ep_idx,
const std::vector<int64_t> &sparse_ids) {
auto &rpc_ctx = send_varname_to_ctx_.at(varname);
auto send_varname = rpc_ctx.splited_varnames[ep_idx];
auto trainer_id = rpc_ctx.trainer_id;
auto endpoint = rpc_ctx.epmap[ep_idx];
auto pserver_num = rpc_ctx.epmap.size();
auto *var_latest = recv_scope_->FindVar(varname);
......@@ -557,33 +624,45 @@ void GeoCommunicator::SendSparse(const std::string &varname, int batches) {
auto dims1 = t_latest.dims()[1];
auto cpu_ctx = paddle::platform::CPUDeviceContext();
auto *var_delta = delta_scope_->Var(varname);
auto *var_delta = delta_scope_->Var(send_varname);
auto *t_delta = var_delta->GetMutable<framework::SelectedRows>();
t_delta->set_height(ids.size());
t_delta->mutable_rows()->assign(ids.begin(), ids.end());
auto *t_value = t_delta->mutable_value();
t_value->mutable_data<float>(
framework::make_ddim({static_cast<int64_t>(ids.size()), dims1}),
framework::make_ddim({static_cast<int64_t>(sparse_ids.size()), dims1}),
cpu_ctx.GetPlace());
std::vector<std::vector<std::vector<float> *>> values;
auto *ins = distributed::LargeScaleKV::GetInstance();
ins->Get(varname)->Get(ids, {"Param"}, &values);
ins->Get(varname)->Get(sparse_ids, {"Param"}, &values);
auto blas = math::GetBlas<platform::CPUDeviceContext, float>(cpu_ctx);
float coefficient = 1.0 / static_cast<float>(trainers_);
for (auto j = 0; j < static_cast<int>(ids.size()); ++j) {
blas.VSUB(dims1, t_latest.data<float>() + ids[j] * dims1,
for (auto j = 0; j < static_cast<int>(sparse_ids.size()); ++j) {
blas.VSUB(dims1, t_latest.data<float>() + sparse_ids[j] * dims1,
values[j][0]->data(), t_value->data<float>() + j * dims1);
blas.SCAL(dims1, coefficient, t_value->data<float>() + j * dims1);
blas.VADD(dims1, values[j][0]->data(), t_value->data<float>() + j * dims1,
values[j][0]->data());
}
auto &ctx = send_varname_to_ctx_.at(varname);
auto send = distributed::ParameterSend<float>();
send(ctx, *delta_scope_, true, 1);
std::vector<int64_t> send_rows;
send_rows.reserve(sparse_ids.size());
for (auto idx : sparse_ids) {
send_rows.push_back(idx / pserver_num);
}
t_delta->set_height(rpc_ctx.height_sections[ep_idx]);
t_delta->set_rows(send_rows);
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &cpu_ctx_send = *pool.Get(platform::CPUPlace());
distributed::RPCClient *rpc_client =
distributed::RPCClient::GetInstance<RPCCLIENT_T>(trainer_id);
auto ret = rpc_client->AsyncSendVar(endpoint, cpu_ctx_send,
*delta_scope_.get(), send_varname);
ret->Wait();
}
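Note: SendSparse converts each global id into the row index used on the target pserver. The endpoint was already selected by id % pserver_num at enqueue time, so the remote row is id / pserver_num, and RecvSparse later inverts the mapping with row * pserver_num + ep_idx. A tiny sketch of this round-robin shard mapping, with hypothetical helper names that are not part of the Paddle code:

#include <cstdint>

// Round-robin sharding as read from the diff: a global sparse id maps to an
// (endpoint, local row) pair, and RecvSparse applies the inverse mapping.
inline int64_t ShardEndpoint(int64_t id, int64_t pserver_num) {
  return id % pserver_num;  // which pserver stores this id
}
inline int64_t ShardLocalRow(int64_t id, int64_t pserver_num) {
  return id / pserver_num;  // row index inside that pserver's block
}
inline int64_t GlobalId(int64_t local_row, int64_t ep_idx, int64_t pserver_num) {
  return local_row * pserver_num + ep_idx;  // used when pulling rows back
}

// Example with 2 pservers: id 5 -> endpoint 1, local row 2; GlobalId(2, 1, 2) == 5.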
void GeoCommunicator::SendDense(const std::string &varname) {
......@@ -620,37 +699,27 @@ void GeoCommunicator::SendDense(const std::string &varname) {
send(ctx, *delta_scope_, true, 1);
}
void GeoCommunicator::RecvByCommunicator() {
std::vector<std::future<void>> tasks;
tasks.reserve(recv_varname_to_ctx_.size());
void GeoCommunicator::RecvByCommunicator() { return; }
for (auto &iter : recv_varname_to_ctx_) {
auto &var_name = iter.first;
auto &recv_ctx = iter.second;
void GeoCommunicator::RecvSparse(const std::string &varname, int ep_idx) {
auto train_id = recv_varname_to_ctx_.at(varname).trainer_id;
auto endpoint = recv_varname_to_ctx_.at(varname).epmap[ep_idx];
auto splited_var_name =
recv_varname_to_ctx_.at(varname).splited_varnames[ep_idx];
auto pserver_num = recv_varname_to_ctx_.at(varname).epmap.size();
auto recv_task = [this, &var_name, &recv_ctx] {
if (recv_ctx.is_sparse) {
RecvSparse(var_name);
} else {
RecvDense(var_name);
}
};
tasks.emplace_back(send_threadpool_->enqueue(std::move(recv_task)));
}
for (auto &task : tasks) {
task.wait();
}
}
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &cpu_ctx_recv = *pool.Get(platform::CPUPlace());
distributed::RPCClient *rpc_client =
distributed::RPCClient::GetInstance<RPCCLIENT_T>(train_id);
void GeoCommunicator::RecvSparse(const std::string &varname) {
VLOG(1) << "RecvSparse receive var: " << varname;
auto *var_psrever = pserver_scope_->Var(splited_var_name);
auto handle = rpc_client->AsyncGetVar(endpoint, cpu_ctx_recv,
*pserver_scope_.get(), splited_var_name,
splited_var_name, splited_var_name);
handle->Wait();
auto *var_latest = recv_scope_->FindVar(varname);
auto *var_psrever = pserver_scope_->Var(varname);
auto &ctx = recv_varname_to_ctx_.at(varname);
auto recv = distributed::ParameterRecv<float>();
recv(ctx, *pserver_scope_, true);
PADDLE_ENFORCE_EQ(
var_psrever->IsInitialized(), true,
......@@ -661,7 +730,11 @@ void GeoCommunicator::RecvSparse(const std::string &varname) {
ids.assign(var_psrever->Get<framework::SelectedRows>().rows().begin(),
var_psrever->Get<framework::SelectedRows>().rows().end());
VLOG(1) << "RecvSparse receive var: " << varname
for (size_t j = 0; j < ids.size(); j++) {
ids[j] = ids[j] * pserver_num + ep_idx;
}
VLOG(3) << "RecvSparse receive var: " << splited_var_name
<< " ids Size: " << ids.size();
auto t_psrever = var_psrever->Get<framework::SelectedRows>().value();
......
......@@ -284,7 +284,7 @@ class AsyncCommunicator : public Communicator {
void InitParams();
void MainThread();
virtual void MainThread();
void Send(const std::vector<std::string> &var_names,
const std::vector<std::string> &var_tables,
......@@ -408,7 +408,7 @@ class GeoCommunicator : public AsyncCommunicator {
void InitImpl(const RpcCtxMap &send_varname_to_ctx,
const RpcCtxMap &recv_varname_to_ctx,
Scope *recv_scope) override;
void MainThread() override;
void InitEnvs() {
min_send_grad_num_before_recv_ = 0;
......@@ -426,9 +426,12 @@ class GeoCommunicator : public AsyncCommunicator {
const std::vector<std::string> &var_tables,
const framework::Scope &scope) override;
void SendByCommunicator(int batches) override;
void SendByCommunicator(int batches) { return; }
std::vector<int64_t> MergeSparseIds(const std::string &send_varname);
void SendSparse(const std::string &varname, int batches);
void SendSparse(const std::string &varname, int ep_idx,
const std::vector<int64_t> &sparse_ids);
void SendDense(const std::string &varname);
......@@ -436,7 +439,7 @@ class GeoCommunicator : public AsyncCommunicator {
void RecvByCommunicator() override;
void RecvSparse(const std::string &varname);
void RecvSparse(const std::string &varname, int ep_idx);
void RecvDense(const std::string &varname);
......@@ -459,11 +462,13 @@ class GeoCommunicator : public AsyncCommunicator {
// parameter on pserver
std::shared_ptr<Scope> pserver_scope_;
std::unordered_map<std::string,
std::shared_ptr<BlockingQueue<std::vector<int64_t>>>>
send_ids_to_queue_;
int send_var_nums_ = 0;
std::unordered_map<std::string, std::shared_ptr<SparseValue>> old_sparses_;
std::unordered_map<
std::string,
std::shared_ptr<BlockingQueue<std::shared_ptr<std::vector<int64_t>>>>>
sparse_id_queues_;
};
} // namespace distributed
......
......@@ -144,7 +144,8 @@ CPUDeviceContext::CPUDeviceContext(CPUPlace place) : place_(place) {
void CPUDeviceContext::InitPoolDevice() {
using EigenEnv = Eigen::StlThreadEnvironment;
using EigenThreadPool = Eigen::ThreadPoolTempl<EigenEnv>;
int num_threads = std::thread::hardware_concurrency();
// int num_threads = std::thread::hardware_concurrency();
int num_threads = 1;
eigen_threadpool_.reset(new EigenThreadPool(num_threads));
eigen_pool_device_.reset(
new Eigen::ThreadPoolDevice(eigen_threadpool_.get(), num_threads));
......
......@@ -169,7 +169,7 @@ def append_send_ops_pass(program, config):
trainer_id = config.get_role_id()
pserver_endpoints = config.get_ps_endpoints()
def _append_send_op(union_vars, queue):
def _append_grad_send_op(union_vars, queue):
if queue == STEP_COUNTER:
send_input_vars = []
......@@ -198,6 +198,43 @@ def append_send_ops_pass(program, config):
return dummy_output
def _append_sparse_ids_send_op():
sparse_var = []
sparse_tables = []
unique_sparse_var = {}
for op in program.global_block().ops:
if "is_sparse" in op.all_attrs():
if op.type == "lookup_table":
op._set_attr('remote_prefetch', False)
for input_var_name, sparse_var_name in zip(
op.input("Ids"), op.input("W")):
if input_var_name in unique_sparse_var:
if unique_sparse_var[input_var_name] == sparse_var_name:
continue
input_var = program.global_block().var(input_var_name)
sparse_var.append(input_var)
sparse_tables.append(sparse_var_name)
unique_sparse_var[input_var_name] = sparse_var_name
dummy_output = []
if mode in [DistributedMode.SYNC, DistributedMode.HALF_ASYNC]:
dummy_output = program.global_block().create_var(
name=framework.generate_control_dev_var_name())
program.global_block().append_op(
type="send",
inputs={"X": sparse_var},
outputs={"Out": dummy_output},
attrs={
"send_varnames": sparse_tables,
"merge_add": True,
"use_send_handler": False,
"endpoints": pserver_endpoints,
RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
})
return dummy_output
def _append_barrier_op(dummys):
program.global_block().append_op(
type="send_barrier",
......@@ -214,8 +251,12 @@ def append_send_ops_pass(program, config):
sends = config.get_trainer_send_context()
if mode == DistributedMode.GEO:
dummys.append(_append_sparse_ids_send_op())
else:
for merged_name, send in sends.items():
dummys.append(_append_send_op(send.origin_varnames(), merged_name))
dummys.append(
_append_grad_send_op(send.origin_varnames(), merged_name))
if mode in [DistributedMode.SYNC, DistributedMode.HALF_ASYNC]:
_append_barrier_op(dummys)
......
......@@ -27,6 +27,8 @@ import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
paddle.enable_static()
class TestCommunicator(unittest.TestCase):
def net(self):
......