Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
Paddle
提交
4a3c4b8f
P
Paddle
项目概览
BaiXuePrincess
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
4a3c4b8f
编写于
8月 23, 2019
作者:
Z
zhang wenhui
提交者:
GitHub
8月 23, 2019
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add fleet_desc config feature & multi_sparse table, test=develop (#18827)
add fleet_desc config feature & multi_sparse table,
上级
1799c257
变更
5
显示空白变更内容
内联
并排
Showing
5 changed file
with
451 addition
and
100 deletion
+451
-100
python/paddle/fluid/device_worker.py
python/paddle/fluid/device_worker.py
+23
-21
python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
...e/fluid/incubate/fleet/parameter_server/pslib/__init__.py
+1
-1
python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py
...addle/fluid/incubate/fleet/parameter_server/pslib/node.py
+181
-62
python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
...ncubate/fleet/parameter_server/pslib/optimizer_factory.py
+98
-16
python/paddle/fluid/tests/unittests/test_downpoursgd.py
python/paddle/fluid/tests/unittests/test_downpoursgd.py
+148
-0
未找到文件。
python/paddle/fluid/device_worker.py
浏览文件 @
4a3c4b8f
...
...
@@ -146,24 +146,26 @@ class DownpourSGD(DeviceWorker):
dense_table
.
dense_value_name
.
extend
(
i
.
dense_variable_name
)
dense_table
.
table_id
=
\
i
.
table_id
sparse_len
=
len
(
self
.
_fleet_desc
.
trainer_param
.
sparse_table
)
for
i
in
range
(
sparse_len
):
sparse_table
=
downpour
.
sparse_table
.
add
()
sparse_table
.
table_id
=
\
self
.
_fleet_desc
.
trainer_param
.
sparse_table
[
0
].
table_id
self
.
_fleet_desc
.
trainer_param
.
sparse_table
[
i
].
table_id
sparse_table
.
sparse_key_name
.
extend
(
self
.
_fleet_desc
.
trainer_param
.
sparse_table
[
0
].
slot_key
)
self
.
_fleet_desc
.
trainer_param
.
sparse_table
[
i
].
slot_key
)
sparse_table
.
sparse_value_name
.
extend
(
self
.
_fleet_desc
.
trainer_param
.
sparse_table
[
0
].
slot_value
)
self
.
_fleet_desc
.
trainer_param
.
sparse_table
[
i
].
slot_value
)
sparse_table
.
sparse_grad_name
.
extend
(
self
.
_fleet_desc
.
trainer_param
.
sparse_table
[
0
].
slot_gradient
)
self
.
_fleet_desc
.
trainer_param
.
sparse_table
[
i
].
slot_gradient
)
if
opt_info
[
"use_cvm"
]:
sparse_table
.
emb_dim
=
\
self
.
_fleet_desc
.
server_param
.
downpour_server_param
.
downpour_table_param
[
0
].
accessor
.
fea_dim
i
].
accessor
.
fea_dim
sparse_table
.
fea_dim
=
sparse_table
.
emb_dim
else
:
sparse_table
.
emb_dim
=
\
self
.
_fleet_desc
.
server_param
.
downpour_server_param
.
downpour_table_param
[
0
].
accessor
.
fea_dim
-
2
i
].
accessor
.
fea_dim
-
2
sparse_table
.
fea_dim
=
sparse_table
.
emb_dim
+
2
# TODO(guru4elephant): hard code here, need to improve
sparse_table
.
label_var_name
=
"click"
...
...
python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
浏览文件 @
4a3c4b8f
...
...
@@ -13,7 +13,7 @@
import
os
import
sys
from
optimizer_factory
import
*
from
.
optimizer_factory
import
*
from
google.protobuf
import
text_format
import
paddle.fluid
as
fluid
from
paddle.fluid.framework
import
Program
...
...
python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py
浏览文件 @
4a3c4b8f
...
...
@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
import
ps_pb2
as
pslib
from
.
import
ps_pb2
as
pslib
class
Server
(
object
):
...
...
@@ -43,25 +43,21 @@ class DownpourServer(Server):
def
__init__
(
self
):
self
.
_server
=
pslib
.
ServerParameter
()
self
.
_server
.
downpour_server_param
.
service_param
.
start_server_port
=
0
self
.
_server
.
downpour_server_param
.
service_param
.
server_class
=
"DownpourBrpcPsServer"
self
.
_server
.
downpour_server_param
.
service_param
.
client_class
=
"DownpourBrpcPsClient"
self
.
_server
.
downpour_server_param
.
service_param
.
service_class
=
"DownpourPsService"
self
.
_server
.
downpour_server_param
.
service_param
.
start_server_port
=
0
self
.
_server
.
downpour_server_param
.
service_param
.
server_thread_num
=
12
def
add_sparse_table
(
self
,
table_id
,
learning_rate
,
slot_key_vars
,
slot_value_var
):
def
add_sparse_table
(
self
,
table_id
,
strategy
):
"""
Args:
table_id(int): id of sparse params table
learning_rate(float): the learning rate used to update parameters.
\
Can be a float value
slot_key_vars(string): slot key id
slot_value_var(string): slot key value after embedding
strategy(dict): the config dict.
Returns:
return None
"""
for
table
in
self
.
_server
.
downpour_server_param
.
downpour_table_param
:
if
table
.
table_id
==
table_id
:
if
table
.
type
==
pslib
.
PS_SPARSE_TABLE
:
...
...
@@ -69,38 +65,100 @@ class DownpourServer(Server):
else
:
raise
ValueError
(
"expect table %s type=%s, but actual type=%s"
\
%
(
table_id
,
pslib
.
PS_SPARSE_TABLE
,
table
.
type
))
if
strategy
is
None
:
strategy
=
dict
()
table
=
self
.
_server
.
downpour_server_param
.
downpour_table_param
.
add
()
table
.
table_id
=
table_id
table
.
table_class
=
"DownpourSparseTable"
table
.
type
=
pslib
.
PS_SPARSE_TABLE
table
.
compress_in_save
=
True
table
.
shard_num
=
1000
table
.
accessor
.
accessor_class
=
"DownpourCtrAccessor"
table
.
accessor
.
sparse_sgd_param
.
learning_rate
=
0.05
table
.
accessor
.
sparse_sgd_param
.
initial_g2sum
=
3
table
.
accessor
.
sparse_sgd_param
.
initial_range
=
1e-4
table
.
accessor
.
sparse_sgd_param
.
weight_bounds
.
extend
([
-
10
,
10
])
table
.
accessor
.
embedx_dim
=
8
table
.
accessor
.
embedx_threshold
=
10
table
.
accessor
.
fea_dim
=
11
table
.
accessor
.
downpour_accessor_param
.
nonclk_coeff
=
0.1
table
.
accessor
.
downpour_accessor_param
.
click_coeff
=
1
table
.
accessor
.
downpour_accessor_param
.
base_threshold
=
1.5
table
.
accessor
.
downpour_accessor_param
.
delta_threshold
=
0.25
table
.
accessor
.
downpour_accessor_param
.
delta_keep_days
=
30
table
.
accessor
.
downpour_accessor_param
.
delete_after_unseen_days
=
30
table
.
accessor
.
downpour_accessor_param
.
show_click_decay_rate
=
0.98
table
.
accessor
.
downpour_accessor_param
.
delete_threshold
=
0.8
def
add_dense_table
(
self
,
table_id
,
learning_rate
,
param_var
,
grad_var
):
support_sparse_key_list
=
[
'sparse_table_class'
,
'sparse_compress_in_save'
,
'sparse_shard_num'
,
\
'sparse_accessor_class'
,
'sparse_learning_rate'
,
'sparse_initial_g2sum'
,
'sparse_initial_range'
,
\
'sparse_weight_bounds'
,
'sparse_embedx_dim'
,
'sparse_embedx_threshold'
,
'sparse_nonclk_coeff'
,
\
'sparse_click_coeff'
,
'sparse_base_threshold'
,
'sparse_delta_threshold'
,
'sparse_delta_keep_days'
,
\
'sparse_show_click_decay_rate'
,
'sparse_delete_threshold'
]
for
key
in
strategy
:
if
key
not
in
support_sparse_key_list
:
raise
ValueError
(
"strategy key '%s' not support"
%
(
key
))
support_table_calss
=
[
'DownpourSparseTable'
]
if
strategy
.
get
(
'sparse_table_class'
)
is
not
None
:
table_class
=
strategy
.
get
(
'sparse_table_class'
)
if
table_class
not
in
support_table_calss
:
raise
ValueError
(
"support sparse_table_class: [ 'DownpourSparseTable' ],
\
but actual %s"
%
(
table_class
))
else
:
table_class
=
'DownpourSparseTable'
table
.
table_class
=
table_class
if
table_class
==
'DownpourSparseTable'
:
table
.
compress_in_save
=
strategy
.
get
(
'sparse_compress_in_save'
,
True
)
table
.
shard_num
=
strategy
.
get
(
'sparse_shard_num'
,
1000
)
support_accessor_class
=
[
'DownpourFeatureValueAccessor'
,
'DownpourCtrAccessor'
]
if
strategy
.
get
(
'sparse_accessor_class'
)
is
not
None
:
accessor_class
=
strategy
.
get
(
'sparse_accessor_class'
)
if
accessor_class
not
in
support_accessor_class
:
raise
ValueError
(
"support sparse_accessor_class: ['DownpourFeatureValueAccessor', 'DownpourCtrAccessor'],
\
but actual %s"
%
(
accessor_class
))
else
:
accessor_class
=
'DownpourCtrAccessor'
table
.
accessor
.
accessor_class
=
accessor_class
if
accessor_class
==
'DownpourFeatureValueAccessor'
or
accessor_class
==
'DownpourCtrAccessor'
:
table
.
accessor
.
sparse_sgd_param
.
learning_rate
=
strategy
.
get
(
'sparse_learning_rate'
,
0.05
)
table
.
accessor
.
sparse_sgd_param
.
initial_g2sum
=
strategy
.
get
(
'sparse_initial_g2sum'
,
3
)
table
.
accessor
.
sparse_sgd_param
.
initial_range
=
strategy
.
get
(
'sparse_initial_range'
,
1e-4
)
if
strategy
.
get
(
'sparse_weight_bounds'
)
is
None
:
table
.
accessor
.
sparse_sgd_param
.
weight_bounds
.
extend
(
[
-
10
,
10
])
else
:
table
.
accessor
.
sparse_sgd_param
.
weight_bounds
.
extend
(
strategy
.
get
(
'sparse_weight_bounds'
))
table
.
accessor
.
embedx_dim
=
strategy
.
get
(
'sparse_embedx_dim'
,
8
)
table
.
accessor
.
embedx_threshold
=
strategy
.
get
(
'sparse_embedx_threshold'
,
10
)
table
.
accessor
.
fea_dim
=
int
(
table
.
accessor
.
embedx_dim
)
+
3
table
.
accessor
.
downpour_accessor_param
.
nonclk_coeff
=
strategy
.
get
(
'sparse_nonclk_coeff'
,
0.1
)
table
.
accessor
.
downpour_accessor_param
.
click_coeff
=
strategy
.
get
(
'sparse_click_coeff'
,
1
)
table
.
accessor
.
downpour_accessor_param
.
base_threshold
=
strategy
.
get
(
'sparse_base_threshold'
,
1.5
)
table
.
accessor
.
downpour_accessor_param
.
delta_threshold
=
strategy
.
get
(
'sparse_delta_threshold'
,
0.25
)
table
.
accessor
.
downpour_accessor_param
.
delta_keep_days
=
strategy
.
get
(
'sparse_delta_keep_days'
,
16
)
table
.
accessor
.
downpour_accessor_param
.
delete_after_unseen_days
=
strategy
.
get
(
'sparse_delete_after_unseen_days'
,
30
)
table
.
accessor
.
downpour_accessor_param
.
show_click_decay_rate
=
strategy
.
get
(
'sparse_show_click_decay_rate'
,
0.98
)
table
.
accessor
.
downpour_accessor_param
.
delete_threshold
=
strategy
.
get
(
'sparse_delete_threshold'
,
0.8
)
table1
=
table
.
accessor
.
table_accessor_save_param
.
add
()
table1
.
param
=
1
table1
.
converter
=
"(scripts/xbox_compressor_mf.py | bin/xbox_pb_converter)"
table1
.
deconverter
=
"(bin/xbox_pb_deconverter | scripts/xbox_decompressor_mf.awk)"
table2
=
table
.
accessor
.
table_accessor_save_param
.
add
()
table2
.
param
=
2
table2
.
converter
=
"(scripts/xbox_compressor_mf.py | bin/xbox_pb_converter)"
table2
.
deconverter
=
"(bin/xbox_pb_deconverter | scripts/xbox_decompressor_mf.awk)"
def
add_dense_table
(
self
,
table_id
,
param_var
,
grad_var
,
strategy
):
"""
Args:
table_id(int): id of sparse params table
learning_rate(float): the learning rate used to update parameters.
\
Can be a float value
param_var(list): all dense param. it is a list.
grad_var(list): all dense grad parm it is a list.
strategy(dict): the dense config dict.
Returns:
return None
"""
...
...
@@ -117,29 +175,47 @@ class DownpourServer(Server):
else
:
raise
ValueError
(
"expect table %s type=%s, but actual type=%s"
\
%
(
table_id
,
pslib
.
PS_DENSE_TABLE
,
table
.
type
))
if
strategy
is
None
:
strategy
=
dict
()
table
=
self
.
_server
.
downpour_server_param
.
downpour_table_param
.
add
()
table
.
table_id
=
table_id
table
.
table_class
=
"DownpourDenseTable"
support_dense_key_list
=
[
'dense_table_class'
,
'dense_compress_in_save'
,
'dense_accessor_class'
,
\
'dense_optimizer'
,
'dense_learning_rate'
,
'dense_avg_decay'
,
'dense_ada_decay'
,
\
'dense_ada_epsilon'
,
'dense_mom_decay'
,
'dense_naive_lr'
]
for
key
in
strategy
:
if
key
not
in
support_dense_key_list
:
raise
ValueError
(
"strategy key '%s' not support"
%
(
key
))
table
.
table_class
=
strategy
.
get
(
'dense_table_class'
,
"DownpourDenseTable"
)
table
.
type
=
pslib
.
PS_DENSE_TABLE
table
.
compress_in_save
=
True
table
.
accessor
.
accessor_class
=
"DownpourDenseValueAccessor"
table
.
accessor
.
dense_sgd_param
.
name
=
"adam"
table
.
accessor
.
dense_sgd_param
.
adam
.
learning_rate
=
learning_rate
table
.
accessor
.
dense_sgd_param
.
adam
.
avg_decay_rate
=
0.999993
table
.
accessor
.
dense_sgd_param
.
adam
.
ada_decay_rate
=
0.9999
table
.
accessor
.
dense_sgd_param
.
adam
.
ada_epsilon
=
1e-8
table
.
accessor
.
dense_sgd_param
.
adam
.
mom_decay_rate
=
0.99
table
.
accessor
.
dense_sgd_param
.
naive
.
learning_rate
=
0.0002
table
.
compress_in_save
=
strategy
.
get
(
'dense_compress_in_save'
,
True
)
table
.
accessor
.
accessor_class
=
strategy
.
get
(
'dense_accessor_class'
,
"DownpourDenseValueAccessor"
)
table
.
accessor
.
dense_sgd_param
.
name
=
strategy
.
get
(
'dense_optimizer'
,
"adam"
)
table
.
accessor
.
dense_sgd_param
.
adam
.
learning_rate
=
strategy
.
get
(
'dense_learning_rate'
,
5e-06
)
table
.
accessor
.
dense_sgd_param
.
adam
.
avg_decay_rate
=
strategy
.
get
(
'dense_avg_decay'
,
0.999993
)
table
.
accessor
.
dense_sgd_param
.
adam
.
ada_decay_rate
=
strategy
.
get
(
'dense_ada_decay'
,
0.9999
)
table
.
accessor
.
dense_sgd_param
.
adam
.
ada_epsilon
=
strategy
.
get
(
'dense_ada_epsilon'
,
1e-8
)
table
.
accessor
.
dense_sgd_param
.
adam
.
mom_decay_rate
=
strategy
.
get
(
'dense_mom_decay'
,
0.99
)
table
.
accessor
.
dense_sgd_param
.
naive
.
learning_rate
=
strategy
.
get
(
'dense_naive_lr'
,
0.0002
)
table
.
accessor
.
fea_dim
=
fea_dim
def
add_data_norm_table
(
self
,
table_id
,
learning_rate
,
param_var
,
grad_var
):
def
add_data_norm_table
(
self
,
table_id
,
learning_rate
,
param_var
,
grad_var
,
strategy
):
"""
Args:
table_id(int): id of sparse params table
learning_rate(float): the learning rate used to update parameters.
\
Can be a float value
param_var(list): all dense param. it is a list.
grad_var(list): all dense grad parm it is a list.
table_id(int): id of datanorm table
strategy(dict): the datanorm config dict.
Returns:
return None
"""
...
...
@@ -156,14 +232,28 @@ class DownpourServer(Server):
else
:
raise
ValueError
(
"expect table %s type=%s, but actual type=%s"
\
%
(
table_id
,
pslib
.
PS_DENSE_TABLE
,
table
.
type
))
if
strategy
is
None
:
strategy
=
dict
()
support_datanorm_key_list
=
[
'datanorm_table_class'
,
'datanorm_compress_in_save'
,
\
'datanorm_accessor_class'
,
'datanorm_operation'
,
'datanorm_decay_rate'
]
for
key
in
strategy
:
if
key
not
in
support_datanorm_key_list
:
raise
ValueError
(
"strategy key '%s' not support"
%
(
key
))
table
=
self
.
_server
.
downpour_server_param
.
downpour_table_param
.
add
()
table
.
table_id
=
table_id
table
.
table_class
=
"DownpourDenseDoubleTable"
table
.
table_class
=
strategy
.
get
(
'datanorm_table_class'
,
"DownpourDenseDoubleTable"
)
table
.
type
=
pslib
.
PS_DENSE_TABLE
table
.
compress_in_save
=
True
table
.
accessor
.
accessor_class
=
"DownpourDenseValueDoubleAccessor"
table
.
accessor
.
dense_sgd_param
.
name
=
"summarydouble"
table
.
accessor
.
dense_sgd_param
.
summary
.
summary_decay_rate
=
0.999999
table
.
compress_in_save
=
strategy
.
get
(
'datanorm_compress_in_save'
,
True
)
table
.
accessor
.
accessor_class
=
strategy
.
get
(
'datanorm_accessor_class'
,
"DownpourDenseValueDoubleAccessor"
)
table
.
accessor
.
dense_sgd_param
.
name
=
strategy
.
get
(
'datanorm_operation'
,
"summarydouble"
)
table
.
accessor
.
dense_sgd_param
.
summary
.
summary_decay_rate
=
strategy
.
get
(
'datanorm_decay_rate'
,
0.999999
)
table
.
accessor
.
fea_dim
=
fea_dim
def
get_desc
(
self
):
...
...
@@ -187,13 +277,10 @@ class DownpourWorker(Worker):
self
.
window
=
window
self
.
_worker
=
pslib
.
DownpourTrainerParameter
()
def
add_sparse_table
(
self
,
table_id
,
learning_rate
,
slot_key_vars
,
slot_value_vars
):
def
add_sparse_table
(
self
,
table_id
,
slot_key_vars
,
slot_value_vars
):
"""
Args:
table_id(int): id of sparse params table
learning_rate(float): the learning rate used to update parameters.
\
Can be a float value
slot_key_vars(string): slot key id
slot_value_var(string): slot key value after embedding
Returns:
...
...
@@ -201,7 +288,26 @@ class DownpourWorker(Worker):
"""
for
table
in
self
.
_worker
.
sparse_table
:
if
table
.
table_id
==
table_id
:
if
[
var
.
name
for
var
in
slot_key_vars
]
==
self
.
_worker
.
sparse_table
[
table_id
].
slot_key
:
if
[
var
.
name
for
var
in
slot_value_vars
]
==
self
.
_worker
.
sparse_table
[
table_id
].
slot_value
:
if
[
var
.
name
+
"@GRAD"
for
var
in
slot_value_vars
]
==
self
.
_worker
.
sparse_table
[
table_id
].
slot_gradient
:
return
else
:
raise
ValueError
(
"sparse table %s slot_gradient error"
%
table_id
)
else
:
raise
ValueError
(
"sparse table %s slot_value error"
%
table_id
)
else
:
raise
ValueError
(
"sparse table %s slot_key error"
%
table_id
)
table
=
self
.
_worker
.
sparse_table
.
add
()
table
.
table_id
=
table_id
table
.
slot_key
.
extend
([
var
.
name
for
var
in
slot_key_vars
])
...
...
@@ -209,7 +315,8 @@ class DownpourWorker(Worker):
table
.
slot_gradient
.
extend
(
[
var
.
name
+
"@GRAD"
for
var
in
slot_value_vars
])
def
add_dense_table
(
self
,
table_id
,
learning_rate
,
param_vars
,
grad_vars
):
def
add_dense_table
(
self
,
table_id
,
learning_rate
,
param_vars
,
grad_vars
,
dense_start_table_id
):
"""
Args:
table_id(int): id of sparse params table
...
...
@@ -222,7 +329,19 @@ class DownpourWorker(Worker):
"""
for
table
in
self
.
_worker
.
dense_table
:
if
table
.
table_id
==
table_id
:
if
filter
(
lambda
x
:
x
.
find
(
"embedding"
)
==
-
1
,
[
p
.
name
for
p
in
param_vars
])
==
\
self
.
_worker
.
dense_table
[
table_id
-
dense_start_table_id
].
dense_variable_name
:
if
filter
(
lambda
x
:
x
.
find
(
"embedding"
)
==
-
1
,
[
g
.
name
for
g
in
grad_vars
])
==
\
self
.
_worker
.
dense_table
[
table_id
-
dense_start_table_id
].
dense_gradient_variable_name
:
return
else
:
raise
ValueError
(
"dense table %s dense_gradient_variable_name error"
%
table_id
)
else
:
raise
ValueError
(
"dense table %s dense_variable_name error"
%
table_id
)
table
=
self
.
_worker
.
dense_table
.
add
()
table
.
table_id
=
table_id
table
.
dense_variable_name
.
extend
(
...
...
python/paddle/fluid/incubate/fleet/parameter_server/pslib/optimizer_factory.py
浏览文件 @
4a3c4b8f
...
...
@@ -13,13 +13,13 @@
# limitations under the License.
__all__
=
[
"DistributedAdam"
]
import
ps_pb2
as
pslib
import
paddle.fluid
as
fluid
from
paddle.fluid.distribute_lookup_table
import
find_distributed_lookup_table
from
paddle.fluid.distribute_lookup_table
import
find_distributed_lookup_table_inputs
from
paddle.fluid.distribute_lookup_table
import
find_distributed_lookup_table_outputs
from
google.protobuf
import
text_format
from
.node
import
DownpourWorker
,
DownpourServer
from
.
import
ps_pb2
as
pslib
class
DistributedOptimizerImplBase
(
object
):
...
...
@@ -48,6 +48,63 @@ class DistributedAdam(DistributedOptimizerImplBase):
".batch_size@GRAD"
,
".batch_square_sum@GRAD"
,
".batch_sum@GRAD"
]
def
_find_distributed_lookup_table_inputs
(
self
,
program
,
table_names
):
"""
Find input variable of distribute lookup table in program.
We could support multi-distribute table now.
Args:
program(Program): given program, locate distributed lookup table
table_name(str): given table names that is found beforehand
Returns:
inputs
"""
local_vars
=
program
.
current_block
().
vars
inputs_dict
=
dict
()
for
table_name
in
table_names
:
inputs_dict
[
table_name
]
=
[]
for
op
in
program
.
global_block
().
ops
:
if
op
.
type
==
"lookup_table"
:
if
op
.
input
(
"W"
)[
0
]
in
table_names
:
inputs_dict
[
op
.
input
(
"W"
)[
0
]].
extend
(
[
local_vars
[
name
]
for
name
in
op
.
input
(
"Ids"
)])
return
inputs_dict
def
_find_distributed_lookup_table_outputs
(
self
,
program
,
table_names
):
"""
Find output variable of distribute lookup table in program.
We could support multi-distribute table now.
Args:
program(Program): given program, locate distributed lookup table
table_name(str): given table name that is found beforehand
Returns:
outputs
"""
local_vars
=
program
.
current_block
().
vars
outputs_dict
=
dict
()
for
table_name
in
table_names
:
outputs_dict
[
table_name
]
=
[]
for
op
in
program
.
global_block
().
ops
:
if
op
.
type
==
"lookup_table"
:
if
op
.
input
(
"W"
)[
0
]
in
table_names
:
outputs_dict
[
op
.
input
(
"W"
)[
0
]].
extend
(
[
local_vars
[
name
]
for
name
in
op
.
output
(
"Out"
)])
return
outputs_dict
def
_find_multi_distributed_lookup_table
(
self
,
losses
):
"""
find multi-sparse-table
"""
table_names
=
set
()
for
loss
in
losses
:
for
op
in
loss
.
block
.
program
.
global_block
().
ops
:
if
op
.
type
==
"lookup_table"
:
if
op
.
attr
(
'is_distributed'
)
is
True
:
table_name
=
op
.
input
(
"W"
)[
0
]
table_names
.
add
(
table_name
)
return
list
(
table_names
)
def
_minimize
(
self
,
losses
,
startup_program
=
None
,
...
...
@@ -69,10 +126,15 @@ class DistributedAdam(DistributedOptimizerImplBase):
[optimize_ops, grads_and_weights]
"""
table_name
=
find_distributed_lookup_table
(
losses
[
0
].
block
.
program
)
table_name
=
self
.
_find_multi_distributed_lookup_table
(
losses
)
prefetch_slots
=
find_distributed_lookup_table_inputs
(
losses
[
0
].
block
.
program
,
table_name
[
0
])
inputs_dict
=
self
.
_find_distributed_lookup_table_inputs
(
losses
[
0
].
block
.
program
,
table_name
)
prefetch_slots_emb
=
find_distributed_lookup_table_outputs
(
losses
[
0
].
block
.
program
,
table_name
[
0
])
outputs_dict
=
self
.
_find_distributed_lookup_table_outputs
(
losses
[
0
].
block
.
program
,
table_name
)
ps_param
=
pslib
.
PSParameter
()
...
...
@@ -87,20 +149,29 @@ class DistributedAdam(DistributedOptimizerImplBase):
text_format
.
Merge
(
f
.
read
(),
ps_param
)
server
.
get_desc
().
CopyFrom
(
ps_param
.
server_param
)
worker
.
get_desc
().
CopyFrom
(
ps_param
.
trainer_param
)
sparse_table_index
=
0
server
.
add_sparse_table
(
sparse_table_index
,
self
.
_learning_rate
,
prefetch_slots
,
prefetch_slots_emb
)
worker
.
add_sparse_table
(
sparse_table_index
,
self
.
_learning_rate
,
prefetch_slots
,
prefetch_slots_emb
)
dense_table_index
=
1
for
tn
in
table_name
:
if
strategy
.
get
(
tn
)
is
not
None
:
server
.
add_sparse_table
(
sparse_table_index
,
strategy
[
tn
])
else
:
server
.
add_sparse_table
(
sparse_table_index
,
None
)
worker
.
add_sparse_table
(
sparse_table_index
,
inputs_dict
[
tn
],
outputs_dict
[
tn
])
sparse_table_index
+=
1
dense_start_table_id
=
sparse_table_index
dense_table_index
=
sparse_table_index
program_configs
=
{}
param_grads_list
=
[]
for
loss_index
in
range
(
len
(
losses
)):
program_id
=
str
(
id
(
losses
[
loss_index
].
block
.
program
))
program_configs
[
program_id
]
=
{
"pull_sparse"
:
[
sparse_table_index
],
"push_sparse"
:
[
sparse_table_index
]
"pull_sparse"
:
[
t_index
for
t_index
in
range
(
sparse_table_index
)],
"push_sparse"
:
[
t_index
for
t_index
in
range
(
sparse_table_index
)]
}
params_grads
=
sorted
(
...
...
@@ -128,19 +199,30 @@ class DistributedAdam(DistributedOptimizerImplBase):
data_norm_grads
.
append
(
i
[
1
])
if
not
is_data_norm_data
:
grads
.
append
(
i
[
1
])
server
.
add_dense_table
(
dense_table_index
,
self
.
_learning_rate
,
params
,
grads
)
if
strategy
.
get
(
'dense_table'
)
is
not
None
:
server
.
add_dense_table
(
dense_table_index
,
params
,
grads
,
strategy
[
'dense_table'
])
else
:
server
.
add_dense_table
(
dense_table_index
,
params
,
grads
,
None
)
worker
.
add_dense_table
(
dense_table_index
,
self
.
_learning_rate
,
params
,
grads
)
params
,
grads
,
dense_start_table_id
)
program_configs
[
program_id
][
"pull_dense"
]
=
[
dense_table_index
]
program_configs
[
program_id
][
"push_dense"
]
=
[
dense_table_index
]
if
len
(
data_norm_params
)
!=
0
and
len
(
data_norm_grads
)
!=
0
:
dense_table_index
+=
1
server
.
add_data_norm_table
(
dense_table_index
,
self
.
_learning_rate
,
data_norm_params
,
data_norm_grads
)
if
strategy
.
get
(
'datanorm_table'
)
is
not
None
:
server
.
add_data_norm_table
(
dense_table_index
,
self
.
_learning_rate
,
data_norm_params
,
data_norm_grads
,
strategy
[
'datanorm_table'
])
else
:
server
.
add_data_norm_table
(
dense_table_index
,
self
.
_learning_rate
,
data_norm_params
,
data_norm_grads
,
None
)
worker
.
add_dense_table
(
dense_table_index
,
self
.
_learning_rate
,
data_norm_params
,
data_norm_grads
)
data_norm_params
,
data_norm_grads
,
dense_start_table_id
)
program_configs
[
program_id
][
"pull_dense"
].
extend
(
[
dense_table_index
])
program_configs
[
program_id
][
"push_dense"
].
extend
(
...
...
python/paddle/fluid/tests/unittests/test_downpoursgd.py
0 → 100644
浏览文件 @
4a3c4b8f
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
__future__
import
print_function
import
paddle
import
paddle.fluid
as
fluid
import
os
import
signal
import
subprocess
import
time
import
unittest
import
sys
from
op_test
import
OpTest
from
paddle.fluid.trainer_desc
import
DistMultiTrainer
from
paddle.fluid.device_worker
import
DownpourSGD
from
google.protobuf
import
text_format
import
paddle.fluid.incubate.fleet.parameter_server.pslib.ps_pb2
as
pslib
class
TestListenAndServOp
(
OpTest
):
def
setUp
(
self
):
pass
def
test_device_work_use_cvm
(
self
):
if
sys
.
platform
==
'win32'
or
sys
.
platform
==
'sys.platform'
:
pass
else
:
print
(
sys
.
platform
)
cmd
=
"wget --no-check-certificate https://pslib.bj.bcebos.com/fleet_desc.prototxt"
os
.
system
(
cmd
)
x
=
fluid
.
layers
.
data
(
name
=
'x'
,
shape
=
[
1
],
dtype
=
'float32'
)
x_emb
=
fluid
.
layers
.
embedding
(
input
=
x
,
size
=
[
1
,
2
],
is_distributed
=
True
)
y_predict
=
fluid
.
layers
.
fc
(
input
=
x_emb
,
size
=
1
,
act
=
None
)
y
=
fluid
.
layers
.
data
(
name
=
'y'
,
shape
=
[
1
],
dtype
=
'float32'
)
cost
=
fluid
.
layers
.
square_error_cost
(
input
=
y_predict
,
label
=
y
)
avg_cost
=
fluid
.
layers
.
mean
(
cost
)
ps_param
=
pslib
.
PSParameter
()
with
open
(
"fleet_desc.prototxt"
)
as
f
:
text_format
.
Merge
(
f
.
read
(),
ps_param
)
fleet_desc
=
ps_param
exe
=
fluid
.
Executor
(
fluid
.
CPUPlace
())
exe
.
run
(
fluid
.
default_startup_program
())
opt_info
=
{}
main_program
=
fluid
.
default_main_program
()
program_id
=
str
(
id
(
avg_cost
.
block
.
program
))
program_configs
=
{}
program_configs
[
program_id
]
=
{
"pull_sparse"
:
[
0
],
"push_sparse"
:
[
0
]
}
program_configs
[
program_id
][
"pull_dense"
]
=
[
1
]
program_configs
[
program_id
][
"push_dense"
]
=
[
1
]
worker_skipped_ops
=
[
"lookup_table"
,
"lookup_table_grad"
]
opt_info
[
"program_configs"
]
=
program_configs
opt_info
[
"trainer"
]
=
"DistMultiTrainer"
opt_info
[
"device_worker"
]
=
"DownpourSGD"
opt_info
[
"optimizer"
]
=
"DownpourSGD"
opt_info
[
"fleet_desc"
]
=
ps_param
opt_info
[
"worker_skipped_ops"
]
=
worker_skipped_ops
opt_info
[
"use_cvm"
]
=
True
opt_info
[
"scale_datanorm"
]
=
-
1
opt_info
[
"dump_slot"
]
=
False
main_program
.
_fleet_opt
=
opt_info
trainer
=
DistMultiTrainer
()
trainer
.
_set_program
(
main_program
)
device_worker
=
DownpourSGD
()
device_worker
.
_set_fleet_desc
(
fleet_desc
)
trainer
.
_set_device_worker
(
device_worker
)
trainer
.
_set_fleet_desc
(
fleet_desc
)
trainer
.
_gen_trainer_desc
()
cmd
=
"rm fleet_desc.prototxt*"
os
.
system
(
cmd
)
def
test_device_work
(
self
):
if
sys
.
platform
==
'win32'
or
sys
.
platform
==
'sys.platform'
:
pass
else
:
print
(
sys
.
platform
)
cmd
=
"wget --no-check-certificate https://pslib.bj.bcebos.com/fleet_desc.prototxt"
os
.
system
(
cmd
)
x
=
fluid
.
layers
.
data
(
name
=
'x'
,
shape
=
[
1
],
dtype
=
'float32'
)
x_emb
=
fluid
.
layers
.
embedding
(
input
=
x
,
size
=
[
1
,
2
],
is_distributed
=
True
)
y_predict
=
fluid
.
layers
.
fc
(
input
=
x_emb
,
size
=
1
,
act
=
None
)
y
=
fluid
.
layers
.
data
(
name
=
'y'
,
shape
=
[
1
],
dtype
=
'float32'
)
cost
=
fluid
.
layers
.
square_error_cost
(
input
=
y_predict
,
label
=
y
)
avg_cost
=
fluid
.
layers
.
mean
(
cost
)
ps_param
=
pslib
.
PSParameter
()
with
open
(
"fleet_desc.prototxt"
)
as
f
:
text_format
.
Merge
(
f
.
read
(),
ps_param
)
fleet_desc
=
ps_param
exe
=
fluid
.
Executor
(
fluid
.
CPUPlace
())
exe
.
run
(
fluid
.
default_startup_program
())
opt_info
=
{}
main_program
=
fluid
.
default_main_program
()
program_id
=
str
(
id
(
avg_cost
.
block
.
program
))
program_configs
=
{}
program_configs
[
program_id
]
=
{
"pull_sparse"
:
[
0
],
"push_sparse"
:
[
0
]
}
program_configs
[
program_id
][
"pull_dense"
]
=
[
1
]
program_configs
[
program_id
][
"push_dense"
]
=
[
1
]
worker_skipped_ops
=
[
"lookup_table"
,
"lookup_table_grad"
]
opt_info
[
"program_configs"
]
=
program_configs
opt_info
[
"trainer"
]
=
"DistMultiTrainer"
opt_info
[
"device_worker"
]
=
"DownpourSGD"
opt_info
[
"optimizer"
]
=
"DownpourSGD"
opt_info
[
"fleet_desc"
]
=
ps_param
opt_info
[
"worker_skipped_ops"
]
=
worker_skipped_ops
opt_info
[
"use_cvm"
]
=
False
opt_info
[
"scale_datanorm"
]
=
-
1
opt_info
[
"dump_slot"
]
=
False
main_program
.
_fleet_opt
=
opt_info
trainer
=
DistMultiTrainer
()
trainer
.
_set_program
(
main_program
)
device_worker
=
DownpourSGD
()
device_worker
.
_set_fleet_desc
(
fleet_desc
)
trainer
.
_set_device_worker
(
device_worker
)
trainer
.
_set_fleet_desc
(
fleet_desc
)
trainer
.
_gen_trainer_desc
()
cmd
=
"rm fleet_desc.prototxt*"
os
.
system
(
cmd
)
if
__name__
==
"__main__"
:
unittest
.
main
()
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录