From 1dbc863202f1229ac9f630586a2ccf785c74aee4 Mon Sep 17 00:00:00 2001 From: wangguanqun Date: Mon, 17 Jan 2022 13:20:01 +0800 Subject: [PATCH] fix benchmark in paddlerec (#38278) --- paddle/fluid/distributed/ps.proto | 2 + .../distributed/service/brpc_ps_client.cc | 4 +- .../fluid/distributed/table/ctr_accessor.cc | 3 +- .../distributed/table/memory_sparse_table.cc | 3 +- .../distributed/fleet/runtime/the_one_ps.py | 46 +++++++++++-------- 5 files changed, 35 insertions(+), 23 deletions(-) diff --git a/paddle/fluid/distributed/ps.proto b/paddle/fluid/distributed/ps.proto index 4483f960eb1..0ae87812bce 100644 --- a/paddle/fluid/distributed/ps.proto +++ b/paddle/fluid/distributed/ps.proto @@ -172,6 +172,8 @@ message CommonAccessorParameter { optional string entry = 7; optional int32 trainer_num = 8; optional bool sync = 9; + optional uint32 table_num = 10; + optional uint32 table_dim = 11; } message TableAccessorSaveParameter { diff --git a/paddle/fluid/distributed/service/brpc_ps_client.cc b/paddle/fluid/distributed/service/brpc_ps_client.cc index a0a09b14dba..db1dd2ced84 100644 --- a/paddle/fluid/distributed/service/brpc_ps_client.cc +++ b/paddle/fluid/distributed/service/brpc_ps_client.cc @@ -1071,8 +1071,8 @@ int32_t BrpcPsClient::recv_and_save_table(const uint64_t table_id, for (size_t i = 0; i < worker_param.downpour_table_param_size(); ++i) { if (worker_param.downpour_table_param(i).table_id() == table_id) { var_name = worker_param.downpour_table_param(i).common().table_name(); - var_num = worker_param.downpour_table_param(i).accessor().fea_dim(); - var_shape = worker_param.downpour_table_param(i).accessor().embedx_dim(); + var_num = worker_param.downpour_table_param(i).common().table_num(); + var_shape = worker_param.downpour_table_param(i).common().table_dim(); break; } } diff --git a/paddle/fluid/distributed/table/ctr_accessor.cc b/paddle/fluid/distributed/table/ctr_accessor.cc index 68bd6eb9f27..23144f39ade 100644 --- a/paddle/fluid/distributed/table/ctr_accessor.cc +++ b/paddle/fluid/distributed/table/ctr_accessor.cc @@ -305,7 +305,8 @@ std::string CtrCommonAccessor::parse_to_string(const float* v, int param) { auto show = common_feature_value.show(const_cast(v)); auto click = common_feature_value.click(const_cast(v)); auto score = show_click_score(show, click); - if (score >= _config.embedx_threshold()) { + if (score >= _config.embedx_threshold() && + param > common_feature_value.embedx_w_index()) { for (auto i = common_feature_value.embedx_w_index(); i < common_feature_value.dim(); ++i) { os << " " << v[i]; diff --git a/paddle/fluid/distributed/table/memory_sparse_table.cc b/paddle/fluid/distributed/table/memory_sparse_table.cc index 7501207abe0..086ddcafeb4 100644 --- a/paddle/fluid/distributed/table/memory_sparse_table.cc +++ b/paddle/fluid/distributed/table/memory_sparse_table.cc @@ -27,7 +27,7 @@ namespace paddle { namespace distributed { // TODO(zhaocaibei123): configure -bool FLAGS_pserver_create_value_when_push = false; +bool FLAGS_pserver_create_value_when_push = true; int FLAGS_pserver_table_save_max_retry = 3; bool FLAGS_pserver_enable_create_feasign_randomly = false; @@ -494,7 +494,6 @@ int32_t MemorySparseTable::push_sparse(const uint64_t* keys, values + push_data_idx * update_value_col; auto itr = local_shard.find(key); if (itr == local_shard.end()) { - VLOG(0) << "sparse table push_sparse: " << key << "not found!"; if (FLAGS_pserver_enable_create_feasign_randomly && !_value_accesor->create_value(1, update_data)) { continue; diff --git a/python/paddle/distributed/fleet/runtime/the_one_ps.py b/python/paddle/distributed/fleet/runtime/the_one_ps.py index 1240e1492a7..c561c250678 100644 --- a/python/paddle/distributed/fleet/runtime/the_one_ps.py +++ b/python/paddle/distributed/fleet/runtime/the_one_ps.py @@ -171,6 +171,8 @@ class CommonAccessor: self.dims = [] self.trainer_num = 0 self.sync = "false" + self.table_num = None + self.table_dim = None self.initializers = [] self.opt_input_map = {} self.opt_attr_map = {} @@ -256,7 +258,7 @@ class CommonAccessor: break return attr_str - def parse_by_optimizer(self, grad_name, is_sparse, total_dims, + def parse_by_optimizer(self, grad_name, is_sparse, size, single_dim, compiled_strategy, adam_d2sum): from paddle.fluid.incubate.fleet.parameter_server.ir.public import _get_optimize_ops param_name = compiled_strategy.grad_name_to_param_name[grad_name] @@ -281,6 +283,8 @@ class CommonAccessor: initializers = [] self.trainer_num = compiled_strategy.get_trainers() + self.table_num = size + self.table_dim = single_dim if oop.type != 'adam' and adam_d2sum == True: print('optimization algorithm is not adam, set adam_d2sum False') @@ -294,7 +298,7 @@ class CommonAccessor: param_varnames = self.opt_input_map["naive_adagrad"] attr_varnames = self.opt_attr_map["naive_adagrad"] self.accessor_class = "sgd" - elif adam_d2sum: + elif adam_d2sum and not is_sparse: param_varnames = self.opt_input_map["adam_d2sum"] attr_varnames = self.opt_attr_map["adam_d2sum"] self.accessor_class = "adam_d2sum" @@ -309,10 +313,9 @@ class CommonAccessor: #for dims if shape is None: if is_sparse: - shape = total_dims + shape = single_dim else: - shape = self.get_shard(total_dims, pserver_num, - pserver_id) + shape = self.get_shard(size, pserver_num, pserver_id) dims.append(shape) #for initializers @@ -351,9 +354,9 @@ class CommonAccessor: if shape is None: if is_sparse: - shape = total_dims + shape = single_dim else: - shape = self.get_shard(total_dims, pserver_num, + shape = self.get_shard(size, pserver_num, pserver_id) dims.append(shape) @@ -382,6 +385,10 @@ class CommonAccessor: attrs += "entry: \"{}\" ".format(self.entry) attrs += "trainer_num: {} ".format(self.trainer_num) attrs += "sync: {} ".format(self.sync) + if self.table_num: + attrs += "table_num: {} ".format(self.table_num) + if self.table_dim: + attrs += "table_dim: {} ".format(self.table_dim) for param in self.params: attrs += "params: \"{}\" ".format(param) @@ -451,10 +458,7 @@ class Table: accessor_str = accessor_str.format( conv_indent(indent), self.accessor_proto, conv_indent(indent)) attrs += accessor_str + "\n" - return table_str.format( - conv_indent(indent), attrs, conv_indent(indent)) - - if self.accessor is not None: + elif self.accessor is not None: attrs += self.accessor.to_string(indent) attrs += "\n" @@ -988,8 +992,9 @@ class TheOnePSRuntime(RuntimeBase): adam_d2sum = self.context["user_defined_strategy"].adam_d2sum common.parse_by_optimizer(ctx.origin_varnames()[0], ctx.is_sparse(), - ctx.sections()[1] if ctx.is_sparse() - else ctx.sections()[0], + ctx.sections()[0], + ctx.sections()[1] + if ctx.is_sparse() else 1, self.compiled_strategy, adam_d2sum) if ctx.is_sparse(): @@ -1142,17 +1147,25 @@ class TheOnePSRuntime(RuntimeBase): return is_valid + def _get_inference_model_path(self, dirname): + if dirname.startswith("afs:") or dirname.startswith("hdfs:"): + model_path = "./dnn_plugin" + else: + model_path = os.path.join(dirname, "dnn_plugin") + return model_path + def _save_sparse_params(self, executor, dirname, context, main_program, mode): from paddle.fluid.incubate.fleet.parameter_server.ir.public import get_sparse_tablenames distributed_varnames = get_sparse_tablenames( self.compiled_strategy.origin_main_program, True) values = [] + model_path = self._get_inference_model_path(dirname) for id, names in context.items(): if names[0] not in distributed_varnames: # only save sparse param to local try: - self._worker.recv_and_save_model(id, dirname) + self._worker.recv_and_save_model(id, model_path) except: pass # save sparse & distributed param on server @@ -1277,10 +1290,7 @@ class TheOnePSRuntime(RuntimeBase): infer_program._copy_dist_param_info_from(program) - if dirname.startswith("afs:") or dirname.startswith("hdfs:"): - model_path = "./dnn_plugin" - else: - model_path = os.path.join(dirname, "dnn_plugin") + model_path = self._get_inference_model_path(dirname) model_basename = "__model__" model_basename = os.path.join(model_path, model_basename) paddle.save(infer_program, model_basename) -- GitLab