Unverified commit 34369944, authored by J jiaqi, committed by GitHub

support config file, cvm, load, save, shrink (#17319)

* support config file, cvm, load, save, shrink
test=develop

* fix error of worker_num & add table.compress_in_save
test=develop

* fix code style
test=develop

* fix save model bug
test=develop
Parent 66d51206
@@ -269,8 +269,8 @@ class MPISymetricRoleMaker(MPIRoleMaker):
"""
if not self._role_is_generated:
# TODO(guru4elephant): only allow to be called once
self._worker_endpoints = self._get_ips()
self._server_endpoints = self._get_ips()
self._worker_endpoints = self._get_ips()[1::2]
self._server_endpoints = self._get_ips()[::2]
if 0 == self._get_rank() % self._proc_per_node % 2:
self._node_type = 0
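A minimal sketch of the endpoint split introduced above: instead of assigning every rank both roles, even ranks become parameter servers and odd ranks become workers (the addresses below are illustrative placeholders, assuming _get_ips() returns one address per MPI rank).

```python
# Illustration of the even/odd split used by MPISymetricRoleMaker.
ips = ["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"]
server_endpoints = ips[::2]   # even ranks -> pservers: ['10.0.0.1', '10.0.0.3']
worker_endpoints = ips[1::2]  # odd ranks  -> workers:  ['10.0.0.2', '10.0.0.4']
```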
@@ -106,14 +106,33 @@ class PSLib(Fleet):
raise NameError(
"You should run DistributedOptimizer.minimize() first")
def init_server(self, model_dir=None):
pass
def init_server(self, model_dir=None, **kwargs):
"""
init_server() will be called by the user. It will load the model from model_dir.
Args:
model_dir(str): load model path, can be local or hdfs/afs path.
kwargs: user-defined attributes; currently the following are supported:
mode(int): load model mode.
0 is for load whole model,
1 is for load delta model (load diff),
default is 0.
Example:
>>> fleet.init_server("/your/path/to/model", mode=0)
"""
mode = kwargs.get("mode", 0)
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
self._fleet_ptr.load_model(model_dir, mode)
self._role_maker._barrier_worker()
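A hedged usage sketch of the new init_server(): only the first worker actually issues the load, the remaining workers just wait on the barrier. The model path is a placeholder, and `fleet` is the PSLib instance exported by this module.

```python
fleet.init_server("/your/path/to/model", mode=0)    # mode=0: load the whole model
# fleet.init_server("/your/path/to/delta", mode=1)  # mode=1: load a delta (diff) model
```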
def run_server(self):
"""
init_pserver(): will be called by user. When a user knows current process is_worker(), he/she
should call init_pserver() to initialize global information about parameter server
"""
"""
if self._opt_info:
if "fleet_desc" in self._opt_info:
self._dist_desc_str = text_format.MessageToString(
@@ -162,7 +181,7 @@ class PSLib(Fleet):
self._role_maker._barrier_all()
self._role_maker._finalize()
def distributed_optimizer(self, optimizer, strategy=None):
def distributed_optimizer(self, optimizer, strategy={}):
self._optimizer = DownpourOptimizer(optimizer, strategy)
return self._optimizer
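A hedged end-to-end sketch of passing a strategy dict through this interface; the optimizer, loss variable, and file name are placeholders, and the two keys shown are the ones consumed later in this change.

```python
import paddle.fluid as fluid

strategy = {
    "fleet_desc_file": "fleet_desc.prototxt",  # optional: load a user-provided config file
    "use_cvm": True,                           # optional: enable cvm, default is False
}
adam = fluid.optimizer.Adam(learning_rate=0.000005)
optimizer = fleet.distributed_optimizer(adam, strategy)
optimizer.minimize(loss)  # the strategy dict is forwarded to DistributedAdam.minimize()
```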
@@ -177,8 +196,81 @@ class PSLib(Fleet):
"""
self._fleet_ptr.save_model(dirname)
def save_persistables(self, dirname, main_program=None):
self._fleet_ptr.save_model(dirname)
def save_persistables(self, dirname, main_program=None, **kwargs):
"""
save persistable parameters,
when using fleet, it will save sparse and dense features
Args:
dirname(str): save path. It can be hdfs/afs path or local path
main_program(Program): fluid program, default None
kwargs: user-defined properties; currently the following are supported:
mode(int): 0 means save all pserver model,
1 means save delta pserver model (save diff),
2 means save xbox base,
3 means save batch model.
Example:
>>> fleet.save_persistables(dirname="/your/path/to/model", mode=0)
"""
mode = kwargs.get("mode", 0)
self._fleet_ptr.client_flush()
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
self._fleet_ptr.save_model(dirname, mode)
self._role_maker._barrier_worker()
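A hedged sketch of the four save modes documented above; the paths are placeholders.

```python
fleet.save_persistables(dirname="hdfs:/your/path/base",  mode=0)  # full pserver model
fleet.save_persistables(dirname="hdfs:/your/path/delta", mode=1)  # delta (diff) model
fleet.save_persistables(dirname="hdfs:/your/path/xbox",  mode=2)  # xbox base model
fleet.save_persistables(dirname="hdfs:/your/path/batch", mode=3)  # batch model
```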
def shrink_sparse_table(self):
"""
shrink the cvm of all sparse embeddings on the pserver; the decay rate
is defined as "show_click_decay_rate" in fleet_desc.prototxt
Example:
>>> fleet.shrink_sparse_table()
"""
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
for i in self._opt_info["fleet_desc"].trainer_param.sparse_table:
self._fleet_ptr.shrink_sparse_table(i.table_id)
self._role_maker._barrier_worker()
def shrink_dense_table(self, decay, scope=None, table_id=None):
"""
shrink all dense params on the pserver by multiplying them by decay
Args:
decay(float): the decay rate, usually range in (0, 1)
scope(Scope): Scope object, default is fluid.global_scope()
table_id(int): id of the dense table to shrink. None means shrink all tables;
you should specify it when using multiple scopes,
default is None.
Example:
>>> fleet.shrink_dense_table(0.98, myscope1, 1)
>>> fleet.shrink_dense_table(0.98, myscope1, 2)
>>> fleet.shrink_dense_table(0.98, myscope2, 3)
"""
if scope is None:
scope = fluid.global_scope()
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
for i in self._opt_info["fleet_desc"].trainer_param.dense_table:
if table_id is not None and table_id != i.table_id:
continue
var_list = [var for var in i.dense_variable_name]
skip = False
for var in var_list:
if scope.find_var(var) is None:
skip = True
break
if skip:
continue
self._fleet_ptr.shrink_dense_table(i.table_id, scope, var_list,
decay)
self._role_maker._barrier_worker()
def _set_opt_info(self, opt_info):
"""
@@ -273,7 +365,8 @@ class DownpourOptimizer(DistributedOptimizer):
losses,
startup_programs,
parameter_list,
no_grad_set)
no_grad_set,
self._strategy)
fleet._set_opt_info(opt_info)
@@ -62,10 +62,18 @@ class DownpourServer(Server):
Returns:
return None
"""
for table in self._server.downpour_server_param.downpour_table_param:
if table.table_id == table_id:
if table.type == pslib.PS_SPARSE_TABLE:
return
else:
raise ValueError("expect table %s type=%s, but actual type=%s" \
%(table_id, pslib.PS_SPARSE_TABLE, table.type))
table = self._server.downpour_server_param.downpour_table_param.add()
table.table_id = table_id
table.table_class = "DownpourSparseTable"
table.type = pslib.PS_SPARSE_TABLE
table.compress_in_save = True
table.accessor.accessor_class = "DownpourFeatureValueAccessor"
table.accessor.sparse_sgd_param.learning_rate = learning_rate
table.accessor.sparse_sgd_param.initial_g2sum = 3
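With this check, add_sparse_table() becomes idempotent with respect to table_id, which is what lets tables pre-declared in a user-provided fleet_desc file be reused instead of overwritten. A hedged sketch (the learning rate is arbitrary and the slot variables mirror the DistributedAdam call site later in this diff):

```python
server = DownpourServer()
# First call creates the table with the defaults shown above.
server.add_sparse_table(0, 0.05, prefetch_slots, prefetch_slots_emb)
# A second call with the same id finds an existing PS_SPARSE_TABLE and returns early.
server.add_sparse_table(0, 0.05, prefetch_slots, prefetch_slots_emb)
# Reusing the id for a table of a different type raises ValueError instead.
```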
@@ -94,10 +102,24 @@ class DownpourServer(Server):
Returns:
return None
"""
fea_dim = 0
for param in filter(lambda x: x.name.find("embedding") == -1,
param_var):
fea_dim += reduce(lambda x, y: x * y, param.shape, 1)
for table in self._server.downpour_server_param.downpour_table_param:
if table.table_id == table_id:
if table.type == pslib.PS_DENSE_TABLE:
table.accessor.fea_dim = fea_dim
return
else:
raise ValueError("expect table %s type=%s, but actual type=%s" \
%(table_id, pslib.PS_DENSE_TABLE, table.type))
table = self._server.downpour_server_param.downpour_table_param.add()
table.table_id = table_id
table.table_class = "DownpourDenseTable"
table.type = pslib.PS_DENSE_TABLE
table.compress_in_save = True
table.accessor.accessor_class = "DownpourDenseValueAccessor"
table.accessor.dense_sgd_param.name = "adam"
table.accessor.dense_sgd_param.adam.learning_rate = learning_rate
@@ -106,10 +128,6 @@ class DownpourServer(Server):
table.accessor.dense_sgd_param.adam.ada_epsilon = 1e-8
table.accessor.dense_sgd_param.adam.mom_decay_rate = 0.99
table.accessor.dense_sgd_param.naive.learning_rate = 0.0002
fea_dim = 0
for param in filter(lambda x: x.name.find("embedding") == -1,
param_var):
fea_dim += reduce(lambda x, y: x * y, param.shape, 1)
table.accessor.fea_dim = fea_dim
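The fea_dim computation hoisted to the top of add_dense_table() simply totals the element counts of all non-embedding parameters. A small self-contained illustration with made-up shapes:

```python
from functools import reduce  # reduce is a builtin in Python 2; imported here for Python 3

param_shapes = {
    "fc_0.w_0": (128, 64),
    "fc_0.b_0": (64,),
    "embedding_0.w_0": (10000, 9),  # excluded: its name contains "embedding"
}
fea_dim = sum(
    reduce(lambda x, y: x * y, shape, 1)
    for name, shape in param_shapes.items()
    if name.find("embedding") == -1)
# fea_dim == 128 * 64 + 64 == 8256
```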
def add_data_norm_table(self, table_id, learning_rate, param_var, grad_var):
@@ -123,17 +141,27 @@ class DownpourServer(Server):
Returns:
return None
"""
fea_dim = 0
for param in filter(lambda x: x.name.find("embedding") == -1,
param_var):
fea_dim += reduce(lambda x, y: x * y, param.shape, 1)
for table in self._server.downpour_server_param.downpour_table_param:
if table.table_id == table_id:
if table.type == pslib.PS_DENSE_TABLE:
table.accessor.fea_dim = fea_dim
return
else:
raise ValueError("expect table %s type=%s, but actual type=%s" \
%(table_id, pslib.PS_DENSE_TABLE, table.type))
table = self._server.downpour_server_param.downpour_table_param.add()
table.table_id = table_id
table.table_class = "DownpourDenseTable"
table.type = pslib.PS_DENSE_TABLE
table.compress_in_save = True
table.accessor.accessor_class = "DownpourDenseValueAccessor"
table.accessor.dense_sgd_param.name = "summary"
table.accessor.dense_sgd_param.summary.summary_decay_rate = 0.999999
fea_dim = 0
for param in filter(lambda x: x.name.find("embedding") == -1,
param_var):
fea_dim += reduce(lambda x, y: x * y, param.shape, 1)
table.accessor.fea_dim = fea_dim
def get_desc(self):
@@ -169,6 +197,9 @@ class DownpourWorker(Worker):
Returns:
return None
"""
for table in self._worker.sparse_table:
if table.table_id == table_id:
return
table = self._worker.sparse_table.add()
table.table_id = table_id
table.slot_key.extend([var.name for var in slot_key_vars])
@@ -187,6 +218,9 @@ class DownpourWorker(Worker):
Returns:
return None
"""
for table in self._worker.dense_table:
if table.table_id == table_id:
return
table = self._worker.dense_table.add()
table.table_id = table_id
table.dense_variable_name.extend(
@@ -52,7 +52,8 @@ class DistributedAdam(DistributedOptimizerImplBase):
losses,
startup_program=None,
parameter_list=None,
no_grad_set=None):
no_grad_set=None,
strategy={}):
"""
DownpourSGD is a distributed optimizer so
that the user can call minimize to generate the backward
@@ -63,6 +64,7 @@ class DistributedAdam(DistributedOptimizerImplBase):
parameter_list(str list): parameter names defined by users
no_grad_set(set): a set of variables that is defined by users
so that these variables do not need gradient computation
strategy(dict): user-defined properties
Returns:
[optimize_ops, grads_and_weights]
"""
@@ -76,6 +78,15 @@ class DistributedAdam(DistributedOptimizerImplBase):
ps_param = pslib.PSParameter()
server = DownpourServer()
worker = DownpourWorker(self._window)
# if the user specifies a fleet_desc.prototxt file, load that file
# instead of creating a default fleet_desc.prototxt.
# the user can specify server_param, trainer_param, or fs_client_param.
if strategy.get("fleet_desc_file") is not None:
fleet_desc_file = strategy["fleet_desc_file"]
with open(fleet_desc_file) as f:
text_format.Merge(f.read(), ps_param)
server.get_desc().CopyFrom(ps_param.server_param)
worker.get_desc().CopyFrom(ps_param.trainer_param)
sparse_table_index = 0
server.add_sparse_table(sparse_table_index, self._learning_rate,
prefetch_slots, prefetch_slots_emb)
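A hedged sketch of the new config-file path: when the strategy carries "fleet_desc_file", the prototxt is parsed straight into a PSParameter message and its server/trainer sections seed DownpourServer and DownpourWorker, so the add_*_table calls only fill in tables that are not already declared. The file name is a placeholder, and the import of the generated ps_pb2 module is an assumption that may differ by install layout.

```python
from google.protobuf import text_format
import ps_pb2 as pslib  # generated from ps.proto; exact import path is an assumption

ps_param = pslib.PSParameter()
with open("fleet_desc.prototxt") as f:
    text_format.Merge(f.read(), ps_param)  # parse the text-format config
# Tables declared in the file keep their settings; tables that are missing
# will still be created with the defaults from DownpourServer.
print(len(ps_param.server_param.downpour_server_param.downpour_table_param))
```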
@@ -140,7 +151,8 @@ class DistributedAdam(DistributedOptimizerImplBase):
# Todo(guru4elephant): figure out how to support more sparse parameters
# currently only supports lookup_table
worker_skipped_ops = ["lookup_table", "lookup_table_grad"]
ps_param.trainer_param.skip_op.extend(worker_skipped_ops)
if len(ps_param.trainer_param.skip_op) == 0:
ps_param.trainer_param.skip_op.extend(worker_skipped_ops)
opt_info = {}
opt_info["program_configs"] = program_configs
@@ -149,6 +161,7 @@ class DistributedAdam(DistributedOptimizerImplBase):
opt_info["optimizer"] = "DownpourSGD"
opt_info["fleet_desc"] = ps_param
opt_info["worker_skipped_ops"] = worker_skipped_ops
opt_info["use_cvm"] = strategy.get("use_cvm", False)
for loss in losses:
loss.block.program._fleet_opt = opt_info