Unverified commit 500a8bc2, authored by: wangzhen38, committed by: GitHub

[RM FLUID] rm ps mode (#50704)

Parent 4db8e5c7
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class PSMode:
"""
There are various modes for fleet; each is designed for a different model.
"""
TRANSPILER = 1
PSLIB = 2
class DistributedMode:
SYNC = 0
ASYNC = 1
HALF_ASYNC = 2
GEO = 3
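# Hedged usage sketch (illustration only, not part of the original module):
# fleet code stores one of these integer constants in its trainer/program
# configuration and branches on it. The mapping below is hypothetical and
# only demonstrates how the constants are meant to be compared.
if __name__ == "__main__":
    mode_names = {
        DistributedMode.SYNC: "sync",
        DistributedMode.ASYNC: "async",
        DistributedMode.HALF_ASYNC: "half_async",
        DistributedMode.GEO: "geo",
    }
    assert PSMode.TRANSPILER == 1 and PSMode.PSLIB == 2
    print(mode_names[DistributedMode.GEO])  # -> "geo"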
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Definition of Server and Worker."""
from . import ps_pb2 as pslib
# NOTE: in Python 3, reduce is no longer a builtin; it must be imported from functools
from functools import reduce
class Server:
"""
A basic Server class.
It is a base class and has no implementation.
"""
def __init__(self):
pass
class Worker:
"""
A basic Worker class.
It is a base class and has no implementation.
"""
def __init__(self):
pass
class DownpourServer(Server):
"""
DownpourServer class is used to generate server program_desc
Args:
server: it is pslib.ServerParameter()
Examples:
server = DownpourServer()
"""
def __init__(self):
self._server = pslib.ServerParameter()
self._server.downpour_server_param.service_param.server_class = (
"DownpourBrpcPsServer"
)
self._server.downpour_server_param.service_param.client_class = (
"DownpourBrpcPsClient"
)
self._server.downpour_server_param.service_param.service_class = (
"DownpourPsService"
)
self._server.downpour_server_param.service_param.start_server_port = 0
self._server.downpour_server_param.service_param.server_thread_num = 12
def add_sparse_table(self, table_id, strategy):
"""
Args:
table_id(int): id of sparse params table
strategy(dict): the config dict.
Returns:
return None
"""
for table in self._server.downpour_server_param.downpour_table_param:
if table.table_id == table_id:
if table.type == pslib.PS_SPARSE_TABLE:
return
else:
raise ValueError(
"expect table %s type=%s, but actual type=%s"
% (table_id, pslib.PS_SPARSE_TABLE, table.type)
)
if strategy is None:
strategy = dict()
table = self._server.downpour_server_param.downpour_table_param.add()
table.table_id = table_id
table.type = pslib.PS_SPARSE_TABLE
support_sparse_key_list = [
'sparse_table_class',
'sparse_compress_in_save',
'sparse_shard_num',
'sparse_accessor_class',
'sparse_learning_rate',
'sparse_initial_g2sum',
'sparse_initial_range',
'sparse_weight_bounds',
'sparse_embedx_dim',
'sparse_embedx_threshold',
'sparse_nonclk_coeff',
'sparse_click_coeff',
'sparse_base_threshold',
'sparse_delta_threshold',
'sparse_delta_keep_days',
'sparse_delete_after_unseen_days',
'sparse_show_click_decay_rate',
'sparse_delete_threshold',
'sparse_converter',
'sparse_deconverter',
'sparse_enable_cache',
'sparse_cache_rate',
'sparse_cache_file_num',
'sparse_beta1_decay_rate',
'sparse_beta2_decay_rate',
'sparse_ada_epsilon',
'sparse_optimizer',
'sparse_ssd_unseenday_threshold',
'embed_sparse_optimizer',
'embed_sparse_learning_rate',
'embed_sparse_weight_bounds',
'embed_sparse_initial_range',
'embed_sparse_initial_g2sum',
'embed_sparse_beta1_decay_rate',
'embed_sparse_beta2_decay_rate',
'embedx_sparse_optimizer',
'embedx_sparse_learning_rate',
'embedx_sparse_weight_bounds',
'embedx_sparse_initial_range',
'embedx_sparse_initial_g2sum',
'embedx_sparse_beta1_decay_rate',
'embedx_sparse_beta2_decay_rate',
]
for key in strategy:
if key not in support_sparse_key_list:
raise ValueError("strategy key '%s' not support" % (key))
support_table_class = ['DownpourSparseTable', 'DownpourSparseSSDTable']
if strategy.get('sparse_table_class') is not None:
table_class = strategy.get('sparse_table_class')
if table_class not in support_table_class:
raise ValueError(
"support sparse_table_class: [ 'DownpourSparseTable', 'DownpourSparseSSDTable'], \
but actual %s"
% (table_class)
)
else:
table_class = 'DownpourSparseTable'
table.table_class = table_class
if (
table_class == 'DownpourSparseTable'
or table_class == 'DownpourSparseSSDTable'
):
table.enable_sparse_table_cache = strategy.get(
'sparse_enable_cache', True
)
table.sparse_table_cache_rate = strategy.get(
'sparse_cache_rate', 0.00055
)
table.sparse_table_cache_file_num = strategy.get(
'sparse_cache_file_num', 16
)
table.compress_in_save = strategy.get(
'sparse_compress_in_save', True
)
table.shard_num = strategy.get('sparse_shard_num', 1000)
# DownpourFeatureValueAccessor: for ctr task, has cvm, embedding and sgd info
# DownpourCtrAccessor : for ctr task, has cvm, slot, embedding and sgd info
# DownpourSparseValueAccessor : for general task, has embedding and sgd info
# DownpourCtrDoubleAccessor : for ctr task, where show/click are stored as double
# DownpourUnitAccessor : for ctr task, has cvm, slot, embedding and sgd info
support_accessor_class = [
'DownpourFeatureValueAccessor',
'DownpourCtrAccessor',
'DownpourCtrDymfAccessor',
'DownpourSparseValueAccessor',
'DownpourCtrDoubleAccessor',
'DownpourUnitAccessor',
'DownpourDoubleUnitAccessor',
]
if strategy.get('sparse_accessor_class') is not None:
accessor_class = strategy.get('sparse_accessor_class')
if accessor_class not in support_accessor_class:
raise ValueError(
"support sparse_accessor_class: ['DownpourFeatureValueAccessor', 'DownpourCtrAccessor', 'DownpourCtrDymfAccessor', \
'DownpourSparseValueAccessor', 'DownpourCtrDoubleAccessor'], \
but actual %s"
% (accessor_class)
)
else:
accessor_class = 'DownpourCtrAccessor'
table.accessor.accessor_class = accessor_class
if (
accessor_class == 'DownpourFeatureValueAccessor'
or accessor_class == 'DownpourCtrAccessor'
or accessor_class == 'DownpourCtrDymfAccessor'
or accessor_class == 'DownpourCtrDoubleAccessor'
):
table.accessor.sparse_sgd_param.learning_rate = strategy.get(
'sparse_learning_rate', 0.05
)
table.accessor.sparse_sgd_param.initial_g2sum = strategy.get(
'sparse_initial_g2sum', 3
)
table.accessor.sparse_sgd_param.initial_range = strategy.get(
'sparse_initial_range', 1e-4
)
if strategy.get('sparse_weight_bounds') is None:
table.accessor.sparse_sgd_param.weight_bounds.extend(
[-10, 10]
)
else:
table.accessor.sparse_sgd_param.weight_bounds.extend(
strategy.get('sparse_weight_bounds')
)
table.accessor.embedx_dim = strategy.get('sparse_embedx_dim', 8)
table.accessor.embedx_threshold = strategy.get(
'sparse_embedx_threshold', 10
)
table.accessor.fea_dim = int(table.accessor.embedx_dim) + 3
table.accessor.downpour_accessor_param.nonclk_coeff = (
strategy.get('sparse_nonclk_coeff', 0.1)
)
table.accessor.downpour_accessor_param.click_coeff = (
strategy.get('sparse_click_coeff', 1)
)
table.accessor.downpour_accessor_param.base_threshold = (
strategy.get('sparse_base_threshold', 1.5)
)
table.accessor.downpour_accessor_param.delta_threshold = (
strategy.get('sparse_delta_threshold', 0.25)
)
table.accessor.downpour_accessor_param.delta_keep_days = (
strategy.get('sparse_delta_keep_days', 16)
)
table.accessor.downpour_accessor_param.delete_after_unseen_days = strategy.get(
'sparse_delete_after_unseen_days', 30
)
table.accessor.downpour_accessor_param.ssd_unseenday_threshold = strategy.get(
'sparse_ssd_unseenday_threshold', 1
)
table.accessor.downpour_accessor_param.show_click_decay_rate = (
strategy.get('sparse_show_click_decay_rate', 0.98)
)
table.accessor.downpour_accessor_param.delete_threshold = (
strategy.get('sparse_delete_threshold', 0.8)
)
converter = strategy.get(
'sparse_converter',
"(scripts/xbox_compressor_mf.py | bin/xbox_pb_converter)",
)
deconverter = strategy.get(
'sparse_deconverter',
"(bin/xbox_pb_deconverter | scripts/xbox_decompressor_mf.awk)",
)
table1 = table.accessor.table_accessor_save_param.add()
table1.param = 1
table1.converter = converter
table1.deconverter = deconverter
table2 = table.accessor.table_accessor_save_param.add()
table2.param = 2
table2.converter = converter
table2.deconverter = deconverter
elif accessor_class == 'DownpourSparseValueAccessor':
optimizer_name = strategy.get("sparse_optimizer", "adam")
table.accessor.sparse_commonsgd_param.name = optimizer_name
table.accessor.embedx_dim = strategy.get('sparse_embedx_dim', 8)
table.accessor.fea_dim = int(table.accessor.embedx_dim)
if optimizer_name == "naive":
table.accessor.sparse_commonsgd_param.naive.learning_rate = strategy.get(
'sparse_learning_rate', 0.05
)
table.accessor.sparse_commonsgd_param.naive.initial_range = strategy.get(
'sparse_initial_range', 1e-4
)
if strategy.get('sparse_weight_bounds') is None:
table.accessor.sparse_commonsgd_param.naive.weight_bounds.extend(
[-10, 10]
)
else:
table.accessor.sparse_commonsgd_param.naive.weight_bounds.extend(
strategy.get('sparse_weight_bounds')
)
elif optimizer_name == "adagrad":
table.accessor.sparse_commonsgd_param.adagrad.learning_rate = strategy.get(
'sparse_learning_rate', 0.05
)
table.accessor.sparse_commonsgd_param.adagrad.initial_range = strategy.get(
'sparse_initial_range', 1e-4
)
table.accessor.sparse_commonsgd_param.adagrad.initial_g2sum = strategy.get(
'sparse_initial_g2sum', 3
)
if strategy.get('sparse_weight_bounds') is None:
table.accessor.sparse_commonsgd_param.adagrad.weight_bounds.extend(
[-10, 10]
)
else:
table.accessor.sparse_commonsgd_param.adagrad.weight_bounds.extend(
strategy.get('sparse_weight_bounds')
)
elif optimizer_name == "adam":
table.accessor.sparse_commonsgd_param.adam.learning_rate = (
strategy.get('sparse_learning_rate', 0.001)
)
table.accessor.sparse_commonsgd_param.adam.initial_range = (
strategy.get('sparse_initial_range', 1e-4)
)
table.accessor.sparse_commonsgd_param.adam.beta1_decay_rate = strategy.get(
'sparse_beta1_decay_rate', 0.9
)
table.accessor.sparse_commonsgd_param.adam.beta2_decay_rate = strategy.get(
'sparse_beta2_decay_rate', 0.999
)
table.accessor.sparse_commonsgd_param.adam.ada_epsilon = (
strategy.get('sparse_ada_epsilon', 1e-8)
)
if strategy.get('sparse_weight_bounds') is None:
table.accessor.sparse_commonsgd_param.adam.weight_bounds.extend(
[-10, 10]
)
else:
table.accessor.sparse_commonsgd_param.adam.weight_bounds.extend(
strategy.get('sparse_weight_bounds')
)
converter = strategy.get(
'sparse_converter',
"(scripts/xbox_compressor_mf.py | bin/xbox_pb_converter)",
)
deconverter = strategy.get(
'sparse_deconverter',
"(bin/xbox_pb_deconverter | scripts/xbox_decompressor_mf.awk)",
)
table1 = table.accessor.table_accessor_save_param.add()
table1.param = 1
table1.converter = converter
table1.deconverter = deconverter
table2 = table.accessor.table_accessor_save_param.add()
table2.param = 2
table2.converter = converter
table2.deconverter = deconverter
elif (
accessor_class == 'DownpourUnitAccessor'
or accessor_class == 'DownpourDoubleUnitAccessor'
):
self.add_sparse_table_common_config(table, strategy)
self.add_sparse_optimizer(
table.accessor.embed_sgd_param, strategy, "embed_"
)
self.add_sparse_optimizer(
table.accessor.embedx_sgd_param, strategy, "embedx_"
)
def add_dense_table(
self, table_id, param_var, grad_var, strategy, sparse_table_names
):
"""
Args:
table_id(int): id of dense params table
param_var(list): param vars
grad_var(list): param grad vars
strategy(dict): the dense config dict
sparse_table_names(list): sparse table names
Returns:
return None
"""
fea_dim = 0
dense_param_vars = []
for p in param_var:
if p.name not in sparse_table_names:
dense_param_vars.append(p)
for param in dense_param_vars:
fea_dim += reduce(lambda x, y: x * y, param.shape, 1)
for table in self._server.downpour_server_param.downpour_table_param:
if table.table_id == table_id:
if table.type == pslib.PS_DENSE_TABLE:
table.accessor.fea_dim = fea_dim
return
else:
raise ValueError(
"expect table %s type=%s, but actual type=%s"
% (table_id, pslib.PS_DENSE_TABLE, table.type)
)
if strategy is None:
strategy = dict()
table = self._server.downpour_server_param.downpour_table_param.add()
table.table_id = table_id
support_dense_key_list = [
'dense_table_class',
'dense_compress_in_save',
'dense_accessor_class',
'dense_optimizer',
'dense_learning_rate',
'dense_avg_decay',
'dense_ada_decay',
'dense_ada_epsilon',
'dense_mom_decay',
'dense_naive_lr',
]
for key in strategy:
if key not in support_dense_key_list:
raise ValueError("strategy key '%s' not support" % (key))
table.table_class = strategy.get(
'dense_table_class', "DownpourDenseTable"
)
table.type = pslib.PS_DENSE_TABLE
table.compress_in_save = strategy.get('dense_compress_in_save', True)
table.accessor.accessor_class = strategy.get(
'dense_accessor_class', "DownpourDenseValueAccessor"
)
table.accessor.dense_sgd_param.name = strategy.get(
'dense_optimizer', "adam"
)
table.accessor.dense_sgd_param.adam.learning_rate = strategy.get(
'dense_learning_rate', 5e-06
)
table.accessor.dense_sgd_param.adam.avg_decay_rate = strategy.get(
'dense_avg_decay', 0.999993
)
table.accessor.dense_sgd_param.adam.ada_decay_rate = strategy.get(
'dense_ada_decay', 0.9999
)
table.accessor.dense_sgd_param.adam.ada_epsilon = strategy.get(
'dense_ada_epsilon', 1e-8
)
table.accessor.dense_sgd_param.adam.mom_decay_rate = strategy.get(
'dense_mom_decay', 0.99
)
table.accessor.dense_sgd_param.naive.learning_rate = strategy.get(
'dense_naive_lr', 0.0002
)
table.accessor.fea_dim = fea_dim
def add_data_norm_table(
self,
table_id,
learning_rate,
param_var,
grad_var,
strategy,
sparse_table_names,
):
"""
Args:
table_id(int): id of datanorm table
learning_rate(float): the learning rate used to update parameters
param_var(list): param vars
grad_var(list): param grad vars
strategy(dict): the datanorm config dict
sparse_table_names(list): sparse table names
Returns:
return None
"""
fea_dim = 0
dense_param_vars = []
for p in param_var:
if p.name not in sparse_table_names:
dense_param_vars.append(p)
for param in dense_param_vars:
fea_dim += reduce(lambda x, y: x * y, param.shape, 1)
for table in self._server.downpour_server_param.downpour_table_param:
if table.table_id == table_id:
if table.type == pslib.PS_DENSE_TABLE:
table.accessor.fea_dim = fea_dim
return
else:
raise ValueError(
"expect table %s type=%s, but actual type=%s"
% (table_id, pslib.PS_DENSE_TABLE, table.type)
)
if strategy is None:
strategy = dict()
support_datanorm_key_list = [
'datanorm_table_class',
'datanorm_compress_in_save',
'datanorm_accessor_class',
'datanorm_operation',
'datanorm_decay_rate',
]
for key in strategy:
if key not in support_datanorm_key_list:
raise ValueError("strategy key '%s' not support" % (key))
table = self._server.downpour_server_param.downpour_table_param.add()
table.table_id = table_id
table.table_class = strategy.get(
'datanorm_table_class', 'DownpourDenseTable'
)
table.type = pslib.PS_DENSE_TABLE
table.compress_in_save = strategy.get('datanorm_compress_in_save', True)
table.accessor.accessor_class = strategy.get(
'datanorm_accessor_class', 'DownpourDenseValueAccessor'
)
table.accessor.dense_sgd_param.name = strategy.get(
'datanorm_operation', 'summary'
)
table.accessor.dense_sgd_param.summary.summary_decay_rate = (
strategy.get('datanorm_decay_rate', 0.999999)
)
table.accessor.fea_dim = fea_dim
def add_sparse_optimizer(self, sgd, strategy, prefix):
optimizer_name = strategy.get(prefix + "sparse_optimizer", "adagrad")
sgd.name = optimizer_name
if optimizer_name == "naive":
sgd.naive.learning_rate = strategy.get(
prefix + 'sparse_learning_rate', 0.05
)
sgd.naive.initial_range = strategy.get(
prefix + 'sparse_initial_range', 1e-4
)
bounds = strategy.get(prefix + 'sparse_weight_bounds', [-10, 10])
sgd.naive.weight_bounds.extend(bounds)
elif optimizer_name == "adagrad":
sgd.adagrad.learning_rate = strategy.get(
prefix + 'sparse_learning_rate', 0.05
)
sgd.adagrad.initial_range = strategy.get(
prefix + 'sparse_initial_range', 1e-4
)
if prefix == "embed_":
sgd.adagrad.initial_range = 0
sgd.adagrad.initial_g2sum = strategy.get(
prefix + 'sparse_initial_g2sum', 3
)
bounds = strategy.get(prefix + 'sparse_weight_bounds', [-10, 10])
sgd.adagrad.weight_bounds.extend(bounds)
elif optimizer_name == "std_adagrad":
sgd.adagrad.learning_rate = strategy.get(
prefix + 'sparse_learning_rate', 0.05
)
sgd.adagrad.initial_range = strategy.get(
prefix + 'sparse_initial_range', 1e-4
)
if prefix == "embed_":
sgd.adagrad.initial_range = 0
sgd.adagrad.initial_g2sum = strategy.get(
prefix + 'sparse_initial_g2sum', 3
)
bounds = strategy.get(prefix + 'sparse_weight_bounds', [-10, 10])
sgd.adagrad.weight_bounds.extend(bounds)
elif optimizer_name == "adam":
sgd.adam.learning_rate = strategy.get(
prefix + 'sparse_learning_rate', 0.001
)
sgd.adam.initial_range = strategy.get(
prefix + 'sparse_initial_range', 1e-4
)
sgd.adam.beta1_decay_rate = strategy.get(
prefix + 'sparse_beta1_decay_rate', 0.9
)
sgd.adam.beta2_decay_rate = strategy.get(
prefix + 'sparse_beta2_decay_rate', 0.999
)
sgd.adam.ada_epsilon = strategy.get(
prefix + 'sparse_ada_epsilon', 1e-8
)
bounds = strategy.get(prefix + 'sparse_weight_bounds', [-10, 10])
sgd.adam.weight_bounds.extend(bounds)
def add_sparse_table_common_config(self, table, strategy):
table.accessor.embedx_dim = strategy.get('sparse_embedx_dim', 8)
table.accessor.embedx_threshold = strategy.get(
'sparse_embedx_threshold', 10
)
table.accessor.fea_dim = int(table.accessor.embedx_dim) + 3
table.accessor.downpour_accessor_param.nonclk_coeff = strategy.get(
'sparse_nonclk_coeff', 0.1
)
table.accessor.downpour_accessor_param.click_coeff = strategy.get(
'sparse_click_coeff', 1
)
table.accessor.downpour_accessor_param.base_threshold = strategy.get(
'sparse_base_threshold', 1.5
)
table.accessor.downpour_accessor_param.delta_threshold = strategy.get(
'sparse_delta_threshold', 0.25
)
table.accessor.downpour_accessor_param.delta_keep_days = strategy.get(
'sparse_delta_keep_days', 16
)
table.accessor.downpour_accessor_param.delete_after_unseen_days = (
strategy.get('sparse_delete_after_unseen_days', 30)
)
table.accessor.downpour_accessor_param.show_click_decay_rate = (
strategy.get('sparse_show_click_decay_rate', 0.98)
)
table.accessor.downpour_accessor_param.delete_threshold = strategy.get(
'sparse_delete_threshold', 0.8
)
converter = strategy.get(
'sparse_converter',
"(scripts/xbox_compressor_mf.py | bin/xbox_pb_converter)",
)
deconverter = strategy.get(
'sparse_deconverter',
"(bin/xbox_pb_deconverter | scripts/xbox_decompressor_mf.awk)",
)
table1 = table.accessor.table_accessor_save_param.add()
table1.param = 1
table1.converter = converter
table1.deconverter = deconverter
table2 = table.accessor.table_accessor_save_param.add()
table2.param = 2
table2.converter = converter
table2.deconverter = deconverter
def get_desc(self):
"""
Return downpour server program_desc
"""
return self._server
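# ---------------------------------------------------------------------------
# Hedged usage sketch (comments only; not part of the original class). The
# strategy dict passed to add_sparse_table is a flat mapping whose keys must
# come from support_sparse_key_list above. For the unit accessors, the
# 'embed_'/'embedx_' prefixed keys are routed by add_sparse_optimizer above
# to the embed and embedx SGD rules respectively. The values shown are
# illustrative, not recommended defaults:
#
#     server = DownpourServer()
#     server.add_sparse_table(0, {
#         "sparse_table_class": "DownpourSparseTable",
#         "sparse_accessor_class": "DownpourUnitAccessor",
#         "sparse_embedx_dim": 8,
#         "embed_sparse_optimizer": "adagrad",
#         "embed_sparse_learning_rate": 0.05,
#         "embedx_sparse_optimizer": "adam",
#         "embedx_sparse_learning_rate": 0.001,
#     })
# ---------------------------------------------------------------------------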
class DownpourWorker(Worker):
"""
DownpourWorker class is used to generate worker program_desc
Args:
window (int): push params frequency
worker: it is pslib.DownpourTrainerParameter
Examples:
worker = DownpourWorker(1)
"""
def __init__(self, window):
self.window = window
self._worker = pslib.DownpourTrainerParameter()
def add_sparse_table(
self, table_id, slot_key_vars, slot_value_vars, slot_value_grads=None
):
"""
Args:
table_id(int): id of sparse params table
slot_key_vars(list): slot key id
slot_value_vars(list): slot key value after embedding
slot_value_grads(list): grad of all params, default is None
Returns:
return None
"""
if slot_value_grads is None:
slot_value_grad_names = [
var.name + "@GRAD" for var in slot_value_vars
]
else:
value_to_key = {}
for i in range(len(slot_key_vars)):
value_to_key[slot_value_vars[i].name] = slot_key_vars[i]
slot_value_grad_names = []
all_grad_names = [var.name for var in slot_value_grads]
for var in slot_value_vars:
if var.name + "@GRAD" in all_grad_names:
slot_value_grad_names.append(var.name + "@GRAD")
sorted_slot_value_vars = [
i
for i in slot_value_vars
if i.name + "@GRAD" in slot_value_grad_names
]
sorted_slot_value_vars += [
i
for i in slot_value_vars
if i.name + "@GRAD" not in slot_value_grad_names
]
sorted_slot_key_vars = [
value_to_key[v.name] for v in sorted_slot_value_vars
]
target_table = None
for table in self._worker.sparse_table:
if table.table_id == table_id:
keys = table.slot_key
key_names = [var.name for var in sorted_slot_key_vars]
for key_name in key_names:
if key_name not in keys:
raise ValueError(
"sparse table %s slot_key error" % table_id
)
target_table = table
break
table = target_table
if table is not None:
self._worker.sparse_table.remove(table)
table = self._worker.sparse_table.add()
table.table_id = table_id
table.slot_key.extend([var.name for var in sorted_slot_key_vars])
table.slot_value.extend([var.name for var in sorted_slot_value_vars])
table.slot_gradient.extend(slot_value_grad_names)
def add_dense_table(
self,
table_id,
learning_rate,
param_vars,
grad_vars,
dense_start_table_id,
sparse_table_names,
):
r"""
Args:
table_id(int): id of dense params table
learning_rate(float): the learning rate used to update parameters. \
Can be a float value
param_vars(list): all dense params; it is a list.
grad_vars(list): all dense grad vars; it is a list.
dense_start_table_id(int): dense table start index
sparse_table_names(list): sparse table names
Returns:
return None
"""
sparse_table_name_grad = []
for name in sparse_table_names:
sparse_table_name_grad.append(name + "@GRAD")
dense_param_name = []
for p in param_vars:
if p.name not in sparse_table_names:
dense_param_name.append(p.name)
dense_grad_name = []
for g in grad_vars:
if g.name not in sparse_table_name_grad:
dense_grad_name.append(g.name)
dense_param_name.sort()
dense_grad_name.sort()
for table in self._worker.dense_table:
if table.table_id == table_id:
desc_dense_param_name = list(table.dense_variable_name)
desc_dense_param_name.sort()
if dense_param_name == desc_dense_param_name:
desc_dense_grad_name = list(
table.dense_gradient_variable_name
)
desc_dense_grad_name.sort()
if dense_grad_name == desc_dense_grad_name:
return
else:
raise ValueError(
"dense table %s dense_gradient_variable_name "
"error" % table_id
)
else:
raise ValueError(
"dense table %s dense_variable_name error" % table_id
)
table = self._worker.dense_table.add()
table.table_id = table_id
# def cmp_fc(x, y):
# if x.startswith("fc_") and y.startswith("fc_"):
# index_x = x.find('.')
# index_y = y.find('.')
# if index_x > 0 and index_y > 0:
# num_x = x[3:index_x]
# num_y = y[3:index_y]
# if num_x.isdigit() and num_y.isdigit():
# if int(num_x) < int(num_y):
# return -1
# if int(num_x) > int(num_y):
# return 1
# if x[index_x + 1] == 'w' and y[index_y + 1] == 'b':
# return -1
# if x[index_x + 1] == 'b' and y[index_y + 1] == 'w':
# return 1
# if x < y:
# return -1
# else:
# return 1
# table.dense_variable_name.extend(sorted(dense_param_name, cmp_fc))
# table.dense_gradient_variable_name.extend(
# sorted(dense_grad_name, cmp_fc))
table.dense_variable_name.extend(dense_param_name)
table.dense_gradient_variable_name.extend(dense_grad_name)
def get_desc(self):
"""
Return downpour worker program_desc
"""
return self._worker
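# ---------------------------------------------------------------------------
# Hedged, self-contained demo (not part of the original module). It drives
# DownpourServer and DownpourWorker with hypothetical stand-in variables so
# the generated protobuf descriptions can be printed and inspected. FakeVar
# is an assumption made only for this sketch; real callers pass framework
# Variables exposing .name and .shape. Because of the relative ps_pb2 import
# at the top of the file, run it as a module (python -m <package>.node).
if __name__ == "__main__":
    from collections import namedtuple

    FakeVar = namedtuple("FakeVar", ["name", "shape"])
    params = [FakeVar("fc_0.w_0", (16, 8)), FakeVar("fc_0.b_0", (8,))]
    grads = [FakeVar("fc_0.w_0@GRAD", (16, 8)), FakeVar("fc_0.b_0@GRAD", (8,))]

    server = DownpourServer()
    server.add_sparse_table(0, {"sparse_accessor_class": "DownpourCtrAccessor"})
    server.add_dense_table(1, params, grads, None, sparse_table_names=[])

    worker = DownpourWorker(window=1)
    worker.add_dense_table(
        1, 0.05, params, grads, dense_start_table_id=1, sparse_table_names=[]
    )
    print(server.get_desc())
    print(worker.get_desc())
# ---------------------------------------------------------------------------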
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Optimizer Factory."""
__all__ = ["DistributedAdam", "FLEET_GLOBAL_DICT"]
import paddle
from paddle.framework import core
from paddle.distributed.distribute_lookup_table import (
find_distributed_lookup_table,
)
from paddle.distributed.distribute_lookup_table import (
find_distributed_lookup_table_inputs,
)
from paddle.distributed.distribute_lookup_table import (
find_distributed_lookup_table_outputs,
)
from google.protobuf import text_format
from collections import OrderedDict
import copy
from .node import DownpourWorker, DownpourServer
from . import ps_pb2 as pslib
import os
import logging
OpRole = core.op_proto_and_checker_maker.OpRole
# this dict is for storing info about pull/push sparse ops.
FLEET_GLOBAL_DICT = {
# global settings
"enable": False,
"emb_to_table": {},
"emb_to_accessor": {},
"emb_to_size": {},
# current embedding settings
"cur_sparse_id": 0,
"cur_accessor": "",
"click_name": "",
"scale_sparse_grad": None,
}
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
formatter = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(message)s')
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger.addHandler(ch)
class DistributedOptimizerImplBase:
"""
DistributedOptimizerImplBase
base class of optimizers
"""
def __init__(self, optimizer):
self._optimizer = optimizer
self._learning_rate = optimizer._learning_rate
self._regularization = optimizer.regularization
def minimize(
self,
losses,
startup_program=None,
parameter_list=None,
no_grad_set=None,
):
"""
Args:
losses(list): loss variables defined by user
startup_program(Program): startup program that defined by user
parameter_list(str list): parameter names defined by users
no_grad_set(set): a set of variables that is defined by users
so that these variables do not need gradient computation
"""
pass
class DistributedAdam(DistributedOptimizerImplBase):
"""
DistributedAdam
adam optimizer in distributed training
"""
def __init__(self, optimizer):
# todo(guru4elephant): add more optimizers here as argument
# todo(guru4elephant): make learning_rate as a variable
super().__init__(optimizer)
self._window = 1
self.type = "downpour"
self.data_norm_name = [
".batch_size",
".batch_square_sum",
".batch_sum",
".batch_size@GRAD",
".batch_square_sum@GRAD",
".batch_sum@GRAD",
]
self.supported_embedding_types = [
"lookup_table",
"pull_sparse",
"pull_sparse_v2",
"pull_box_sparse",
"pull_gpups_sparse",
]
self.supported_embedding_grad_types = [
"lookup_table_grad",
"push_sparse",
"push_sparse_v2",
]
op_maker = core.op_proto_and_checker_maker
self.op_role_key = op_maker.kOpRoleAttrName()
def _find_distributed_lookup_table_inputs(self, program, table_names):
"""
Find input variables of distributed lookup tables in the program.
Multiple distributed tables are supported.
Args:
program(Program): the program in which to locate distributed lookup tables
table_names(list): table names found beforehand
Returns:
inputs_dict: a dict mapping table name to its input variables
"""
local_vars = program.current_block().vars
inputs_dict = dict()
for table_name in table_names:
inputs_dict[table_name] = []
for op in program.global_block().ops:
if op.type in self.supported_embedding_types:
if op.input("W")[0] in table_names:
inputs_dict[op.input("W")[0]].extend(
[local_vars[name] for name in op.input("Ids")]
)
return inputs_dict
def _find_distributed_lookup_table_outputs(self, program, table_names):
"""
Find output variables of distributed lookup tables in the program.
Multiple distributed tables are supported.
Args:
program(Program): the program in which to locate distributed lookup tables
table_names(list): table names found beforehand
Returns:
outputs_dict: a dict mapping table name to its output variables
"""
local_vars = program.current_block().vars
outputs_dict = dict()
for table_name in table_names:
outputs_dict[table_name] = []
for op in program.global_block().ops:
if op.type in self.supported_embedding_types:
if op.input("W")[0] in table_names:
outputs_dict[op.input("W")[0]].extend(
[local_vars[name] for name in op.output("Out")]
)
return outputs_dict
def _find_distributed_lookup_table_grads(self, program, table_names):
local_vars = program.current_block().vars
grads_dict = dict()
for table_name in table_names:
grads_dict[table_name] = []
for op in program.global_block().ops:
if op.type in self.supported_embedding_grad_types:
if op.input("W")[0] in table_names:
grads_dict[op.input("W")[0]].extend(
[local_vars[name] for name in op.input("Out@GRAD")]
)
return grads_dict
def _is_optimizer_op(self, op):
return self.op_role_key in op.attr_names and int(
op.all_attrs()[self.op_role_key]
) & int(OpRole.Optimize)
def _remove_optimize_op_for_embedding(self, loss, table_name):
"""
remove optimizer ops for distributed sparse tables
"""
table_name = [name + "@GRAD" for name in table_name]
need_remove_op_index = []
block = loss.block.program.global_block()
for ids, op in list(enumerate(block.ops)):
if self._is_optimizer_op(op):
if op.input("Grad")[0] in table_name:
need_remove_op_index.append(ids)
need_remove_op_index.sort(reverse=True)
for index in need_remove_op_index:
block._remove_op(index)
def _find_multi_distributed_lookup_table(self, losses):
"""
find multi-sparse-table
"""
table_names = set()
cnt = 0
tmp_list = []
ret_list = []
for loss in losses:
for op in loss.block.program.global_block().ops:
if op.type in self.supported_embedding_types:
if op.attr('is_distributed') is True:
table_name = op.input("W")[0]
if table_name not in table_names:
table_names.add(table_name)
tmp_list.append([table_name, cnt])
cnt += 1
tmp_list.sort(key=lambda k: k[1])
for x in tmp_list:
ret_list.append(x[0])
return ret_list
def _if_last_block(self, op, _equal_dict):
# for conditional_block op
cond_str = op.input('Cond')[0]
bool_test = False
if cond_str.startswith('equal'):
bool_test = True
vars_ = op.input('Input')
equal_keys = _equal_dict.keys()
for var_cond in vars_:
if var_cond in equal_keys:
if bool_test:
print("the conditional block is error")
return False
return True
def _generte_cond_para_map(
self, op, _fill_value_dict, _equal_fill_dict, _now_program, _all_params
):
# generate cond value to parameter map recursively
cond_str = op.input('Cond')[0]
vars_ = op.input('Input')
if self._if_last_block(op, _equal_fill_dict):
vars_ = op.input('Input')
cond_key = ""
if cond_str.startswith('equal'):
cond_key = int(_fill_value_dict[_equal_fill_dict[cond_str]])
else:
cond_key = -1
p_list = []
for var_cond in vars_:
if var_cond in _all_params:
p_list.append(var_cond)
self._cond_params[cond_key] = p_list
self._other_params.extend(p_list)
else:
ops_cond = _now_program.block(int(op.attr('sub_block').id)).ops
for op in ops_cond:
if op.type == 'conditional_block':
self._generte_cond_para_map(
op,
_fill_value_dict,
_equal_fill_dict,
_now_program,
_all_params,
)
def _has_conditional_block(self, loss):
now_program = loss.block.program
root_block = now_program.block(0)
ops_ = root_block.ops
for op in ops_:
if op.type == 'conditional_block':
return True
return False
def _check_params_grads(self, params, grads):
if len(params) != len(grads):
raise ValueError(
"params size != grads size, %s vs %s"
% (len(params), len(grads))
)
pname2grad = dict()
for i in range(len(params)):
pname = params[i].name
gname = grads[i].name
if pname != gname[:-5]:
raise ValueError(" params != grads , %s vs %s" % (pname, gname))
pname2grad[pname] = grads[i]
return pname2grad
def _generate_multi_dense_table(
self,
params,
grads,
cond_params,
other_params,
sparse_table_names,
dense_table_id=0,
):
# generate multi dense table by cond value
pname2grad = self._check_params_grads(params, grads)
root_params_list = []
root_grads_list = []
dense_tables = []
for i, p in enumerate(params):
if p.name not in other_params and p.name not in sparse_table_names:
root_params_list.append(p)
root_grads_list.append(grads[i])
if len(root_params_list) > 0:
dense_tables.append(dense_table_id)
dense_table_id += 1
lists_params = [[] for i in range(len(cond_params.keys()))]
lists_grads = [[] for i in range(len(cond_params.keys()))]
key_id = 0
name2key = dict()
cond2denseid = dict()
for key, value in cond_params.items():
cond2denseid[key] = dense_table_id
dense_tables.append(dense_table_id)
dense_table_id += 1
for v in value:
name2key[v] = key_id
key_id += 1
for p in params:
if p.name in other_params:
lists_params[name2key[p.name]].append(p)
lists_grads[name2key[p.name]].append(pname2grad[p.name])
return (
dense_tables,
cond2denseid,
lists_params,
lists_grads,
root_params_list,
root_grads_list,
)
def _gen_distributed_emb_to_size_dict(self, program):
d_size = dict()
local_vars = program.current_block().vars
for op in program.global_block().ops:
if op.type in self.supported_embedding_types:
if op.attr('is_distributed') is True:
table_name = op.input("W")[0]
emb_size = local_vars[table_name].shape[-1]
if d_size.get(table_name) is None:
d_size[table_name] = emb_size
elif d_size[table_name] != emb_size:
raise ValueError(
"embedding size error: %s vs %s"
% (emb_size, d_size[table_name])
)
return d_size
def _check_config_fleet_with_program_op(
self, strategy, table_name, emb_to_size
):
if strategy.get(table_name) is None:
strategy[table_name] = dict()
st = strategy[table_name]
accessor = "DownpourCtrAccessor"
if st.get("sparse_accessor_class") is not None:
accessor = st["sparse_accessor_class"]
# set sparse_embedx_dim in the strategy according to accessor and use_cvm config
if (
accessor == "DownpourFeatureValueAccessor"
or accessor == "DownpourCtrAccessor"
or accessor == "DownpourCtrDymfAccessor"
or accessor == "DownpourDoubleUnitAccessor"
or accessor == "DownpourUnitAccessor"
):
if (
st.get("sparse_embedx_dim") is not None
and strategy.get("use_cvm") == True
and st["sparse_embedx_dim"] != emb_to_size[table_name] - 3
):
raise ValueError(
"fleet config sparse_embedx_dim=%s not"
" equal to embedding dim - 3 = %s"
% (st["sparse_embedx_dim"], emb_to_size[table_name] - 3)
)
if (
st.get("sparse_embedx_dim") is not None
and strategy.get("use_cvm") == False
and st["sparse_embedx_dim"] != emb_to_size[table_name] - 1
):
raise ValueError(
"fleet config sparse_embedx_dim=%s not"
" equal to embedding dim - 1 = %s"
% (st["sparse_embedx_dim"], emb_to_size[table_name] - 1)
)
if (
st.get("sparse_embedx_dim") is None
and strategy.get("use_cvm") == True
):
logger.warning(
"sparse embedding dim for table name '{}' is: {}, while sparse_embedx_dim "
"with same sparse table name is not set in config_fleet.py. "
"Hence automatically set sparse_embedx_dim = {} - 3.".format(
table_name,
emb_to_size[table_name],
emb_to_size[table_name],
)
)
st["sparse_embedx_dim"] = emb_to_size[table_name] - 3
if (
st.get("sparse_embedx_dim") is None
and strategy.get("use_cvm") == False
):
logger.warning(
"sparse embedding dim for table name '{}' is: {}, while sparse_embedx_dim "
"with same sparse table name is not set in config_fleet.py. "
"Hence automatically set sparse_embedx_dim = {} - 1.".format(
table_name,
emb_to_size[table_name],
emb_to_size[table_name],
)
)
st["sparse_embedx_dim"] = emb_to_size[table_name] - 1
elif accessor == "DownpourSparseValueAccessor":
if (
st.get("sparse_embedx_dim") is not None
and st["sparse_embedx_dim"] != emb_to_size[table_name]
):
raise ValueError(
"fleet config sparse_embedx_dim=%s not"
" equal to embedding dim = %s"
% (st["sparse_embedx_dim"], emb_to_size[table_name])
)
if st.get("sparse_embedx_dim") is None:
logger.warning(
"sparse embedding dim for table name '{}' is: {}, while sparse_embedx_dim "
"with same sparse table name is not set in config_fleet.py. "
"Hence automatically set sparse_embedx_dim = {}.".format(
table_name,
emb_to_size[table_name],
emb_to_size[table_name],
)
)
st["sparse_embedx_dim"] = emb_to_size[table_name]
return strategy
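# Hedged illustration (comments only, not original code) of the
# sparse_embedx_dim consistency rule enforced above. Suppose the embedding
# variable behind a sparse table (a hypothetical "emb_w") has shape
# [vocab_size, 11], so emb_to_size["emb_w"] == 11. Then:
#   - the CVM-style accessors (DownpourCtrAccessor and friends) require
#     sparse_embedx_dim == 11 - 3 == 8 when use_cvm is True,
#     and 11 - 1 == 10 when use_cvm is False;
#   - DownpourSparseValueAccessor requires sparse_embedx_dim == 11.
# When the key is missing, the method fills it in automatically and only
# logs a warning instead of raising.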
def _minimize(
self,
losses,
startup_program=None,
parameter_list=None,
no_grad_set=None,
strategy={},
):
"""
DownpourSGD is a distributed optimizer, so
users can call minimize to generate backward
operators and optimization operators within the minimize function
Args:
losses(list): loss variables defined by user
startup_program(Program): startup program that defined by user
parameter_list(str list): parameter names defined by users
no_grad_set(set): a set of variables that is defined by users
so that these variables do not need gradient computation
strategy(dict): user-defined properties
Returns:
[optimize_ops, grads_and_weights]
"""
# sparse table names of each program
prog_id_to_sparse_table = OrderedDict()
# inputs_dict and outputs_dict of sparse tables of each program
prog_id_to_inputs_dict = OrderedDict()
prog_id_to_outputs_dict = OrderedDict()
# related to PSParameter
ps_param = pslib.PSParameter()
# related to ServerParameter
server = DownpourServer()
# program to worker (related to DownpourTrainerParameter)
prog_id_to_worker = OrderedDict()
# param_grads of each program
prog_id_to_param_grads = OrderedDict()
# sparse_grads of each program
prog_id_to_sparse_grads = OrderedDict()
# unique program set
program_id_set = set()
sparse_table_to_index = OrderedDict()
sparse_table_index = 0
for num in range(len(losses)):
loss = losses[num]
parameters = None
if parameter_list is not None:
parameters = parameter_list[num]
prog_id = str(id(loss.block.program))
# param_grads of program
params_grads = sorted(
paddle.static.append_backward(loss, parameters, no_grad_set),
key=lambda x: x[0].name,
)
flag_use_ps_gpu = strategy.get("use_ps_gpu", False)
if flag_use_ps_gpu:
if not isinstance(startup_program, list):
startup_program = [startup_program]
optimizer = copy.deepcopy(self._optimizer)
optimize_ops = optimizer.apply_optimize(
loss,
startup_program=startup_program[num],
params_grads=params_grads,
)
embedding_table = self._find_multi_distributed_lookup_table(
[loss]
)
self._remove_optimize_op_for_embedding(loss, embedding_table)
# having a conditional_block op means multi-task
flag_multi_task = self._has_conditional_block(loss)
if flag_multi_task:
self._cond_params = dict()
self._other_params = []
now_program = loss.block.program
root_block = now_program.block(0)
all_params = []
for par in root_block.all_parameters():
all_params.append(par.name)
ops_ = root_block.ops
fill_value_dict = dict()
equal_fill_dict = dict()
for op in ops_:
# conditional_block op must have fill_constant and equal ops
if op.type == 'fill_constant':
fill_value_dict[op.output('Out')[0]] = op.attr('value')
if op.type == 'equal':
equal_fill_dict[op.output('Out')[0]] = op.input('Y')[0]
if op.type == 'conditional_block':
self._generte_cond_para_map(
op,
fill_value_dict,
equal_fill_dict,
now_program,
all_params,
)
if prog_id not in program_id_set:
program_id_set.add(prog_id)
sparse_table = self._find_multi_distributed_lookup_table([loss])
prog_id_to_sparse_table[prog_id] = sparse_table
# get sparse_table_to_index
for tn in sparse_table:
if sparse_table_to_index.get(tn) is None:
sparse_table_to_index[tn] = sparse_table_index
sparse_table_index += 1
# get {table_name: emb_size} dict from program ops
emb_to_size = self._gen_distributed_emb_to_size_dict(
loss.block.program
)
# get inputs_dict
inputs_dict = self._find_distributed_lookup_table_inputs(
loss.block.program, sparse_table
)
prog_id_to_inputs_dict[prog_id] = inputs_dict
# get outputs_dict
outputs_dict = self._find_distributed_lookup_table_outputs(
loss.block.program, sparse_table
)
prog_id_to_outputs_dict[prog_id] = outputs_dict
prog_id_to_worker[prog_id] = DownpourWorker(self._window)
grads_dict = self._find_distributed_lookup_table_grads(
loss.block.program, sparse_table
)
prog_id_to_sparse_grads[prog_id] = grads_dict
if prog_id not in prog_id_to_param_grads:
prog_id_to_param_grads[prog_id] = []
prog_id_to_param_grads[prog_id].append(params_grads)
# if strategy.get("parallel_compute")
# if user specify a fleet_desc.prototxt file, then load the file
# instead of creating default fleet_desc.prototxt.
# user can specify server_param or trainer_param or fs_client_param.
if strategy.get("fleet_desc_file") is not None:
fleet_desc_file = strategy["fleet_desc_file"]
with open(fleet_desc_file) as f:
text_format.Merge(f.read(), ps_param)
server.get_desc().CopyFrom(ps_param.server_param)
if len(ps_param.trainer_param) == 1:
for k in prog_id_to_worker:
prog_id_to_worker[k].get_desc().CopyFrom(
ps_param.trainer_param[0]
)
else:
if len(ps_param.trainer_param) != len(prog_id_to_worker):
raise ValueError(
"trainer param size != program size, %s vs %s"
% (len(ps_param.trainer_param), len(prog_id_to_worker))
)
idx = 0
# prog_id_to_worker is OrderedDict
for k in prog_id_to_worker:
prog_id_to_worker[k].get_desc().CopyFrom(
ps_param.trainer_param[idx]
)
idx += 1
# check config in op definition and fleet config
if FLEET_GLOBAL_DICT["enable"]:
one_slot = None
strategy["device_worker"] = "Hogwild"
emb_to_table = FLEET_GLOBAL_DICT["emb_to_table"]
emb_to_accessor = FLEET_GLOBAL_DICT["emb_to_accessor"]
emb_to_size = FLEET_GLOBAL_DICT["emb_to_size"]
if len(sparse_table_to_index) != len(emb_to_table):
raise ValueError(
"sparse tables from program != sparse tables from op: %s "
"vs %s" % (len(sparse_table_to_index), len(emb_to_table))
)
for key in sparse_table_to_index:
if (
key not in emb_to_table
or sparse_table_to_index[key] != emb_to_table[key]
):
print("sparse_table_to_index ", sparse_table_to_index)
print("emb_to_table ", emb_to_table)
raise ValueError("key error: %s" % key)
if strategy.get(key) is None:
strategy[key] = dict()
st = strategy[key]
accessor = None
if st.get("sparse_accessor_class") is not None:
accessor = st["sparse_accessor_class"]
tables = (
server.get_desc().downpour_server_param.downpour_table_param
)
for table in tables:
if table.table_id == sparse_table_to_index[key]:
accessor = table.accessor.accessor_class
break
for loss in losses:
for op in loss.block.program.global_block().ops:
if op.type in self.supported_embedding_types:
if accessor is not None and op.has_attr(
"AccessorClass"
):
op._set_attr("AccessorClass", accessor)
if one_slot is None:
one_slot = (
loss.block.program.global_block().var(
op.input("Ids")[0]
)
)
# if accessor is None, use default accessor in op definition
if accessor is None:
accessor = emb_to_accessor[key]
# set sparse_embedx_dim in strategy,
# user do not have to set it in config_fleet
if (
accessor == "DownpourFeatureValueAccessor"
or accessor == "DownpourCtrDymfAccessor"
or accessor == "DownpourCtrAccessor"
or accessor == "DownpourDoubleUnitAccessor"
or accessor == "DownpourUnitAccessor"
):
if (
st.get("sparse_embedx_dim") is not None
and st["sparse_embedx_dim"] != emb_to_size[key] - 3
):
raise ValueError(
"fleet config sparse_embedx_dim=%s not"
" equal to embedding size - 3 = %s"
% (st["sparse_embedx_dim"], emb_to_size[key] - 3)
)
st["sparse_embedx_dim"] = emb_to_size[key] - 3
elif accessor == "DownpourSparseValueAccessor":
if (
st.get("sparse_embedx_dim") is not None
and st["sparse_embedx_dim"] != emb_to_size[key]
):
raise ValueError(
"fleet config sparse_embedx_dim=%s not"
" equal to embedding size = %s"
% (st["sparse_embedx_dim"], emb_to_size[key])
)
st["sparse_embedx_dim"] = emb_to_size[key]
# ServerParameter add all sparse tables
for tn in sparse_table_to_index:
sparse_table_index = sparse_table_to_index[tn]
st = self._check_config_fleet_with_program_op(
strategy, tn, emb_to_size
)
if st.get(tn) is not None:
server.add_sparse_table(sparse_table_index, st[tn])
else:
server.add_sparse_table(sparse_table_index, None)
# each DownpourTrainerParameter add its own sparse tables
program_id_set.clear()
for loss in losses:
prog_id = str(id(loss.block.program))
if prog_id not in program_id_set:
program_id_set.add(prog_id)
worker = prog_id_to_worker[prog_id]
inputs_dict = prog_id_to_inputs_dict[prog_id]
outputs_dict = prog_id_to_outputs_dict[prog_id]
for tn in prog_id_to_sparse_table[prog_id]:
sparse_table_index = sparse_table_to_index[tn]
grads_dict = prog_id_to_sparse_grads[prog_id]
worker.add_sparse_table(
sparse_table_index,
inputs_dict[tn],
outputs_dict[tn],
grads_dict[tn],
)
dense_start_table_id = len(sparse_table_to_index)
dense_table_index = len(sparse_table_to_index)
program_configs = {}
# ServerParameter add all dense tables
# each DownpourTrainerParameter add its own dense tables
program_id_set.clear()
for loss_index in range(len(losses)):
program_id = str(id(losses[loss_index].block.program))
if program_id not in program_id_set:
program_id_set.add(program_id)
worker = prog_id_to_worker[program_id]
sparse_table_names = prog_id_to_sparse_table[program_id]
sparse_table_index = [
sparse_table_to_index[i] for i in sparse_table_names
]
program_configs[program_id] = {
"pull_sparse": [t_index for t_index in sparse_table_index],
"push_sparse": [t_index for t_index in sparse_table_index],
}
params_grads = prog_id_to_param_grads[program_id]
for pg in params_grads:
params = []
grads = []
data_norm_params = []
data_norm_grads = []
for i in pg:
is_data_norm_data = False
for data_norm_name in self.data_norm_name:
if i[0].name.endswith(data_norm_name):
is_data_norm_data = True
data_norm_params.append(i[0])
if not is_data_norm_data:
params.append(i[0])
for i in pg:
is_data_norm_data = False
for data_norm_grad in self.data_norm_name:
if i[0].name.endswith(data_norm_grad):
is_data_norm_data = True
data_norm_grads.append(i[1])
if not is_data_norm_data:
grads.append(i[1])
# for new dense table
multi_task_dense_tables_push = []
multi_task_dense_tables_pull = []
if flag_multi_task:
(
dense_tables,
cond2denseid,
lists_params,
lists_grads,
root_params_list,
root_grads_list,
) = self._generate_multi_dense_table(
params,
grads,
self._cond_params,
self._other_params,
sparse_table_names,
dense_table_index,
)
program_configs[program_id][
'cond2denseid'
] = cond2denseid
multi_task_dense_tables_push = dense_tables
multi_task_dense_tables_pull = dense_tables[:]
if strategy.get('dense_table') is not None:
if flag_multi_task:
server_dense_table_index = dense_table_index
if len(root_params_list) > 0:
server.add_dense_table(
server_dense_table_index,
root_params_list,
root_grads_list,
strategy['dense_table'],
sparse_table_names,
)
server_dense_table_index += 1
for i in range(len(lists_params)):
server.add_dense_table(
server_dense_table_index,
lists_params[i],
lists_grads[i],
strategy['dense_table'],
sparse_table_names,
)
server_dense_table_index += 1
else:
server.add_dense_table(
dense_table_index,
params,
grads,
strategy['dense_table'],
sparse_table_names,
)
else:
server.add_dense_table(
dense_table_index,
params,
grads,
None,
sparse_table_names,
)
if flag_multi_task:
if len(root_params_list) > 0:
worker.add_dense_table(
dense_table_index,
self._learning_rate,
root_params_list,
root_grads_list,
dense_start_table_id,
sparse_table_names,
)
dense_table_index += 1
for i in range(len(lists_params)):
worker.add_dense_table(
dense_table_index,
self._learning_rate,
lists_params[i],
lists_grads[i],
dense_start_table_id,
sparse_table_names,
)
dense_table_index += 1
dense_table_index -= 1
else:
worker.add_dense_table(
dense_table_index,
self._learning_rate,
params,
grads,
dense_start_table_id,
sparse_table_names,
)
if FLEET_GLOBAL_DICT["enable"]:
cur_prog = losses[loss_index].block.program
cur_prog.global_block().append_op(
type="push_dense",
inputs={"Ids": one_slot},
attrs={
"InputNames": [i.name for i in grads],
"TableId": dense_table_index,
"ScaleDataNorm": strategy.get(
"scale_datanorm", -1
),
},
)
if (
"pull_dense" in program_configs[program_id]
and "push_dense" in program_configs[program_id]
and len(program_configs[program_id]["pull_dense"]) > 0
):
if flag_multi_task:
program_configs[program_id]["pull_dense"].extend(
multi_task_dense_tables_pull
)
program_configs[program_id]["push_dense"].extend(
multi_task_dense_tables_push
)
else:
program_configs[program_id]["pull_dense"].extend(
[dense_table_index]
)
program_configs[program_id]["push_dense"].extend(
[dense_table_index]
)
else:
if flag_multi_task:
program_configs[program_id][
"pull_dense"
] = multi_task_dense_tables_pull
program_configs[program_id][
"push_dense"
] = multi_task_dense_tables_push
else:
program_configs[program_id]["pull_dense"] = [
dense_table_index
]
program_configs[program_id]["push_dense"] = [
dense_table_index
]
if len(data_norm_params) != 0 and len(data_norm_grads) != 0:
dense_table_index += 1
if strategy.get('datanorm_table') is not None:
server.add_data_norm_table(
dense_table_index,
self._learning_rate,
data_norm_params,
data_norm_grads,
strategy['datanorm_table'],
sparse_table_names,
)
else:
server.add_data_norm_table(
dense_table_index,
self._learning_rate,
data_norm_params,
data_norm_grads,
None,
sparse_table_names,
)
worker.add_dense_table(
dense_table_index,
self._learning_rate,
data_norm_params,
data_norm_grads,
dense_start_table_id,
sparse_table_names,
)
if FLEET_GLOBAL_DICT["enable"]:
cur_prog = losses[loss_index].block.program
cur_prog.global_block().append_op(
type="push_dense",
inputs={"Ids": one_slot},
attrs={
"InputNames": [
i.name for i in data_norm_grads
],
"TableId": dense_table_index,
"ScaleDataNorm": strategy.get(
"scale_datanorm", -1
),
},
)
program_configs[program_id]["pull_dense"].extend(
[dense_table_index]
)
program_configs[program_id]["push_dense"].extend(
[dense_table_index]
)
dense_table_index += 1
# Todo(guru4elephant): figure out how to support more sparse parameters
# currently only support lookup_table
worker_skipped_ops = ["lookup_table", "lookup_table_grad"]
if len(worker.get_desc().skip_op) == 0:
worker.get_desc().skip_op.extend(worker_skipped_ops)
ps_param.server_param.CopyFrom(server.get_desc())
# prog_id_to_worker is OrderedDict
if len(ps_param.trainer_param) == 0:
for k in prog_id_to_worker:
tp = ps_param.trainer_param.add()
tp.CopyFrom(prog_id_to_worker[k].get_desc())
if strategy.get("fs_uri") is not None:
ps_param.fs_client_param.uri = strategy["fs_uri"]
elif ps_param.fs_client_param.uri == "":
ps_param.fs_client_param.uri = "hdfs://your_hdfs_uri"
if strategy.get("fs_user") is not None:
ps_param.fs_client_param.user = strategy["fs_user"]
elif ps_param.fs_client_param.user == "":
ps_param.fs_client_param.user = "your_hdfs_user"
if strategy.get("fs_passwd") is not None:
ps_param.fs_client_param.passwd = strategy["fs_passwd"]
elif ps_param.fs_client_param.passwd == "":
ps_param.fs_client_param.passwd = "your_hdfs_passwd"
if strategy.get("fs_hadoop_bin") is not None:
ps_param.fs_client_param.hadoop_bin = strategy["fs_hadoop_bin"]
elif ps_param.fs_client_param.hadoop_bin == "":
ps_param.fs_client_param.hadoop_bin = "$HADOOP_HOME/bin/hadoop"
opt_info = {}
opt_info["program_id_to_worker"] = prog_id_to_worker
opt_info["program_configs"] = program_configs
opt_info["trainer"] = strategy.get("trainer", "DistMultiTrainer")
opt_info["device_worker"] = strategy.get("device_worker", "DownpourSGD")
opt_info["optimizer"] = "DownpourSGD"
opt_info["fleet_desc"] = ps_param
opt_info["worker_skipped_ops"] = worker_skipped_ops
opt_info["use_cvm"] = strategy.get("use_cvm", False)
opt_info["no_cvm"] = strategy.get("no_cvm", False)
opt_info["scale_sparse_gradient_with_batch_size"] = strategy.get(
"scale_sparse_gradient_with_batch_size", True
)
opt_info["worker_class"] = strategy.get(
"worker_class", "DownpourWorker"
)
opt_info["stat_var_names"] = strategy.get("stat_var_names", [])
opt_info["local_tables"] = strategy.get("local_tables", [])
opt_info["async_tables"] = strategy.get("async_tables", [])
opt_info["async_tables"] = strategy.get("async_tables", [])
opt_info["scale_datanorm"] = strategy.get("scale_datanorm", -1)
opt_info["check_nan_var_names"] = strategy.get(
"check_nan_var_names", []
)
opt_info["dump_slot"] = False
opt_info["dump_converter"] = ""
opt_info["dump_fields"] = strategy.get("dump_fields", [])
opt_info["dump_file_num"] = strategy.get("dump_file_num", 16)
opt_info["user_define_dump_filename"] = strategy.get(
"user_define_dump_filename", ""
)
opt_info["dump_fields_path"] = strategy.get("dump_fields_path", "")
opt_info["dump_param"] = strategy.get("dump_param", [])
gpus_env = os.getenv("FLAGS_selected_gpus", "0")
opt_info["worker_places"] = [int(s) for s in gpus_env.split(",")]
opt_info["use_ps_gpu"] = strategy.get("use_ps_gpu", False)
if server._server.downpour_server_param.downpour_table_param[
0
].accessor.accessor_class in [
"DownpourCtrAccessor",
"DownpourCtrDoubleAccessor",
"DownpourUnitAccessor",
"DownpourDoubleUnitAccessor",
"DownpourCtrDymfAccessor",
]:
opt_info["dump_slot"] = True
elif (
server._server.downpour_server_param.downpour_table_param[
0
].accessor.accessor_class
== "DownpourSparseValueAccessor"
):
opt_info["no_cvm"] = True
opt_info["adjust_ins_weight"] = strategy.get("adjust_ins_weight", {})
opt_info["copy_table"] = strategy.get("copy_table", {})
opt_info["loss_names"] = strategy.get("loss_names", [])
for loss in losses:
loss.block.program._fleet_opt = opt_info
param_grads_list = []
for loss in losses:
prog_id = str(id(loss.block.program))
param_grads_list.append(prog_id_to_param_grads[prog_id])
return None, param_grads_list, opt_info
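# ---------------------------------------------------------------------------
# Hedged usage sketch (comments only; not part of the original file). In the
# pslib fleet flow, _minimize is normally reached through fleet's distributed
# optimizer wrapper rather than called directly. The snippet below only
# illustrates the kind of strategy keys _minimize consumes and its return
# value (None, param_grads_list, opt_info), where opt_info carries the
# generated fleet_desc (PSParameter), program_configs, and the trainer /
# device_worker choices. `loss`, `startup_program`, `adam`, and the per-table
# key "embedding_0.w_0" are hypothetical placeholders.
#
#     strategy = {
#         "use_cvm": True,
#         "trainer": "DistMultiTrainer",
#         "device_worker": "DownpourSGD",
#         "fs_uri": "hdfs://your_hdfs_uri",
#         "embedding_0.w_0": {"sparse_accessor_class": "DownpourCtrAccessor"},
#     }
#     dist_adam = DistributedAdam(adam)  # adam: a Paddle Adam optimizer
#     _, param_grads_list, opt_info = dist_adam._minimize(
#         [loss], startup_program, strategy=strategy
#     )
# ---------------------------------------------------------------------------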
@@ -25,8 +25,8 @@ from dist_test_utils import remove_ps_flag
 import paddle.fluid as fluid
 import paddle.fluid.core as core
 from paddle.fluid.framework import Program, program_guard
-from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode
 from paddle.fluid.op import Operator
+from paddle.incubate.fleet.parameter_server.mode import DistributedMode


 def run_pserver(pserver_id):
@@ -140,7 +140,7 @@ import paddle.fluid as fluid
 from paddle.distributed.communicator import Communicator
 import paddle.fluid.incubate.fleet.base.role_maker as role_maker
-from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode
+from paddle.incubate.fleet.parameter_server.mode import DistributedMode
 import paddle.distributed.fleet as fleet

 from test_communicator_geo import TestCommunicatorGeoEnd2End
@@ -49,7 +49,7 @@ from .distribute_transpiler import (
     same_or_split_var,
     ServerRuntimeConfig,
 )
-from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode
+from paddle.incubate.fleet.parameter_server.mode import DistributedMode
 from paddle.distributed.distribute_lookup_table import (
     find_distributed_lookup_table,
 )
@@ -19,10 +19,10 @@ import warnings
 from functools import reduce

 import paddle
-from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode
 from paddle.framework import core
 from paddle.incubate.fleet.parameter_server.ir import vars_metatools
 from paddle.incubate.fleet.parameter_server.ir.ps_dispatcher import RoundRobin
+from paddle.incubate.fleet.parameter_server.mode import DistributedMode

 OP_NAME_SCOPE = "op_namescope"
 CLIP_OP_NAME_SCOPE = "gradient_clip"
@@ -28,7 +28,7 @@ from . import ps_pb2 as pslib
 from .node import DownpourServer, DownpourWorker

 OpRole = core.op_proto_and_checker_maker.OpRole
-# this dict is for store info about pull/push sparse ops.
+# this dict is for storing info about pull/push sparse ops.
 FLEET_GLOBAL_DICT = {
     # global settings
     "enable": False,