Unverified commit 15ce8e21, authored by xujiaqi01, committed by GitHub

add unit accessor (#23703)

* add unit accessor in fleet, support DownpourUnitAccessor
* test=develop
Parent 6b4a51ba
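In brief: the new DownpourUnitAccessor is selected and tuned through the fleet strategy dict. A minimal sketch (the key names are taken from the diff below; the "embedding" scope name and the values are illustrative):

strategy = {
    "embedding": {
        "sparse_accessor_class": "DownpourUnitAccessor",
        "embed_sparse_optimizer": "adagrad",   # one of: naive, adagrad, adam
        "embed_sparse_learning_rate": 0.05,
        "embedx_sparse_optimizer": "adam",
        "embedx_sparse_learning_rate": 0.001,
    }
}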
@@ -81,7 +81,12 @@ class DownpourServer(Server):
'sparse_delete_after_unseen_days', 'sparse_show_click_decay_rate', 'sparse_delete_threshold', \
'sparse_converter', 'sparse_deconverter', 'sparse_enable_cache', 'sparse_cache_rate', \
'sparse_cache_file_num', 'sparse_beta1_decay_rate', 'sparse_beta2_decay_rate', \
- 'sparse_ada_epsilon', 'sparse_optimizer', 'sparse_ssd_unseenday_threshold']
+ 'sparse_ada_epsilon', 'sparse_optimizer', 'sparse_ssd_unseenday_threshold', \
+ 'embed_sparse_optimizer', 'embed_sparse_learning_rate', 'embed_sparse_weight_bounds', \
+ 'embed_sparse_initial_range', 'embed_sparse_initial_g2sum', 'embed_sparse_beta1_decay_rate', \
+ 'embed_sparse_beta2_decay_rate', 'embedx_sparse_optimizer', 'embedx_sparse_learning_rate', \
+ 'embedx_sparse_weight_bounds', 'embedx_sparse_initial_range', 'embedx_sparse_initial_g2sum', \
+ 'embedx_sparse_beta1_decay_rate', 'embedx_sparse_beta2_decay_rate']
for key in strategy:
if key not in support_sparse_key_list:
@@ -113,10 +118,12 @@ class DownpourServer(Server):
# DownpourCtrAccessor : for ctr task, has cvm, slot, embedding and sgd info
# DownpourSparseValueAccessor : for general task, has embedding and sgd info
# DownpourCtrDoubleAccessor : for ctr task, which show clk are in double
# DownpourUnitAccessor : for ctr task, has cvm, slot, embedding and sgd info
support_accessor_class = [
'DownpourFeatureValueAccessor', 'DownpourCtrAccessor',
- 'DownpourSparseValueAccessor', 'DownpourCtrDoubleAccessor'
+ 'DownpourSparseValueAccessor', 'DownpourCtrDoubleAccessor',
+ 'DownpourUnitAccessor'
]
if strategy.get('sparse_accessor_class') is not None:
accessor_class = strategy.get('sparse_accessor_class')
@@ -130,7 +137,9 @@ class DownpourServer(Server):
table.accessor.accessor_class = accessor_class
- if accessor_class == 'DownpourFeatureValueAccessor' or accessor_class == 'DownpourCtrAccessor' or accessor_class == 'DownpourCtrDoubleAccessor':
+ if accessor_class == 'DownpourFeatureValueAccessor' \
+         or accessor_class == 'DownpourCtrAccessor' \
+         or accessor_class == 'DownpourCtrDoubleAccessor':
table.accessor.sparse_sgd_param.learning_rate = strategy.get(
'sparse_learning_rate', 0.05)
table.accessor.sparse_sgd_param.initial_g2sum = strategy.get(
@@ -245,6 +254,12 @@ class DownpourServer(Server):
table2.param = 2
table2.converter = converter
table2.deconverter = deconverter
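# Note (editorial comment, not part of the commit): DownpourUnitAccessor keeps
# separate SGD rules for the embed and embedx parts; each is configured via
# "embed_"/"embedx_"-prefixed strategy keys.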
elif accessor_class == 'DownpourUnitAccessor':
self.add_sparse_table_common_config(table, strategy)
self.add_sparse_optimizer(table.accessor.embed_sgd_param,
strategy, "embed_")
self.add_sparse_optimizer(table.accessor.embedx_sgd_param,
strategy, "embedx_")
def add_dense_table(self, table_id, param_var, grad_var, strategy,
sparse_table_names):
@@ -364,6 +379,77 @@ class DownpourServer(Server):
'datanorm_decay_rate', 0.999999)
table.accessor.fea_dim = fea_dim
def add_sparse_optimizer(self, sgd, strategy, prefix):
optimizer_name = strategy.get(prefix + "sparse_optimizer", "adam")
sgd.name = optimizer_name
if optimizer_name == "naive":
sgd.naive.learning_rate = \
strategy.get(prefix + 'sparse_learning_rate', 0.05)
sgd.naive.initial_range = \
strategy.get(prefix + 'sparse_initial_range', 1e-4)
bounds = strategy.get(prefix + 'sparse_weight_bounds', [-10, 10])
sgd.naive.weight_bounds.extend(bounds)
elif optimizer_name == "adagrad":
sgd.adagrad.learning_rate = \
strategy.get(prefix + 'sparse_learning_rate', 0.05)
sgd.adagrad.initial_range = \
strategy.get(prefix + 'sparse_initial_range', 1e-4)
sgd.adagrad.initial_g2sum = strategy.get(
prefix + 'sparse_initial_g2sum', 3)
bounds = strategy.get(prefix + 'sparse_weight_bounds', [-10, 10])
sgd.adagrad.weight_bounds.extend(bounds)
elif optimizer_name == "adam":
sgd.adam.learning_rate = \
strategy.get(prefix + 'sparse_learning_rate', 0.001)
sgd.adam.initial_range = \
strategy.get(prefix + 'sparse_initial_range', 1e-4)
sgd.adam.beta1_decay_rate = strategy.get(
prefix + 'sparse_beta1_decay_rate', 0.9)
sgd.adam.beta2_decay_rate = strategy.get(
prefix + 'sparse_beta2_decay_rate', 0.999)
sgd.adam.ada_epsilon = strategy.get(prefix + 'sparse_ada_epsilon',
1e-8)
bounds = strategy.get(prefix + 'sparse_weight_bounds', [-10, 10])
sgd.adam.weight_bounds.extend(bounds)
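# Usage note (illustrative): with prefix="embed_", this method reads keys such
# as 'embed_sparse_optimizer', 'embed_sparse_learning_rate' and
# 'embed_sparse_weight_bounds'; with prefix="embedx_", the 'embedx_*'
# counterparts. Unset keys fall back to the defaults above.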
def add_sparse_table_common_config(self, table, strategy):
table.accessor.embedx_dim = strategy.get('sparse_embedx_dim', 8)
table.accessor.embedx_threshold = strategy.get(
'sparse_embedx_threshold', 10)
table.accessor.fea_dim = int(table.accessor.embedx_dim) + 3
table.accessor.downpour_accessor_param.nonclk_coeff = strategy.get(
'sparse_nonclk_coeff', 0.1)
table.accessor.downpour_accessor_param.click_coeff = strategy.get(
'sparse_click_coeff', 1)
table.accessor.downpour_accessor_param.base_threshold = strategy.get(
'sparse_base_threshold', 1.5)
table.accessor.downpour_accessor_param.delta_threshold = strategy.get(
'sparse_delta_threshold', 0.25)
table.accessor.downpour_accessor_param.delta_keep_days = strategy.get(
'sparse_delta_keep_days', 16)
table.accessor.downpour_accessor_param.delete_after_unseen_days = strategy.get(
'sparse_delete_after_unseen_days', 30)
table.accessor.downpour_accessor_param.show_click_decay_rate = strategy.get(
'sparse_show_click_decay_rate', 0.98)
table.accessor.downpour_accessor_param.delete_threshold = strategy.get(
'sparse_delete_threshold', 0.8)
converter = strategy.get(
'sparse_converter',
"(scripts/xbox_compressor_mf.py | bin/xbox_pb_converter)")
deconverter = strategy.get(
'sparse_deconverter',
"(bin/xbox_pb_deconverter | scripts/xbox_decompressor_mf.awk)")
table1 = table.accessor.table_accessor_save_param.add()
table1.param = 1
table1.converter = converter
table1.deconverter = deconverter
table2 = table.accessor.table_accessor_save_param.add()
table2.param = 2
table2.converter = converter
table2.deconverter = deconverter
def get_desc(self):
"""
Return downpour server program_desc
......
@@ -531,7 +531,8 @@ class DistributedAdam(DistributedOptimizerImplBase):
opt_info["dump_param"] = strategy.get("dump_param", [])
if server._server.downpour_server_param.downpour_table_param[
0].accessor.accessor_class in [
"DownpourCtrAccessor", "DownpourCtrDoubleAccessor"
"DownpourCtrAccessor", "DownpourCtrDoubleAccessor",
"DownpourUnitAccessor"
]:
opt_info["dump_slot"] = True
elif server._server.downpour_server_param.downpour_table_param[
......
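The commit also adds a unit test that drives the new accessor through all three sparse optimizers: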
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test fleet."""
from __future__ import print_function
import os
import unittest
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
class TestFleet1(unittest.TestCase):
"""
Test cases for fleet minimize.
"""
def setUp(self):
"""Set up, set envs."""
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ[
"PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001"
def test_pslib_1(self):
"""Test cases for pslib."""
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
from paddle.fluid.incubate.fleet.parameter_server.pslib import PSLib
from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
try:
import netifaces
except ImportError:
print("warning: no netifaces, skip test_pslib_1")
return
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["TRAINING_ROLE"] = "TRAINER"
os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36002"
os.environ["PADDLE_TRAINER_ID"] = "0"
role_maker = GeneralRoleMaker()
role_maker.generate_role()
place = fluid.CPUPlace()
exe = fluid.Executor(place)
fleet.init(role_maker)
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
with fluid.program_guard(train_program, startup_program):
show = fluid.layers.data(name="show", shape=[-1, 1], \
dtype="int64", lod_level=1, append_batch_size=False)
emb = fluid.layers.embedding(input=show, size=[1, 1], \
is_sparse=True, is_distributed=True, \
param_attr=fluid.ParamAttr(name="embedding"))
fc = fluid.layers.fc(input=emb, size=1, act=None)
label = fluid.layers.data(name="click", shape=[-1, 1], \
dtype="int64", lod_level=1, append_batch_size=False)
label_cast = fluid.layers.cast(label, dtype='float32')
cost = fluid.layers.log_loss(fc, label_cast)
strategy = {}
strategy["embedding"] = {}
strategy["embedding"]["sparse_accessor_class"] = "DownpourUnitAccessor"
strategy["embedding"]["embed_sparse_optimizer"] = "naive"
try:
adam1 = fluid.optimizer.Adam(learning_rate=0.000005)
adam1 = fleet.distributed_optimizer(adam1, strategy=strategy)
adam1.minimize([cost], [scope])
strategy["embedding"]["embed_sparse_optimizer"] = "adagrad"
adam2 = fluid.optimizer.Adam(learning_rate=0.000005)
adam2 = fleet.distributed_optimizer(adam2, strategy=strategy)
adam2.minimize([cost], [scope])
strategy["embedding"]["embed_sparse_optimizer"] = "adam"
adam3 = fluid.optimizer.Adam(learning_rate=0.000005)
adam3 = fleet.distributed_optimizer(adam3, strategy=strategy)
adam3.minimize([cost], [scope])
except Exception:
print("do not support pslib test, skip")
return
if __name__ == "__main__":
unittest.main()
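A note on the test's structure: it returns early when netifaces is unavailable or when the local build lacks pslib support, so the three minimize calls (naive, adagrad, and adam, selected via embed_sparse_optimizer) only run where pslib is actually present.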