From 8177ece548259a25c2b1fe738b02c51d6c2b4f7d Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Thu, 25 Feb 2021 09:32:35 +0800 Subject: [PATCH] fix entry (#31079) (#31182) * fix entry * fix distributed lookup table fuse case * fix entry bug at first time * move entry from paddle.fluid -> paddle.distributed * fix ut with paddle.enable_static() Co-authored-by: malin10 Co-authored-by: malin10 --- paddle/fluid/distributed/ps.proto | 5 +- .../distributed/table/common_sparse_table.cc | 7 +- .../table/depends/large_scale_kv.h | 16 +- .../test/brpc_service_sparse_sgd_test.cc | 1 + python/paddle/distributed/__init__.py | 15 ++ python/paddle/distributed/entry_attr.py | 139 ++++++++++++++++++ .../distributed/fleet/runtime/the_one_ps.py | 48 ++++-- python/paddle/fluid/contrib/layers/nn.py | 11 +- python/paddle/fluid/entry_attr.py | 6 +- .../fleet/parameter_server/ir/trainer_pass.py | 17 ++- .../fluid/tests/unittests/test_entry_attr.py | 17 ++- .../fluid/tests/unittests/test_entry_attr2.py | 3 + 12 files changed, 242 insertions(+), 43 deletions(-) create mode 100644 python/paddle/distributed/entry_attr.py diff --git a/paddle/fluid/distributed/ps.proto b/paddle/fluid/distributed/ps.proto index 2570d3eaf03..862ae4a504d 100644 --- a/paddle/fluid/distributed/ps.proto +++ b/paddle/fluid/distributed/ps.proto @@ -140,8 +140,9 @@ message CommonAccessorParameter { repeated string params = 4; repeated uint32 dims = 5; repeated string initializers = 6; - optional int32 trainer_num = 7; - optional bool sync = 8; + optional string entry = 7; + optional int32 trainer_num = 8; + optional bool sync = 9; } message TableAccessorSaveParameter { diff --git a/paddle/fluid/distributed/table/common_sparse_table.cc b/paddle/fluid/distributed/table/common_sparse_table.cc index 3ab3a398cdb..56372ac8440 100644 --- a/paddle/fluid/distributed/table/common_sparse_table.cc +++ b/paddle/fluid/distributed/table/common_sparse_table.cc @@ -238,12 +238,13 @@ int32_t CommonSparseTable::initialize() { int32_t CommonSparseTable::initialize_recorder() { return 0; } int32_t CommonSparseTable::initialize_value() { + auto common = _config.common(); shard_values_.reserve(task_pool_size_); for (int x = 0; x < task_pool_size_; ++x) { - auto shard = - std::make_shared(value_names_, value_dims_, value_offsets_, - value_idx_, initializer_attrs_, "none"); + auto shard = std::make_shared( + value_names_, value_dims_, value_offsets_, value_idx_, + initializer_attrs_, common.entry()); shard_values_.emplace_back(shard); } diff --git a/paddle/fluid/distributed/table/depends/large_scale_kv.h b/paddle/fluid/distributed/table/depends/large_scale_kv.h index 5cbf48548fb..37ceb650664 100644 --- a/paddle/fluid/distributed/table/depends/large_scale_kv.h +++ b/paddle/fluid/distributed/table/depends/large_scale_kv.h @@ -71,7 +71,7 @@ inline bool count_entry(std::shared_ptr value, int threshold) { } inline bool probility_entry(std::shared_ptr value, float threshold) { - UniformInitializer uniform = UniformInitializer({"0", "0", "1"}); + UniformInitializer uniform = UniformInitializer({"uniform", "0", "0", "1"}); return uniform.GetValue() >= threshold; } @@ -93,20 +93,20 @@ class ValueBlock { // for Entry { - auto slices = string::split_string(entry_attr, "&"); + auto slices = string::split_string(entry_attr, ":"); if (slices[0] == "none") { entry_func_ = std::bind(&count_entry, std::placeholders::_1, 0); - } else if (slices[0] == "count_filter") { + } else if (slices[0] == "count_filter_entry") { int threshold = std::stoi(slices[1]); entry_func_ = std::bind(&count_entry, std::placeholders::_1, threshold); - } else if (slices[0] == "probability") { + } else if (slices[0] == "probability_entry") { float threshold = std::stof(slices[1]); entry_func_ = std::bind(&probility_entry, std::placeholders::_1, threshold); } else { PADDLE_THROW(platform::errors::InvalidArgument( - "Not supported Entry Type : %s, Only support [count_filter, " - "probability]", + "Not supported Entry Type : %s, Only support [CountFilterEntry, " + "ProbabilityEntry]", slices[0])); } } @@ -179,10 +179,12 @@ class ValueBlock { initializers_[x]->GetValue(value->data_.data() + value_offsets_[x], value_dims_[x]); } + value->need_save_ = true; } + } else { + value->need_save_ = true; } - value->need_save_ = true; return; } diff --git a/paddle/fluid/distributed/test/brpc_service_sparse_sgd_test.cc b/paddle/fluid/distributed/test/brpc_service_sparse_sgd_test.cc index ddeb7b50232..faa3aed9ce7 100644 --- a/paddle/fluid/distributed/test/brpc_service_sparse_sgd_test.cc +++ b/paddle/fluid/distributed/test/brpc_service_sparse_sgd_test.cc @@ -78,6 +78,7 @@ void GetDownpourSparseTableProto( common_proto->set_table_name("MergedDense"); common_proto->set_trainer_num(1); common_proto->set_sync(false); + common_proto->set_entry("none"); common_proto->add_params("Param"); common_proto->add_dims(10); common_proto->add_initializers("uniform_random&0&-1.0&1.0"); diff --git a/python/paddle/distributed/__init__.py b/python/paddle/distributed/__init__.py index 155037030b5..c882e94d2ba 100644 --- a/python/paddle/distributed/__init__.py +++ b/python/paddle/distributed/__init__.py @@ -25,6 +25,9 @@ from paddle.distributed.fleet.dataset import * from . import collective from .collective import * +from .entry_attr import ProbabilityEntry +from .entry_attr import CountFilterEntry + # start multiprocess apis __all__ = ["spawn"] @@ -38,5 +41,17 @@ __all__ += [ "QueueDataset", ] +# dataset reader +__all__ += [ + "InMemoryDataset", + "QueueDataset", +] + +# entry for embedding +__all__ += [ + "ProbabilityEntry", + "CountFilterEntry", +] + # collective apis __all__ += collective.__all__ diff --git a/python/paddle/distributed/entry_attr.py b/python/paddle/distributed/entry_attr.py new file mode 100644 index 00000000000..dbd899952af --- /dev/null +++ b/python/paddle/distributed/entry_attr.py @@ -0,0 +1,139 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +__all__ = ['ProbabilityEntry', 'CountFilterEntry'] + + +class EntryAttr(object): + """ + Entry Config for paddle.static.nn.sparse_embedding with Parameter Server. + + Examples: + .. code-block:: python + + import paddle + + sparse_feature_dim = 1024 + embedding_size = 64 + + entry = paddle.distributed.ProbabilityEntry(0.1) + + input = paddle.static.data(name='ins', shape=[1], dtype='int64') + + emb = paddle.static.nn.sparse_embedding(( + input=input, + size=[sparse_feature_dim, embedding_size], + is_test=False, + entry=entry, + param_attr=paddle.ParamAttr(name="SparseFeatFactors", + initializer=paddle.nn.initializer.Uniform())) + + """ + + def __init__(self): + self._name = None + + def _to_attr(self): + """ + Returns the attributes of this parameter. + + Returns: + Parameter attributes(map): The attributes of this parameter. + """ + raise NotImplementedError("EntryAttr is base class") + + +class ProbabilityEntry(EntryAttr): + """ + Examples: + .. code-block:: python + + import paddle + + sparse_feature_dim = 1024 + embedding_size = 64 + + entry = paddle.distributed.ProbabilityEntry(0.1) + + input = paddle.static.data(name='ins', shape=[1], dtype='int64') + + emb = paddle.static.nn.sparse_embedding(( + input=input, + size=[sparse_feature_dim, embedding_size], + is_test=False, + entry=entry, + param_attr=paddle.ParamAttr(name="SparseFeatFactors", + initializer=paddle.nn.initializer.Uniform())) + + + """ + + def __init__(self, probability): + super(EntryAttr, self).__init__() + + if not isinstance(probability, float): + raise ValueError("probability must be a float in (0,1)") + + if probability <= 0 or probability >= 1: + raise ValueError("probability must be a float in (0,1)") + + self._name = "probability_entry" + self._probability = probability + + def _to_attr(self): + return ":".join([self._name, str(self._probability)]) + + +class CountFilterEntry(EntryAttr): + """ + Examples: + .. code-block:: python + + import paddle + + sparse_feature_dim = 1024 + embedding_size = 64 + + entry = paddle.distributed.CountFilterEntry(10) + + input = paddle.static.data(name='ins', shape=[1], dtype='int64') + + emb = paddle.static.nn.sparse_embedding(( + input=input, + size=[sparse_feature_dim, embedding_size], + is_test=False, + entry=entry, + param_attr=paddle.ParamAttr(name="SparseFeatFactors", + initializer=paddle.nn.initializer.Uniform())) + + """ + + def __init__(self, count_filter): + super(EntryAttr, self).__init__() + + if not isinstance(count_filter, int): + raise ValueError( + "count_filter must be a valid integer greater than 0") + + if count_filter < 0: + raise ValueError( + "count_filter must be a valid integer greater or equal than 0") + + self._name = "count_filter_entry" + self._count_filter = count_filter + + def _to_attr(self): + return ":".join([self._name, str(self._count_filter)]) diff --git a/python/paddle/distributed/fleet/runtime/the_one_ps.py b/python/paddle/distributed/fleet/runtime/the_one_ps.py index 91a70bd3f39..abec4710f5d 100644 --- a/python/paddle/distributed/fleet/runtime/the_one_ps.py +++ b/python/paddle/distributed/fleet/runtime/the_one_ps.py @@ -58,6 +58,7 @@ class CommonAccessor: def __init__(self): self.accessor_class = "" self.table_name = None + self.entry = None self.attrs = [] self.params = [] self.dims = [] @@ -93,6 +94,24 @@ class CommonAccessor: self.opt_input_map = opt_input_map self.opt_init_map = opt_init_map + def parse_entry(self, varname, o_main_program): + from paddle.fluid.incubate.fleet.parameter_server.ir.public import is_distributed_sparse_op + from paddle.fluid.incubate.fleet.parameter_server.ir.public import is_sparse_op + + for op in o_main_program.global_block().ops: + if not is_distributed_sparse_op(op) and not is_sparse_op(op): + continue + + param_name = op.input("W")[0] + + if param_name == varname and op.type == "lookup_table": + self.entry = op.attr('entry') + break + + if param_name == varname and op.type == "lookup_table_v2": + self.entry = "none" + break + def get_shard(self, total_dim, shard_num, pserver_id): # remainder = total_dim % shard_num blocksize = int(total_dim / shard_num + 1) @@ -188,6 +207,8 @@ class CommonAccessor: if self.table_name: attrs += "table_name: \"{}\" ".format(self.table_name) + if self.entry: + attrs += "entry: \"{}\" ".format(self.entry) attrs += "trainer_num: {} ".format(self.trainer_num) attrs += "sync: {} ".format(self.sync) @@ -655,36 +676,31 @@ class TheOnePSRuntime(RuntimeBase): use_origin_program=True, split_dense_table=self.role_maker. _is_heter_parameter_server_mode) + tables = [] for idx, (name, ctx) in enumerate(send_ctx.items()): + if ctx.is_tensor_table() or len(ctx.origin_varnames()) < 1: + continue + table = Table() table.id = ctx.table_id() - - if ctx.is_tensor_table(): - continue + common = CommonAccessor() if ctx.is_sparse(): - if len(ctx.origin_varnames()) < 1: - continue table.type = "PS_SPARSE_TABLE" + table.shard_num = 256 if self.compiled_strategy.is_geo_mode(): table.table_class = "SparseGeoTable" else: table.table_class = "CommonSparseTable" - table.shard_num = 256 - else: - if len(ctx.origin_varnames()) < 1: - continue - table.type = "PS_DENSE_TABLE" - table.table_class = "CommonDenseTable" - table.shard_num = 256 - common = CommonAccessor() - if ctx.is_sparse(): common.table_name = self.compiled_strategy.grad_name_to_param_name[ ctx.origin_varnames()[0]] else: + table.type = "PS_DENSE_TABLE" + table.table_class = "CommonDenseTable" + table.shard_num = 256 common.table_name = "MergedDense" common.parse_by_optimizer(ctx.origin_varnames()[0], @@ -693,6 +709,10 @@ class TheOnePSRuntime(RuntimeBase): else ctx.sections()[0], self.compiled_strategy) + if ctx.is_sparse(): + common.parse_entry(common.table_name, + self.origin_main_program) + if is_sync: common.sync = "true" else: diff --git a/python/paddle/fluid/contrib/layers/nn.py b/python/paddle/fluid/contrib/layers/nn.py index acb57fc2456..8c48033fc46 100644 --- a/python/paddle/fluid/contrib/layers/nn.py +++ b/python/paddle/fluid/contrib/layers/nn.py @@ -46,7 +46,6 @@ from paddle.fluid.data_feeder import check_variable_and_dtype, check_type, check from paddle.fluid import core from paddle.fluid.param_attr import ParamAttr -from paddle.fluid.entry_attr import ProbabilityEntry, CountFilterEntry from paddle.fluid.framework import Variable, convert_np_dtype_to_dtype_ from paddle.fluid.layers import slice, reshape @@ -993,11 +992,13 @@ def sparse_embedding(input, entry_str = "none" if entry is not None: - if not isinstance(entry, ProbabilityEntry) and not isinstance( - entry, CountFilterEntry): + if entry.__class__.__name__ not in [ + "ProbabilityEntry", "CountFilterEntry" + ]: raise ValueError( - "entry must be instance in [ProbabilityEntry, CountFilterEntry]") - entry_str = entry.to_attr() + "entry must be instance in [paddle.distributed.ProbabilityEntry, paddle.distributed.CountFilterEntry]" + ) + entry_str = entry._to_attr() helper.append_op( type='lookup_table', diff --git a/python/paddle/fluid/entry_attr.py b/python/paddle/fluid/entry_attr.py index c0999765488..c0d45432c57 100644 --- a/python/paddle/fluid/entry_attr.py +++ b/python/paddle/fluid/entry_attr.py @@ -28,7 +28,7 @@ class EntryAttr(object): def __init__(self): self._name = None - def to_attr(self): + def _to_attr(self): """ Returns the attributes of this parameter. @@ -51,7 +51,7 @@ class ProbabilityEntry(EntryAttr): self._name = "probability_entry" self._probability = probability - def to_attr(self): + def _to_attr(self): return ":".join([self._name, str(self._probability)]) @@ -70,5 +70,5 @@ class CountFilterEntry(EntryAttr): self._name = "count_filter_entry" self._count_filter = count_filter - def to_attr(self): + def _to_attr(self): return ":".join([self._name, str(self._count_filter)]) diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py index 53fb86a9f5a..2292d4c0a4d 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py @@ -172,8 +172,21 @@ def distributed_ops_pass(program, config): "lookup_table_version": op_type }) else: - raise ValueError( - "something wrong with Fleet, submit a issue is recommended") + for i in range(len(inputs_idxs)): + distributed_idx = op_idxs[i] + 1 + + program.global_block()._insert_op( + index=distributed_idx, + type="distributed_lookup_table", + inputs={"Ids": [inputs[i]], + 'W': w}, + outputs={"Outputs": [outputs[i]]}, + attrs={ + "is_distributed": is_distributed, + "padding_idx": padding_idx, + "table_id": table_id, + "lookup_table_version": op_type + }) pull_sparse_ops = _get_pull_sparse_ops(program) _pull_sparse_fuse(program, pull_sparse_ops) diff --git a/python/paddle/fluid/tests/unittests/test_entry_attr.py b/python/paddle/fluid/tests/unittests/test_entry_attr.py index 918f6eab29b..efcad103dee 100644 --- a/python/paddle/fluid/tests/unittests/test_entry_attr.py +++ b/python/paddle/fluid/tests/unittests/test_entry_attr.py @@ -14,21 +14,24 @@ from __future__ import print_function +import paddle +paddle.enable_static() + import unittest import paddle.fluid as fluid -from paddle.fluid.entry_attr import ProbabilityEntry, CountFilterEntry +from paddle.distributed import ProbabilityEntry, CountFilterEntry class EntryAttrChecks(unittest.TestCase): def base(self): with self.assertRaises(NotImplementedError): - import paddle.fluid.entry_attr as entry - base = entry.EntryAttr() - base.to_attr() + from paddle.distributed.entry_attr import EntryAttr + base = EntryAttr() + base._to_attr() def probability_entry(self): prob = ProbabilityEntry(0.5) - ss = prob.to_attr() + ss = prob._to_attr() self.assertEqual("probability_entry:0.5", ss) with self.assertRaises(ValueError): @@ -39,7 +42,7 @@ class EntryAttrChecks(unittest.TestCase): def countfilter_entry(self): counter = CountFilterEntry(20) - ss = counter.to_attr() + ss = counter._to_attr() self.assertEqual("count_filter_entry:20", ss) with self.assertRaises(ValueError): @@ -61,7 +64,7 @@ class EntryAttrChecks(unittest.TestCase): lod_level=1, append_batch_size=False) prob = ProbabilityEntry(0.5) - emb = fluid.contrib.layers.sparse_embedding( + emb = paddle.static.nn.sparse_embedding( input=input, size=[100, 10], is_test=False, diff --git a/python/paddle/fluid/tests/unittests/test_entry_attr2.py b/python/paddle/fluid/tests/unittests/test_entry_attr2.py index 48cdfc191cf..96301c4a878 100644 --- a/python/paddle/fluid/tests/unittests/test_entry_attr2.py +++ b/python/paddle/fluid/tests/unittests/test_entry_attr2.py @@ -14,6 +14,9 @@ from __future__ import print_function +import paddle +paddle.enable_static() + import unittest import paddle.fluid as fluid from paddle.fluid.framework import default_main_program -- GitLab