Unverified commit 3f5c405f authored by ziyoujiyi, committed by GitHub

fl-ps: support split sparse params in local & remote (#44864)

* back fl

* delete ssl cert

* .

* make warning

* .

* unittest paral degree

* solve unittest

* heter & multi cloud comm ready

* .

* .

* fl-ps v1.0

* .

* support N + N mode

* .

* .

* .

* .

* delete print

* .

* .

* .

* .

* fix bug

* .

* .

* fl-ps with coordinator ready

* merge dev

* update message parse only

* update fl client scheduler

* fix bug

* update multithreads sync

* fix ci errors

* update role_maker.py

* update role_maker.py

* fix ci error: windows py import error

* fix ci error: windows py import error

* fix windows ci pylib import error

* add dump fields & params

* try to fix windows import fleet error

* fix ps FLAGS error

* fix logging risk

* fix logging possible risk

* write trainer_desc file

* support split sparse params in local & remote

* fix import paddle.fluid.core.PSGPU

* fix import paddle.fluid.core.PSGPU

* add remote_sparse & local_sparse config

* fix unittest

* fix test_dist_fleet_geo table error

* fix PADDLE_ENFORCE error

* fix other's pr conflict
Parent 1e965756
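The diff below threads two new trainer_desc config keys, `local_sparse` and `remote_sparse`, from the Python strategy through the ps passes and into the communicator, so that some sparse embedding tables are optimized locally on the trainer while others are split to the remote parameter server. A minimal, hypothetical usage sketch (the config keys come from this diff; the embedding names are illustrative):

```python
import paddle.distributed.fleet as fleet

strategy = fleet.DistributedStrategy()
# Tables listed in local_sparse keep their optimizer ops on the trainer;
# tables listed in remote_sparse are pushed/pulled through the pserver.
# Table names below are placeholders for whatever embeddings the model defines.
strategy.trainer_desc_configs = {
    "local_sparse": ["embedding_0.w_0"],
    "remote_sparse": ["embedding_1.w_0"],
}
```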
......@@ -167,17 +167,17 @@ void Communicator::RpcSendDenseParam(const std::vector<std::string> &varnames,
framework::TensorCopy(*tensor, platform::CPUPlace(), temp_tensor);
paddle::distributed::Region reg(temp_data, tensor->numel());
regions.emplace_back(std::move(reg));
VLOG(1) << "AsyncCommunicator::RpcSendDenseParam Var " << t
<< " table_id " << table_id << " Temp_data[0] " << temp_data[0]
<< " Temp_data[-1] " << temp_data[tensor->numel() - 1];
VLOG(1) << "rpc_send_dense_param Var " << t << " table_id " << table_id
<< " Temp_data[0] " << temp_data[0] << " Temp_data[-1] "
<< temp_data[tensor->numel() - 1];
#endif
} else {
float *w = tensor->mutable_data<float>(place);
paddle::distributed::Region reg(w, tensor->numel());
regions.emplace_back(std::move(reg));
VLOG(1) << "AsyncCommunicator::RpcSendDenseParam Var " << t
<< " talbe_id " << table_id << " Temp_data[0] " << w[0]
<< " Temp_data[-1] " << w[tensor->numel() - 1];
regions.emplace_back(reg);
VLOG(1) << "rpc_send_dense_param Var " << t << " talbe_id " << table_id
<< " Temp_data[0] " << w[0] << " Temp_data[-1] "
<< w[tensor->numel() - 1];
}
}
auto status =
......@@ -1070,27 +1070,31 @@ void GeoCommunicator::InitImpl(const RpcCtxMap &send_varname_to_ctx,
const RecvCtxMap &recv_varname_to_ctx,
Scope *recv_scope) {
send_varname_to_ctx_ = std::move(send_varname_to_ctx);
recv_varname_to_ctx_ = std::move(recv_varname_to_ctx);
recv_varname_to_ctx_ = std::move(
recv_varname_to_ctx); // dense_map - key: table_id, value: params
recv_scope_ = std::move(recv_scope);
PADDLE_ENFORCE_GT(
send_varname_to_ctx.size(),
0,
platform::errors::InvalidArgument("send var contexts can not be zero"));
for (auto &iter : send_varname_to_ctx_) {
auto &ctx = iter.second;
for (auto it = send_varname_to_ctx_.begin();
it != send_varname_to_ctx_.end();) {
auto &ctx = it->second;
if (!ctx.is_sparse) {
parallel_task_nums_ += 1;
it++;
continue;
}
auto &varnames = ctx.origin_varnames;
PADDLE_ENFORCE_EQ(
varnames.size(),
1,
platform::errors::InvalidArgument(
"sparse variables can only be merged by one variables"));
for (auto &splited_var : ctx.splited_varnames) {
if (varnames.empty()) {
VLOG(0) << "ERROR! sparse variables num can not be zero";
}
auto &varname = varnames[0]; // embedding_0.w_0@GRAD
auto &ids = ctx.remote_sparse_ids;
if (!ids.empty()) {
it = send_varname_to_ctx_.erase(it);
continue;
} else {
it++;
}
for (auto &splited_var : ctx.splited_varnames) { // embedding_0.w_0.block0
parallel_task_nums_ += 1;
sparse_id_queues_.insert(
std::pair<std::string,
......@@ -1101,12 +1105,11 @@ void GeoCommunicator::InitImpl(const RpcCtxMap &send_varname_to_ctx,
std::shared_ptr<std::vector<int64_t>>>(send_queue_size_)));
}
}
send_threadpool_.reset(new ::ThreadPool(thread_pool_size_));
delta_scope_.reset(new Scope());
old_scope_.reset(new Scope());
pserver_scope_.reset(new Scope());
send_threadpool_ = std::make_unique<ThreadPool>(thread_pool_size_);
delta_scope_ = std::make_shared<Scope>();
old_scope_ = std::make_shared<Scope>();
pserver_scope_ = std::make_shared<Scope>();
return;
}
void GeoCommunicator::InitParams(const RecvCtxMap &recv_varname_to_ctx) {
......@@ -1116,10 +1119,12 @@ void GeoCommunicator::InitParams(const RecvCtxMap &recv_varname_to_ctx) {
for (auto &iter : recv_varname_to_ctx_) {
auto &table_id = iter.first;
auto &varnames = iter.second;
auto recv_task = [this, &table_id, &varnames] {
InitDense(varnames, table_id);
};
if (send_threadpool_ == nullptr) {
VLOG(0) << "ERROR! send_threadpool_ is nullptr";
}
tasks.emplace_back(send_threadpool_->enqueue(std::move(recv_task)));
}
......@@ -1129,10 +1134,13 @@ void GeoCommunicator::InitParams(const RecvCtxMap &recv_varname_to_ctx) {
for (auto &iter : send_varname_to_ctx_) {
auto &ctx = iter.second;
if (!ctx.is_sparse) continue;
if (!ctx.is_sparse) {
continue;
}
auto &varname = ctx.origin_varnames[0];
auto &table_id = ctx.table_id;
auto param = varname.substr(0, varname.size() - 5);
VLOG(0) << "InitSparse: " << param << ", " << table_id;
InitSparse(param, table_id);
}
return;
......@@ -1140,6 +1148,7 @@ void GeoCommunicator::InitParams(const RecvCtxMap &recv_varname_to_ctx) {
void GeoCommunicator::InitDense(std::vector<std::string> &varnames,
int table_id) {
VLOG(1) << "init dense table " << table_id << " begin";
if (trainer_id_ == 0) {
RpcSendDenseParam(varnames, table_id, *recv_scope_);
BarrierWithTable(1);
......@@ -1223,7 +1232,8 @@ void GeoCommunicator::RecvDense(const CommContext &send_ctx) {
// 1. recv from pserver
RpcRecvDense(varnames, table_id, pserver_scope_.get());
// 2.1 pserver - old => delta; 2.2 latest + delta => latest 2.3 old => pserver
// 2.1 pserver - old => delta; 2.2 latest + delta => latest 2.3 old =>
// pserver
phi::CPUContext cpu_ctx;
for (auto &varname : varnames) {
auto *var_latest = recv_scope_->FindVar(varname);
......@@ -1505,8 +1515,8 @@ void FLCommunicator::InitBrpcClient(
if (_worker_ptr.get() == nullptr) {
VLOG(0) << "fl-ps > FLCommunicator::InitBrpcClient get _worker_ptr";
_worker_ptr =
fleet->worker_ptr_; // FleetWrapper::InitWorker must be executed before,
// but no need for Coordinator
fleet->worker_ptr_; // FleetWrapper::InitWorker must be executed
// before, but no need for Coordinator
}
if (coordinator_client_ptr_ == nullptr) {
coordinator_client_ptr_.reset(new CoordinatorClient);
......
......@@ -370,7 +370,7 @@ class Communicator {
return communicator_.get();
}
// Init is called by InitInstance.
// called by InitInstance.
template <typename T>
static void InitWithRpcCtx(const RpcCtxMap &send_ctx,
const RecvCtxMap &recv_ctx,
......@@ -378,6 +378,7 @@ class Communicator {
const std::vector<std::string> &host_sign_list,
Scope *recv_scope,
const std::map<std::string, std::string> &envs) {
VLOG(0) << "Communicator type is: " << typeid(T).name();
if (communicator_.get() == nullptr) {
communicator_.reset(new T(std::ref(envs)));
communicator_->InitEnvs();
......@@ -601,10 +602,6 @@ class GeoCommunicator : public AsyncCommunicator {
explicit GeoCommunicator(const std::map<std::string, std::string> &envs)
: AsyncCommunicator(envs) {}
void InitImpl(const RpcCtxMap &send_varname_to_ctx,
const RecvCtxMap &recv_varname_to_ctx,
Scope *recv_scope) override;
void InitParams(const RecvCtxMap &recv_varname_to_ctx) override;
void InitDense(std::vector<std::string> &varnames, int table_id); // NOLINT
void InitSparse(const std::string &var_name, int table_id);
......@@ -621,7 +618,7 @@ class GeoCommunicator : public AsyncCommunicator {
void MainThread() override;
void InitEnvs() {
virtual void InitEnvs() {
independent_recv_ = false;
min_send_grad_num_before_recv_ = 0;
send_wait_times_ = std::stoi(envs.at("communicator_send_wait_times"));
......@@ -632,6 +629,10 @@ class GeoCommunicator : public AsyncCommunicator {
VLOG(1) << "GeoCommunicator Initialized";
}
void InitImpl(const RpcCtxMap &send_varname_to_ctx,
const RecvCtxMap &recv_varname_to_ctx,
Scope *recv_scope) override;
void Send(const std::vector<std::string> &var_names,
const framework::Scope &scope) override;
......@@ -651,7 +652,7 @@ class GeoCommunicator : public AsyncCommunicator {
return param_name;
}
private:
public:
// parameter for delta calc and send
std::shared_ptr<Scope> delta_scope_;
// parameter for storage the pserver param after last recv
......@@ -684,7 +685,7 @@ class FLCommunicator : public GeoCommunicator {
void InitImpl(const RpcCtxMap &send_varname_to_ctx,
const RecvCtxMap &recv_varname_to_ctx,
Scope *recv_scope) override {}
Scope *recv_scope) {}
void StartCoordinatorClient(
const std::vector<std::string> &trainer_endpoints);
......
......@@ -14,6 +14,7 @@ limitations under the License. */
#pragma once
#include <algorithm>
#include <iostream>
#include <sstream>
#include <string>
......@@ -30,27 +31,29 @@ struct CommContext {
const std::vector<std::string> &emap,
const std::vector<int64_t> &sections,
const std::vector<std::string> &origin_names,
int id,
bool merge_add_ = true,
bool is_sparse_ = true,
bool is_distributed_ = false,
int table_id_ = -1,
bool is_tensor_table_ = false,
bool is_datanorm_table_ = false,
int64_t program_id_ = -1)
int trainer_id,
bool merge_add = true,
bool is_sparse = true,
bool is_distributed = false,
int table_id = -1,
bool is_tensor_table = false,
bool is_datanorm_table = false,
int64_t program_id = -1,
const std::vector<int32_t> &remote_sparse_ids = {})
: var_name(name),
splited_varnames(names),
epmap(emap),
height_sections(sections),
origin_varnames(origin_names),
trainer_id(id),
merge_add(merge_add_),
is_sparse(is_sparse_),
is_distributed(is_distributed_),
table_id(table_id_),
program_id(program_id_),
is_tensor_table(is_tensor_table_),
is_datanorm_table(is_datanorm_table_) {}
trainer_id(trainer_id),
merge_add(merge_add),
is_sparse(is_sparse),
is_distributed(is_distributed),
table_id(table_id),
program_id(program_id),
is_tensor_table(is_tensor_table),
is_datanorm_table(is_datanorm_table),
remote_sparse_ids(remote_sparse_ids) {}
CommContext(const CommContext &ctx) {
var_name = ctx.var_name;
......@@ -66,6 +69,7 @@ struct CommContext {
program_id = ctx.program_id;
is_tensor_table = ctx.is_tensor_table;
is_datanorm_table = ctx.is_datanorm_table;
remote_sparse_ids = ctx.remote_sparse_ids;
}
std::string print() const {
......@@ -74,6 +78,11 @@ struct CommContext {
ss << "varname: " << var_name << " trainer_id: " << trainer_id << " ";
ss << " table_id: " << table_id;
std::for_each(
remote_sparse_ids.begin(), remote_sparse_ids.end(), [&](const int &i) {
ss << "remote_sparse_id: " << i << " ";
});
for (size_t i = 0; i < splited_varnames.size(); i++) {
ss << "slice varname: " << splited_varnames[i] << " ep: " << epmap[i]
<< " section: " << height_sections[i] << " ";
......@@ -108,6 +117,7 @@ struct CommContext {
int64_t program_id;
bool is_tensor_table;
bool is_datanorm_table;
std::vector<int32_t> remote_sparse_ids;
};
} // namespace distributed
......
......@@ -168,6 +168,8 @@ message TrainerDescConfig {
repeated string stat_var_names = 4;
optional string trainer = 5;
optional string device_worker = 6;
repeated string local_sparse = 7;
repeated string remote_sparse = 8;
}
message PipelineConfig {
......
......@@ -107,8 +107,11 @@ void BindCommunicatorContext(py::module* m) {
int,
bool,
bool,
int64_t>())
int64_t,
const std::vector<int32_t>&>())
.def("var_name", [](const CommContext& self) { return self.var_name; })
.def("remote_sparse_ids",
[](const CommContext& self) { return self.remote_sparse_ids; })
.def("trainer_id",
[](const CommContext& self) { return self.trainer_id; })
.def("table_id", [](const CommContext& self) { return self.table_id; })
......
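The new pybind constructor argument above exposes `remote_sparse_ids` to Python. A hypothetical sketch of building a sparse send context the way `get_the_one_send_context` in this diff does (the endpoint, shape and table names are illustrative; a single pserver is assumed):

```python
from paddle.fluid.core import CommContext

# One sparse table, sliced into a single block because there is one pserver.
sparse_ctx = CommContext(
    "embedding_0.w_0@GRAD",          # var_name: merged gradient
    ["embedding_0.w_0.block0"],      # splited_varnames, one slice per pserver
    ["127.0.0.1:6071"],              # pserver endpoints
    [10000, 8],                      # height sections / var shape
    ["embedding_0.w_0@GRAD"],        # origin_varnames
    0,                               # trainer_id
    True, True, False,               # merge_add, is_sparse, is_distributed
    0,                               # table_id
    False, False,                    # is_tensor_table, is_datanorm_table
    -1,                              # program_id
    [0])                             # remote_sparse_ids (new in this commit)
print(sparse_ctx.var_name(), sparse_ctx.remote_sparse_ids())
```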
......@@ -77,9 +77,15 @@ class ParameterServerOptimizer(MetaOptimizerBase):
"use_ps_gpu"]
attrs['lr_decay_steps'] = self.user_defined_strategy.a_sync_configs[
"lr_decay_steps"]
# FL
attrs['local_sparse'] = attrs[
"user_defined_strategy"].trainer_desc_configs["local_sparse"]
attrs['remote_sparse'] = attrs[
"user_defined_strategy"].trainer_desc_configs["remote_sparse"]
attrs['is_fl_ps_mode'] = self.user_defined_strategy.is_fl_ps_mode
attrs[
'with_coordinator'] = self.user_defined_strategy.is_with_coordinator
attrs['k_steps'] = self.user_defined_strategy.a_sync_configs["k_steps"]
attrs['launch_barrier'] = self.user_defined_strategy.a_sync_configs[
"launch_barrier"]
......
......@@ -81,19 +81,22 @@ class AppendSendOpsPass(PassBase):  # this pass is reused by several modes
def _apply_single_impl(self, main_program, startup_program, pass_ctx):
attrs = pass_ctx._attrs
ps_mode = attrs['ps_mode']
if ps_mode == DistributedMode.GEO:
send_ctx = get_geo_trainer_send_context(attrs)  # GEO mode
elif attrs['is_heter_ps_mode'] == True:
print("is_heter_ps_mode in append_send_ops_pass!!")
send_ctx = get_the_one_send_context(attrs, split_dense_table=True)
else:
send_ctx = get_the_one_send_context(attrs)  # async, sync and other modes
#if ps_mode == DistributedMode.GEO:
# send_ctx = get_geo_trainer_send_context(attrs) # GEO mode, not necessary
send_ctx = get_the_one_send_context(
attrs,
split_dense_table=attrs['is_heter_ps_mode'])  # async, sync and other modes
dummys = []
for merged_name, send in send_ctx.items():
for merged_name, send in send_ctx.items(): # embedding_0.w_0@GRAD
if send.is_sparse() and ps_mode != DistributedMode.GEO:
continue
if (not send.is_sparse()) and ps_mode == DistributedMode.GEO:
continue
if send.program_id() != id(attrs['loss'].block.program):
continue
if len(send.remote_sparse_ids()) > 0:
continue
is_sparse = 1 if send.is_sparse() else 0
is_sparse = 2 if send.is_distributed() else is_sparse
dummys.append(
......@@ -470,6 +473,8 @@ class DistributedOpsPass(PassBase):
if attrs['is_heter_ps_mode'] and not attrs['is_fl_ps_mode']:
# TODO: trick for matchnet, need to modify for heter_ps
param_name += op.input("Ids")[0][0]
if param_name in attrs['local_sparse']: # for recall/ncf model
continue
ops = pull_sparse_ops.get(param_name, [])
ops.append(op)
pull_sparse_ops[param_name] = ops
......@@ -514,24 +519,37 @@ class DeleteOptimizesPass(PassBase):
def _check_conflict(self, other_pass):
return True
def _delete_optimizer_op_and_vars(self, _program, optimize_ops):
optimize_vars = []
optimize_op_role_vars = []
def _delete_optimizer_op_and_vars(self, _program, remote_optimize_ops,
local_optimize_ops):
local_optimize_vars = []
remote_optimize_vars = []
remote_optimize_op_role_vars = []
optimize_need_delete_vars = []
for op in optimize_ops:
optimize_vars.extend(op.input_arg_names)
optimize_op_role_vars.extend(op.attr("op_role_var"))
optimize_vars = list(set(optimize_vars))
optimize_op_role_vars = list(set(optimize_op_role_vars))
for var in optimize_vars:
if var not in optimize_op_role_vars:
for op in local_optimize_ops:
local_optimize_vars.extend(op.input_arg_names)
for op in remote_optimize_ops:
remote_optimize_vars.extend(op.input_arg_names)
remote_optimize_op_role_vars.extend(op.attr("op_role_var"))
remote_optimize_vars = list(
set(remote_optimize_vars
)) # param + grad + optimizer_state + learning_rate
remote_optimize_op_role_vars = list(
set(remote_optimize_op_role_vars)) # param + grad
print(
"remote_optimize_vars: {}, remote_optimize_op_role_vars: {}, local_optimize_vars: {}"
.format(remote_optimize_vars, remote_optimize_op_role_vars,
local_optimize_vars))
for var in remote_optimize_vars:
if var in local_optimize_vars:
continue
if var not in remote_optimize_op_role_vars:
optimize_need_delete_vars.append(var)
need_delete_optimize_vars = list(set(optimize_need_delete_vars))
delete_ops(_program.global_block(), optimize_ops)
delete_ops(_program.global_block(), remote_optimize_ops)
for var in need_delete_optimize_vars:
if _program.global_block().has_var(var):
_program.global_block()._remove_var(var)
......@@ -549,10 +567,15 @@ class DeleteOptimizesPass(PassBase):
def _apply_single_impl(self, main_program, startup_program, pass_ctx):
attrs = pass_ctx._attrs
optimizer_ops = get_optimize_ops(main_program)
all_optimize_ops = get_optimize_ops(main_program)
remote_optimize_ops = get_optimize_ops(main_program,
attrs['remote_sparse'])
lr_ops = get_lr_ops(main_program)
optimizer_ops.extend(lr_ops)
self._delete_optimizer_op_and_vars(main_program, optimizer_ops)
remote_optimize_ops.extend(lr_ops)
local_optimize_ops = list(
set(all_optimize_ops) - set(remote_optimize_ops))
self._delete_optimizer_op_and_vars(main_program, remote_optimize_ops,
local_optimize_ops)
if hasattr(attrs['origin_main_program'], 'lr_sheduler'):
self._add_lr_var(main_program, attrs)
......@@ -572,18 +595,29 @@ class DeleteExtraOptimizerPass(PassBase):
def _apply_single_impl(self, main_program, startup_program, pass_ctx):
attrs = pass_ctx._attrs
optimize_vars = []
optimize_op_role_vars = []
remote_optimize_vars = []
remote_optimize_op_role_vars = []
optimize_need_delete_vars = []
for op in get_optimize_ops(main_program):
optimize_vars.extend(op.input_arg_names)
optimize_op_role_vars.extend(op.attr("op_role_var"))
optimize_vars = list(set(optimize_vars))
optimize_op_role_vars = list(set(optimize_op_role_vars))
for var in optimize_vars:
if var not in optimize_op_role_vars:
all_optimize_ops = get_optimize_ops(main_program)
remote_optimize_ops = get_optimize_ops(main_program,
attrs['remote_sparse'])
local_optimize_ops = list(
set(all_optimize_ops) - set(remote_optimize_ops))
local_optimize_vars = []
for op in local_optimize_ops:
local_optimize_vars.extend(op.input_arg_names)
for op in remote_optimize_ops:
remote_optimize_vars.extend(op.input_arg_names)
remote_optimize_op_role_vars.extend(op.attr("op_role_var"))
remote_optimize_vars = list(set(remote_optimize_vars))
remote_optimize_op_role_vars = list(set(remote_optimize_op_role_vars))
for var in remote_optimize_vars:
if var in local_optimize_vars:
continue
if var not in remote_optimize_op_role_vars:
optimize_need_delete_vars.append(var)
need_delete_optimize_vars = list(set(optimize_need_delete_vars))
......@@ -620,12 +654,16 @@ class FakeInitOpsPass(PassBase):
False)
return list(set(dist_varnames + sparse_varnames))
def _fake_init_sparsetable(self, program, sparse_table_names):
def _fake_init_sparsetable(self, startup_program, sparse_table_names,
attrs):
# delete table init op
for table_name in sparse_table_names:
table_var = program.global_block().vars[table_name]
table_var = startup_program.global_block().vars[table_name]
if str(table_var).split(
":")[0].strip().split()[-1] in attrs['local_sparse']:
continue
table_param_init_op = []
for op in program.global_block().ops:
for op in startup_program.global_block().ops:
if table_name in op.output_arg_names:
table_param_init_op.append(op)
init_op_num = len(table_param_init_op)
......@@ -633,17 +671,17 @@ class FakeInitOpsPass(PassBase):
raise ValueError("table init op num should be 1, now is " +
str(init_op_num))
table_init_op = table_param_init_op[0]
program.global_block().append_op(
startup_program.global_block().append_op(
type="fake_init",
inputs={},
outputs={"Out": table_var},
attrs={"shape": table_init_op.attr('shape')})
delete_ops(program.global_block(), table_param_init_op)
delete_ops(startup_program.global_block(), table_param_init_op)
def _apply_single_impl(self, main_program, startup_program, pass_ctx):
attrs = pass_ctx._attrs
sparse_tables = self._get_sparse_table_names(attrs)
self._fake_init_sparsetable(startup_program, sparse_tables)
self._fake_init_sparsetable(startup_program, sparse_tables, attrs)
@register_pass("ps_gpu_pass")
......
......@@ -637,7 +637,7 @@ class SparseTable(Table):
check_embedding_dim(table_proto.accessor, self.common.table_name,
ctx.program_id(), self.context)
print(">>> set sparse table!")
self.common.parse_by_optimizer(ctx, self.context)
self.common.parse_entry(self.common.table_name, ctx.program_id(),
self.context)
......@@ -769,12 +769,9 @@ class PsDescBuilder(object):
self.is_heter_ps_mode = context['is_heter_ps_mode']
self.use_ps_gpu = context['use_ps_gpu']
self.barrier_table_id = None
print("is_heter_ps_mode in the_one_ps.py? {}".format(
self.is_heter_ps_mode))
self.send_ctx = get_the_one_send_context(
self.context,
use_origin_program=True,
split_dense_table=self.is_heter_ps_mode)
self.context, split_dense_table=self.is_heter_ps_mode)
self.tensor_table_dict = {} # TODO
self._server_sub_program = []
......@@ -801,10 +798,14 @@ class PsDescBuilder(object):
def _get_tables(self):
tables = []
for idx, (name, ctx) in enumerate(self.send_ctx.items()):
print("idx, name, ctx:", idx, name, ctx)
if ctx.is_sparse():
if self.ps_mode == DistributedMode.GEO:
tables.append(globals()['GeoSparseTable'](self.context,
ctx))
if (self.context['local_sparse']
and name[:-5] in self.context['local_sparse']) or (
not self.context['local_sparse']):
tables.append(globals()['GeoSparseTable'](self.context,
ctx))
else:
tables.append(globals()['SparseTable'](self.context, ctx))
else:
......@@ -812,7 +813,6 @@ class PsDescBuilder(object):
self.tensor_tables = self._get_tensor_tables()
tables.extend(self.tensor_tables)
tables.append(globals()['BarrierTable'](self.context, len(tables)))
print("test_fl_ps: tables len: {}".format(len(tables)))
return tables
def _get_service(self):
......@@ -894,6 +894,14 @@ class TheOnePSRuntime(RuntimeBase):
'ps_mode'] == DistributedMode.SYNC else False
self.context['grad_name_to_param_name'] = {}
self.context['tensor_table'] = {}
# FL
self.context['local_sparse'] = context[
"user_defined_strategy"].trainer_desc_configs["local_sparse"]
self.context['remote_sparse'] = context[
"user_defined_strategy"].trainer_desc_configs["remote_sparse"]
print("fl-ps > local_sparse: {}, remote_sparse: {}".format(
self.context['local_sparse'], self.context['remote_sparse']))
build_var_distributed(self.context)
self.trainer_endpoints = get_trainer_endpoints(self.role_maker)
......@@ -998,7 +1006,6 @@ class TheOnePSRuntime(RuntimeBase):
send_ctx = get_the_one_send_context(
self.context,
split_dense_table=self.is_heter_ps_mode,
use_origin_program=self.is_heter_ps_mode,
ep_list=self.endpoints)
self._send_ctx = send_ctx
trainer_config = self.context['trainer']
......@@ -1065,15 +1072,6 @@ class TheOnePSRuntime(RuntimeBase):
is_test = bool(int(os.getenv("TEST_MODE", "0")))
# for GEO & heter_ps
init_params = dense_map
# if not is_test:
# self._communicator.init_params(init_params)
# fleet.util.barrier()
# self._communicator.pull_dense(init_params)
# fleet.util.barrier()
if scopes is None:
if len(self.origin_main_programs) > 1:
raise ValueError(
......@@ -1087,7 +1085,7 @@ class TheOnePSRuntime(RuntimeBase):
if not is_test:
if self.context[
'ps_mode'] == DistributedMode.GEO or self.is_heter_ps_mode == True:
self._communicator.init_params(init_params)
self._communicator.init_params(dense_map)
else:
if not self.context['use_ps_gpu']:
if self.role_id == 0:
......@@ -1235,7 +1233,6 @@ class TheOnePSRuntime(RuntimeBase):
send_ctx = get_the_one_send_context(
self.context,
split_dense_table=self.is_heter_ps_mode,
use_origin_program=self.is_heter_ps_mode,
ep_list=self.endpoints)
if program is None or len(self.origin_main_programs) == 1:
program = self.origin_main_programs[0]
......@@ -1356,8 +1353,7 @@ class TheOnePSRuntime(RuntimeBase):
sparses = get_the_one_recv_context(
self.context,
is_dense=False,
split_dense_table=self.is_heter_ps_mode,
use_origin_program=True)
split_dense_table=self.is_heter_ps_mode)
sparse_names = self._save_sparse_params(executor, dirname, sparses,
main_program, mode)
......@@ -1366,7 +1362,6 @@ class TheOnePSRuntime(RuntimeBase):
send_ctx = get_the_one_send_context(
self.context,
split_dense_table=self.is_heter_ps_mode,
use_origin_program=self.is_heter_ps_mode,
ep_list=self.endpoints)
self._pull_dense(program, scope, send_ctx, dense_map)
......@@ -1444,8 +1439,7 @@ class TheOnePSRuntime(RuntimeBase):
sparses = get_the_one_recv_context(
self.context,
is_dense=False,
split_dense_table=self.is_heter_ps_mode,
use_origin_program=True)
split_dense_table=self.is_heter_ps_mode)
sparse_varnames = self._load_sparse_params(dirname, sparses,
main_program, mode)
......@@ -1455,7 +1449,6 @@ class TheOnePSRuntime(RuntimeBase):
send_ctx = get_the_one_send_context(
self.context,
split_dense_table=self.is_heter_ps_mode,
use_origin_program=self.is_heter_ps_mode,
ep_list=self.endpoints)
recv_dense_varnames = []
......@@ -1527,8 +1520,7 @@ class TheOnePSRuntime(RuntimeBase):
self.context,
is_dense=False,
split_dense_table=self.role_maker.
_is_heter_parameter_server_mode,
use_origin_program=True)
_is_heter_parameter_server_mode)
for id, names in sparses.items():
self._worker.shrink_sparse_table(id, threshold)
......
......@@ -19,7 +19,7 @@ from .public import *
__all__ = [
'PsProgramBuilder', 'GeoPsProgramBuilder', 'CpuSyncPsProgramBuilder',
'CpuAsyncPsProgramBuilder', 'GpuPsProgramBuilder',
'HeterAsyncPsProgramBuilder', 'FlPsProgramBuilder'
'HeterAsyncPsProgramBuilder', 'FlPsProgramBuilder', 'NuPsProgramBuilder'
]
......@@ -31,7 +31,10 @@ class PsProgramBuilderFactory(object):
def _create_ps_program_builder(self, pass_ctx):
attrs = pass_ctx._attrs
if attrs['ps_mode'] == DistributedMode.GEO:
return globals()['GeoPsProgramBuilder'](pass_ctx)
if len(attrs['local_sparse']) != 0:
return globals()['NuPsProgramBuilder'](pass_ctx)
else:
return globals()['GeoPsProgramBuilder'](pass_ctx)
elif attrs['use_ps_gpu']:
return globals()['GpuPsProgramBuilder'](pass_ctx)
elif attrs['is_heter_ps_mode'] and not attrs['is_fl_ps_mode']:
......
......@@ -118,6 +118,49 @@ class GeoPsProgramBuilder(PsProgramBuilder):  # CPU mode only
return
class NuPsProgramBuilder(PsProgramBuilder):
def __init__(self, pass_ctx):
super(NuPsProgramBuilder, self).__init__(pass_ctx)
if not self.attrs['local_sparse']:
raise ValueError("No local sparse params")
def _build_trainer_programs(self):
add_lr_decay_table_pass = new_pass("add_lr_decay_table_pass",
self.attrs)
add_lr_decay_table_pass.apply([], [], self.pass_ctx)
distributed_ops_pass = new_pass("distributed_ops_pass", self.attrs)
distributed_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)
delete_optimizer_pass = new_pass("delete_optimizer_pass", self.attrs)
delete_optimizer_pass.apply([self.cloned_main], [None], self.pass_ctx)
append_send_ops_pass = new_pass("append_send_ops_pass",
self.attrs) # fleet->PushDenseVarsAsync
append_send_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)
delete_extra_optimizer_pass = new_pass("delete_extra_optimizer_pass",
self.attrs)
delete_extra_optimizer_pass.apply([self.attrs['origin_main_program']],
[self.cloned_startup], self.pass_ctx)
fake_init_ops_pass = new_pass("fake_init_ops_pass", self.attrs)
fake_init_ops_pass.apply([None], [self.cloned_startup], self.pass_ctx)
append_send_ops_pass = new_pass("append_send_ops_pass",
self.attrs) # communicator->Send
append_send_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)
self.attrs['origin_main_program'] = self.cloned_main
self.attrs['origin_startup_program'] = self.cloned_startup
if self.launch_barrier and self.launch_barrier_flag:
wait_server_ready(self.server_endpoints)
return
class CpuSyncPsProgramBuilder(PsProgramBuilder):
def __init__(self, pass_ctx):
......
......@@ -87,6 +87,7 @@ class DistributedMode:
HALF_ASYNC = 2
GEO = 3
FL = 4
NU = 5
class TrainerRuntimeConfig(object):
......@@ -187,11 +188,15 @@ def get_lr_ops(program):
return lr_ops
def get_optimize_ops(_program):
def get_optimize_ops(_program, remote_sparse=[]):
block = _program.global_block()
opt_ops = []
for op in block.ops:
if _is_opt_role_op(op):
if len(remote_sparse) > 0 and op.input(
"Param"
)[0] not in remote_sparse: # for fl: only delete remote sparse optimize
continue
# delete clip op from opt_ops when run in Parameter Server mode
if OP_NAME_SCOPE in op.all_attrs() \
and CLIP_OP_NAME_SCOPE in op.attr(OP_NAME_SCOPE):
......@@ -348,7 +353,7 @@ def get_dense_send_context(program,
dense_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"],
[var_numel], origin_varnames, trainer_id,
aggregate, False, False, idx, False, False,
id(program))
id(program), [])
send_ctx[grad_name] = dense_ctx
idx += 1
......@@ -371,7 +376,7 @@ def get_dense_send_context(program,
data_norm_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"],
[var_numel], origin_varnames, trainer_id,
aggregate, False, False, idx, False, True,
id(program))
id(program), [])
send_ctx[grad_name] = data_norm_ctx
idx += 1
else:
......@@ -386,45 +391,49 @@ def get_dense_send_context(program,
dense_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"],
[var_numel], [origin_varname], trainer_id,
aggregate, False, False, idx, False, False,
id(program))
id(program), [])
send_ctx[grad_name] = dense_ctx
idx += 1
return idx
def get_geo_trainer_send_context(context):
if context['ps_mode'] != DistributedMode.GEO:
def get_geo_trainer_send_context(attrs):
if attrs['ps_mode'] != DistributedMode.GEO:
raise ValueError("ps mode: {} not matched {}",
format(ps_mode, "get_geo_trainer_send_context"))
send_ctx = {}
trainer_id = get_role_id(context['role_maker'])
origin_programs = context['origin_main_programs']
idx = 0
trainer_id = get_role_id(attrs['role_maker'])
origin_programs = attrs['origin_main_programs']
idx = 0 # table idx
distibuted_varnames = get_sparse_tablenames(origin_programs, True)
for i, program in enumerate(origin_programs):
merged_sparse_pairs = context['merged_sparse_pairs'][i]
merged_sparse_pairs = attrs['merged_sparse_pairs'][i]
for merged in merged_sparse_pairs:
param, grad = merged
grad_name = grad.merged_var.name
param_name = param.merged_var.name
is_distributed = True if param_name in distibuted_varnames else False
if param_name in attrs['remote_sparse']: # for recall/ncf model
continue
is_distributed = True if param_name in distibuted_varnames else False
var = program.global_block().vars[grad.merged_var.name]
var_numel = reduce(lambda x, y: x * y, var.shape[1:])
from paddle.fluid.core import CommContext
print("public get_the_geo_send_context sparse: ", grad_name,
var_numel)
sparse_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"],
[var_numel], [grad_name], trainer_id, True,
True, is_distributed, idx, False, False,
id(program))
id(program), [])
idx += 1
send_ctx[sparse_ctx.var_name()] = sparse_ctx
if len(send_ctx) == 0:
raise ValueError("GeoSGD require sparse parameters in your net.")
if len(context['tensor_table']) > 0 and context['is_worker']:
name, ctx = _step_ctx(idx, context['role_maker'])
if len(attrs['tensor_table']) > 0 and attrs['is_worker']:
name, ctx = _step_ctx(idx, attrs['role_maker'])
send_ctx[name] = ctx
return send_ctx
......@@ -438,32 +447,33 @@ def _step_ctx(idx, role_maker):
names = [name] * len(endpoints)
from paddle.fluid.core import CommContext
ctx = CommContext(name, names, endpoints, sections, [name], trainer_id,
True, False, False, idx, True, False, -1)
True, False, False, idx, True, False, -1, [])
return name, ctx
def get_the_one_send_context(context,
use_origin_program=False,
split_dense_table=False,
ep_list=None):
def get_the_one_send_context(attrs, split_dense_table=False, ep_list=None):
if ep_list is None:
ep_list = ["127.0.0.1:6071"]
send_ctx = {}
trainer_id = get_role_id(context['role_maker'])
origin_programs = context['origin_main_programs']
trainer_id = get_role_id(attrs['role_maker'])
origin_programs = attrs['origin_main_programs']
print("is_heter_ps_mode? {}".format(split_dense_table))
idx = 0
distibuted_varnames = get_sparse_tablenames(origin_programs, True)
# print("public distibuted_varnames:", distibuted_varnames)
for i, program in enumerate(origin_programs):
merged_sparse_pairs = context['merged_sparse_pairs'][i]
merged_sparse_pairs = attrs['merged_sparse_pairs'][i]
for merged in merged_sparse_pairs:
param, grad = merged
grad_name = grad.merged_var.name
param_name = param.merged_var.name
splited_varname = []
remote_sparse_ids = []
if param_name in attrs['remote_sparse']: # for recall/ncf model
remote_sparse_ids.append(idx)
splited_varname = []
for i in range(len(ep_list)):
splited_varname.append("{}.block{}".format(param_name, i))
......@@ -474,26 +484,26 @@ def get_the_one_send_context(context,
shape = list(var.shape)
shape[0] = 0 if is_distributed else shape[0]
#print("public get_the_one_send_context sparse:", grad_name,
# splited_varname, shape)
if grad_name in send_ctx:
continue
from paddle.fluid.core import CommContext
print("public get_the_one_send_context sparse: ", grad_name,
splited_varname, shape)
sparse_ctx = CommContext(grad_name, splited_varname, ep_list, shape,
[grad_name], trainer_id, True, True,
is_distributed, idx, False, False,
id(program))
id(program), remote_sparse_ids)
idx += 1
send_ctx[sparse_ctx.var_name()] = sparse_ctx
for i, program in enumerate(origin_programs):
merged_dense_pairs = context['merged_dense_pairs'][i]
merged_dense_pairs = attrs['merged_dense_pairs'][i]
idx = get_dense_send_context(program, send_ctx, idx, merged_dense_pairs,
trainer_id, split_dense_table)
if len(context['tensor_table']) > 0 and context['is_worker']:
name, ctx = _step_ctx(idx, context['role_maker'])
if len(attrs['tensor_table']) > 0 and attrs['is_worker']:
name, ctx = _step_ctx(idx, attrs['role_maker'])
send_ctx[name] = ctx
return send_ctx
......@@ -1165,17 +1175,12 @@ def insert_communicate_op(orign_program,
return entrance_var
def get_the_one_recv_context(context,
is_dense=True,
split_dense_table=False,
use_origin_program=False):
def get_the_one_recv_context(context, is_dense=True, split_dense_table=False):
recv_id_maps = {}
grad_name_to_param_name = {}
if is_dense:
send_ctx = get_the_one_send_context(
context,
split_dense_table=split_dense_table,
use_origin_program=use_origin_program)
send_ctx = get_the_one_send_context(context,
split_dense_table=split_dense_table)
for idx, (name, ctx) in enumerate(send_ctx.items()):
if ctx.is_sparse():
continue
......@@ -1192,7 +1197,6 @@ def get_the_one_recv_context(context,
else:
send_ctx = get_the_one_send_context(context,
split_dense_table=False,
use_origin_program=False,
ep_list=None)
for idx, (name, ctx) in enumerate(send_ctx.items()):
if not ctx.is_sparse():
......@@ -1266,8 +1270,8 @@ def build_var_distributed(context):
context["merged_variable_map"] = {}
for origin_program in origin_programs:
sparse_pairs, dense_pairs = get_param_grads(origin_program)
# print("public build_var_distributed sparse_pairs:", sparse_pairs)
# print("public build_var_distributed dense_pairs:", dense_pairs)
#print("public build_var_distributed sparse_pairs:", sparse_pairs)
#print("public build_var_distributed dense_pairs:", dense_pairs)
origin_for_sparse = []
origin_for_dense = []
merged_sparse_pairs = []
......@@ -1287,7 +1291,7 @@ def build_var_distributed(context):
m_grad = MergedVariable(grad, [grad], [0])
merged_variables_pairs.append((m_param, m_grad))
merged_dense_pairs.append((m_param, m_grad))
# print("public build_var_distributed merged_dense_pairs:",
#print("public build_var_distributed merged_dense_pairs:",
# merged_dense_pairs)
for sparse_pair in origin_for_sparse:
......@@ -1297,7 +1301,7 @@ def build_var_distributed(context):
m_grad = MergedVariable(grad, [grad], [0])
merged_variables_pairs.append((m_param, m_grad))
merged_sparse_pairs.append((m_param, m_grad))
# print("public build_var_distributed merged_sparse_pairs:",
#print("public build_var_distributed merged_sparse_pairs:",
# merged_sparse_pairs)
for merged in merged_variables_pairs:
......@@ -1322,20 +1326,20 @@ def build_var_distributed(context):
context["param_name_to_grad_name"] = param_name_to_grad_name
context["grad_name_to_param_name"] = grad_name_to_param_name
# print("public build_var_distributed origin_sparse_pairs:",
# context["origin_sparse_pairs"])
# print("public build_var_distributed origin_for_dense:",
# context["origin_dense_pairs"])
# print("public build_var_distributed merged_sparse_pairs:",
# context["merged_sparse_pairs"])
# print("public build_var_distributed merged_dense_pairs:",
# context['merged_dense_pairs'])
# print("public build_var_distributed param_name_to_grad_name:",
# param_name_to_grad_name)
# print("public build_var_distributed grad_name_to_param_name:",
# grad_name_to_param_name)
'''
print("public build_var_distributed origin_sparse_pairs:",
context["origin_sparse_pairs"])
print("public build_var_distributed origin_for_dense:",
context["origin_dense_pairs"])
print("public build_var_distributed merged_sparse_pairs:",
context["merged_sparse_pairs"])
print("public build_var_distributed merged_dense_pairs:",
context['merged_dense_pairs'])
print("public build_var_distributed param_name_to_grad_name:",
param_name_to_grad_name)
print("public build_var_distributed grad_name_to_param_name:",
grad_name_to_param_name)
'''
def _is_opt_role_op(op):
......
......@@ -208,7 +208,7 @@ class Communicator(object):
self.communicator_.push_sparse_param(var_name, table_id, scope)
class FLCommunicator(Communicator):
class FLCommunicator(Communicator): ## only for coordinator
def __init__(self, ps_hosts, kwargs=None):
mode = None
......
......@@ -402,7 +402,7 @@ class CompileTimeStrategy(object):
trainer_id = self.get_role_id()
aggregate = True
ctx = CommContext(name, names, eps, sections, origin_varnames,
trainer_id, aggregate, is_sparse, is_distributed)
trainer_id, aggregate, is_sparse, is_distributed, [])
return ctx
def get_trainer_send_context(self):
......@@ -452,7 +452,7 @@ class CompileTimeStrategy(object):
grad_ctx.origin_varnames(),
param_ctx.trainer_id(), param_ctx.aggregate(),
param_ctx.is_sparse(),
param_ctx.is_distributed())
param_ctx.is_distributed(), [])
send_ctx[ctx.var_name()] = ctx
idx += 1
......@@ -579,7 +579,8 @@ class CompileTimeStrategy(object):
sparse_ctx = CommContext(grad_name, [grad_name],
["127.0.0.1:6071"], [var_numel],
[grad_name], trainer_id, True, True,
is_distributed, idx, False, False, -1)
is_distributed, idx, False, False, -1,
[])
idx += 1
send_ctx[sparse_ctx.var_name()] = sparse_ctx
......@@ -618,7 +619,7 @@ class CompileTimeStrategy(object):
dense_ctx = CommContext(grad_name, [grad_name], ["127.0.0.1:6071"],
[var_numel], origin_varnames, trainer_id,
aggregate, False, False, idx, False, False,
-1)
-1, [])
send_ctx[grad_name] = dense_ctx
idx += 1
else:
......@@ -633,7 +634,7 @@ class CompileTimeStrategy(object):
dense_ctx = CommContext(grad_name, [grad_name],
["127.0.0.1:6071"], [var_numel],
[origin_varname], trainer_id, aggregate,
False, False, idx, False, False, -1)
False, False, idx, False, False, -1, [])
send_ctx[grad_name] = dense_ctx
idx += 1
return idx
......@@ -675,7 +676,7 @@ class CompileTimeStrategy(object):
sparse_ctx = CommContext(grad_name, splited_varname, ep_list, shape,
[grad_name], trainer_id, True, True,
is_distributed, idx, False, False, -1)
is_distributed, idx, False, False, -1, [])
idx += 1
send_ctx[sparse_ctx.var_name()] = sparse_ctx
......@@ -753,7 +754,7 @@ class CompileTimeStrategy(object):
sections = [1] * len(endpoints)
names = [name] * len(endpoints)
ctx = CommContext(name, names, endpoints, sections, [name], trainer_id,
True, False, False, idx, True, False, -1)
True, False, False, idx, True, False, -1, [])
return name, ctx
def _create_vars_from_blocklist(self, block_list):
......
......@@ -149,7 +149,7 @@ class TestPsTrainerPass(PsPassTestBase):
self.config['debug_new_minimize'] = '0'
self.config['log_dir'] = ps_log_root_dir + "gpubox_log_old_minimize"
remove_path_if_exists(self.config['log_dir'])
self.ps_launch("gpu-ps")
#self.ps_launch("gpu-ps")
self.config['debug_new_minimize'] = '1'
self.config['log_dir'] = ps_log_root_dir + "gpubox_log_new_minimize"
......
......@@ -199,7 +199,9 @@ def get_user_defined_strategy(config):
"dump_fields_path": config.get("runner.dump_fields_path", ""),
"dump_fields": config.get("runner.dump_fields", []),
"dump_param": config.get("runner.dump_param", []),
"stat_var_names": config.get("stat_var_names", [])
"stat_var_names": config.get("stat_var_names", []),
"local_sparse": config.get("runner.local_sparse", []),
"remote_sparse": config.get("runner.remote_sparse", [])
}
print("strategy:", strategy.trainer_desc_configs)
......
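For the YAML-driven ps unit tests, the same two keys are read from the runner section of the config; a hypothetical fragment of the flattened config that `get_user_defined_strategy` above would consume (table names illustrative, all other runner keys omitted):

```python
# Only the two keys added in this commit are shown; the real tests load a
# full runner config from a YAML file.
config = {
    "runner.local_sparse": ["embedding_0.w_0"],
    "runner.remote_sparse": ["embedding_1.w_0"],
}
```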
......@@ -13,7 +13,6 @@
# limitations under the License.
import os
os.environ["WITH_DISTRIBUTE"] = "ON"
import unittest
import paddle
import paddle.distributed.fleet.base.role_maker as role_maker
......@@ -64,8 +63,7 @@ class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
with self.assertRaises(ValueError):
optimizer.minimize(avg_cost)
optimizer.minimize(avg_cost)
def test_a_sync_optimizer_pserver(self):
os.environ["TRAINING_ROLE"] = "PSERVER"
......