机器未来 / Paddle (forked from PaddlePaddle / Paddle)
Commit 3641a78b
Authored Mar 13, 2019 by dongdaxiang

add incubate for unified API

Parent: e657c127
Showing 9 changed files with 3,091 additions and 5 deletions (+3091 −5)
python/paddle/fluid/executor.py                                              +5     -5
python/paddle/fluid/incubate/fleet/__init__.py                               +14    -0
python/paddle/fluid/incubate/fleet/base/__init__.py                          +12    -0
python/paddle/fluid/incubate/fleet/base/role_maker.py                        +119   -0
python/paddle/fluid/incubate/fleet/p2p/__init__.py                           +12    -0
python/paddle/fluid/incubate/fleet/parameter_server/__init__.py              +145   -0
python/paddle/fluid/incubate/fleet/parameter_server/node.py                  +203   -0
python/paddle/fluid/incubate/fleet/parameter_server/optimizer_factory.py     +155   -0
python/paddle/fluid/incubate/fleet/parameter_server/ps_pb2.py                +2426  -0
python/paddle/fluid/executor.py
```diff
@@ -644,8 +644,8 @@ class Executor(object):
             trainer.set_thread(dataset.thread_num)
         else:
             trainer.set_thread(thread)
-            trainer.gen_trainer_desc()
-            dataset._prepare_to_run()
-            self._default_executor.run_from_dataset(program.desc, scope,
-                                                    dataset.dataset,
-                                                    trainer._desc())
+        trainer.gen_trainer_desc()
+        dataset._prepare_to_run()
+        self._default_executor.run_from_dataset(program.desc, scope,
+                                                dataset.dataset,
+                                                trainer._desc())
```
python/paddle/fluid/incubate/fleet/__init__.py (new file, mode 100644)
```python
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and

__version__ = '0.1.0'
```
python/paddle/fluid/incubate/fleet/base/__init__.py (new file, mode 100644)
```python
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
```
python/paddle/fluid/incubate/fleet/base/role_maker.py (new file, mode 100644)
```python
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .helper import MPIHelper


class RoleMakerBase(object):
    def __init__(self):
        self.role_maker_name_ = ""
        self.trainer_endpoints_ = []
        self.pserver_endpoints_ = []

    def is_worker(self):
        raise NotImplementedError("Please implement this method in child class")

    def is_server(self):
        raise NotImplementedError("Please implement this method in child class")

    def get_local_ip(self):
        import socket
        self.ip_ = socket.gethostbyname(socket.gethostname())
        return self.ip_

    def get_trainer_endpoints(self):
        return self.trainer_endpoints_

    def get_pserver_endpoints(self):
        return self.pserver_endpoints_

    def generate_role(self):
        raise NotImplementedError("Please implement this method in child class")


class MPIRoleMaker(RoleMakerBase):
    def __init__(self):
        from mpi4py import MPI
        self.comm_ = MPI.COMM_WORLD
        self.MPI = MPI

    def get_rank(self):
        self.rank_ = self.comm_.Get_rank()
        return self.rank_

    def get_size(self):
        self.size_ = self.comm_.Get_size()
        return self.size_

    def all_gather(self, obj):
        self.barrier_all()
        return self.comm_.allgather(obj)

    def barrier_all(self):
        self.comm_.barrier()

    def get_ips(self):
        if self.ips_ == None:
            self.ips_ = self.comm_.allgather(self.get_local_ip())
        return self.ips_

    def finalize(self):
        self.comm_.finalize()


class MPISymetricRoleMaker(MPIRoleMaker):
    def __init__(self):
        super(MPISymetricRoleMaker, self).__init__()
        self.node_type_ = None
        self.proc_per_node_ = 2

    def is_first_worker(self):
        return self.is_worker() and 0 == self.worker_index()

    def is_worker(self):
        return self.node_type_ == 1

    def is_server(self):
        return self.node_type_ == 0

    def worker_num(self):
        if self.is_worker():
            return self.get_size()

    def server_num(self):
        if self.is_server():
            return self.get_size()

    def worker_index(self):
        return self.rank / self.proc_per_node_

    def server_index(self):
        return self.rank / self.proc_per_node_

    def barrier_worker(self):
        if self.is_worker():
            self.node_type_comm_.barrier()

    def barrier_server(self):
        if self.is_server():
            self.node_type_comm_.barrier()

    def generate_role(self):
        self.trainer_endpoints_ = self.get_ips()
        self.pserver_endpoints_ = self.get_ips()

        if 0 == self.get_rank() % self.proc_per_node_ % 2:
            self.node_type_ = 0
        else:
            self.node_type_ = 1
        self.node_type_comm_ = self.comm_.Split(self.node_type_)
```
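The symmetric role maker splits the MPI job into paired servers and workers: with the default proc_per_node_ of 2, generate_role() gives even ranks node_type_ 0 (server) and odd ranks node_type_ 1 (worker), and worker_index()/server_index() both divide the global rank by proc_per_node_. A minimal sketch of that arithmetic, not part of the commit, purely to illustrate the mapping:

```python
# Illustration only (not in the commit): the rank-to-role mapping implied by
# MPISymetricRoleMaker.generate_role() when proc_per_node_ == 2.
def role_of(rank, proc_per_node=2):
    # 0 == rank % proc_per_node % 2 selects node_type_ 0 (server), otherwise 1 (worker)
    return "server" if rank % proc_per_node % 2 == 0 else "worker"

def index_of(rank, proc_per_node=2):
    # mirrors worker_index()/server_index(): the global rank divided by proc_per_node_
    return rank // proc_per_node

for rank in range(6):
    print(rank, role_of(rank), index_of(rank))
# ranks 0, 2, 4 become servers 0, 1, 2; ranks 1, 3, 5 become workers 0, 1, 2
```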
python/paddle/fluid/incubate/fleet/p2p/__init__.py (new file, mode 100644)
```python
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
```
python/paddle/fluid/incubate/fleet/parameter_server/__init__.py (new file, mode 100644)
```python
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and

import sys
import os
from ..base.role_maker import MPISymetricRoleMaker
from paddle.fluid.optimizer import Optimizer

# this is a temporary solution
# TODO(guru4elephant)
# will make this more flexible for more Parameter Server Archs
fleet_instance = Fleet()

init = fleet_instance.init
stop = fleet_instance.stop
init_pserver = fleet_instance.init_pserver
init_worker = fleet_instance.init_worker
init_pserver_model = fleet_instance.init_pserver_model
save_pserver_model = fleet_instance.save_pserver_model


class Fleet(object):
    """
    """

    def __init__(self):
        self.opt_info = None  # for fleet only
        self.role_maker_ = None

    def init(self):
        # TODO(guru4elephant)
        # this is a temporary solution
        # we will support more configurable RoleMaker for users in the future
        self.role_maker_ = MPISymetricRoleMaker()
        self.role_maker_.generate_role()
        self._fleet_ptr = core.FleetWrapper()

    def stop(self):
        self.role_maker_.barrier_worker()
        if self.role_maker_.is_first_worker():
            self._fleet_ptr.stop_server()
        self.role_maker_.barrier_worker()
        self.role_maker_.barrier_all()
        self.role_maker_.finalize()

    def init_pserver(self):
        if self._opt_info:
            if "fleet_desc" in self._opt_info:
                self._dist_desc_str = text_format.MessageToString(
                    self._opt_info["fleet_desc"])
                self._dist_desc = self._opt_info["fleet_desc"]
            else:
                print("You should run DistributedOptimizer.minimize() first")
                sys.exit(-1)
            self._fleet_ptr.init_server(self._dist_desc_str)
            ip = self._fleet_ptr.start_server()
            ips = self.role_maker_.all_gather(ip)
            self._fleet_ptr.gather_servers(ips, self.role_maker_.get_size())
            self.role_maker_.barrier_all()
        else:
            print("You should run DistributedOptimizer.minimize() first")
            sys.exit(-1)

    def init_worker(self):
        if self._opt_info:
            if "fleet_desc" in self._opt_info:
                self._dist_desc_str = text_format.MessageToString(
                    self._opt_info["fleet_desc"])
                self._dist_desc = self._opt_info["fleet_desc"]
            else:
                print("You should run DistributedOptimizer.minimize() first")
                sys.exit(-1)
            self.role_maker_.barrier_all()
            self._fleet_ptr.init_work(self.dist_desc_str_,
                                      self.role_maker.get_ips(),
                                      self.role_maker_.get_size(),
                                      self.role_maker_.get_rank())
            self.role_maker_.barrier_worker()
        else:
            print("You should run DistributedOptimizer.minimize() first")
            sys.exit(-1)

    def init_pserver_model(self):
        if self.role_maker_.is_first_worker():
            self._fleet_ptr.init_model()
        self.role_maker_.barrier_worker()

    def save_pserver_model(self, save_path):
        self._fleet_ptr.save_model(save_path)

    def _set_opt_info(self, opt_info):
        self._opt_info = opt_info


class DistributedOptimizer(paddle.fluid.Optimizer):
    def __init__(self, optimizer, dist_config={}):
        super(DistributedOptimizer, self).__init__()
        self._optimizer = optimizer
        self._optimizer_name = "Distributed%s" % optimizer.type.capitalize()
        if optimizer.type != "adam":
            print("Currently, distributed optimizer only supports Adam"
                  "Will config built-in adam for you."
                  "We will support more functions in DistributedOptimizer",
                  sys.stderr)
            self._optimizer_name = "DistributedAdam"

        self._distributed_optimizer = globals()[self._optimizer_name]()

    def backward(self,
                 loss,
                 startup_program=None,
                 parameter_list=None,
                 no_grad_set=None,
                 callbacks=None):
        pass

    def apply_gradients(self, params_grads):
        pass

    def minimize(self,
                 loss,
                 startup_program=None,
                 parameter_list=None,
                 no_grad_set=None):
        optimize_ops, param_grads, opt_info = \
            self._distributed_optimizer.minimize(
                self._optimizer,
                loss,
                startup_program,
                parameter_list,
                no_grad_set)

        fleet_instance._set_opt_info(opt_info)
        return [a, b]
```
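Taken together with the module-level aliases at the top of the file, the intended call pattern appears to be an mpirun-launched script that initializes fleet, wraps a regular fluid optimizer, and then branches on the role. The sketch below is not part of the commit; `loss` stands for a user-built fluid Variable, the Adam optimizer is only one possible choice, and the exact ordering of the calls is an assumption.

```python
# Hypothetical driver (not in the commit); `loss` is a user-defined fluid Variable.
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.parameter_server as fleet

fleet.init()  # builds the MPISymetricRoleMaker and the C++ FleetWrapper

optimizer = fleet.DistributedOptimizer(fluid.optimizer.Adam(learning_rate=1e-4))
optimizer.minimize(loss)  # records opt_info / fleet_desc via fleet_instance

if fleet.fleet_instance.role_maker_.is_server():
    fleet.init_pserver()          # start a server and exchange endpoints
else:
    fleet.init_worker()           # connect this trainer to all servers
    fleet.init_pserver_model()    # first worker initializes the model on servers
    # ... run training with an Executor, then optionally:
    fleet.save_pserver_model("./model_dir")

fleet.stop()  # first worker stops the servers; everyone finalizes MPI
```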
python/paddle/fluid/incubate/fleet/parameter_server/node.py (new file, mode 100644)
```python
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and

import ps_pb2 as pslib


class Server(object):
    """
        A Server basic class.
    """

    def __init__(self):
        pass


class Worker(object):
    """
        A Worker basic class.
    """

    def __init__(self):
        pass


class DownpourServer(Server):
    """
        DownpourServer class is used to generate server program_desc
        Args:
            server: it is pslib.ServerParameter()
        Examples:
            server = DownpourServer()
    """

    def __init__(self):
        self.server_ = pslib.ServerParameter()
        self.server_.downpour_server_param.service_param.start_server_port = 0
        self.server_.downpour_server_param.service_param.server_class = "DownpourBrpcPsServer"
        self.server_.downpour_server_param.service_param.client_class = "DownpourBrpcPsClient"
        self.server_.downpour_server_param.service_param.service_class = "DownpourPsService"
        self.server_.downpour_server_param.service_param.start_server_port = 0
        self.server_.downpour_server_param.service_param.server_thread_num = 12

    def add_sparse_table(self, table_id, learning_rate, slot_key_vars,
                         slot_value_var):
        """
        Args:
            table_id(int): id of sparse params table
            learning_rate(float): the learning rate used to update parameters. \
                Can be a float value
            slot_key_vars(string): slot key id
            slot_value_var(string): slot key value after embedding
        Returns:
            return None
        """
        table = self.server_.downpour_server_param.downpour_table_param.add()
        table.table_id = table_id
        table.table_class = "DownpourSparseTable"
        table.type = pslib.PS_SPARSE_TABLE
        table.accessor.accessor_class = "DownpourFeatureValueAccessor"
        table.accessor.sparse_sgd_param.learning_rate = learning_rate
        table.accessor.sparse_sgd_param.initial_g2sum = 3
        table.accessor.sparse_sgd_param.initial_range = 1e-4
        table.accessor.sparse_sgd_param.weight_bounds.extend([-10, 10])

        table.accessor.embedx_dim = 8
        table.accessor.embedx_threshold = 5
        table.accessor.fea_dim = 11
        table.accessor.downpour_accessor_param.nonclk_coeff = 0.1
        table.accessor.downpour_accessor_param.click_coeff = 2
        table.accessor.downpour_accessor_param.base_threshold = 0.2
        table.accessor.downpour_accessor_param.delta_threshold = 0.15
        table.accessor.downpour_accessor_param.delta_keep_days = 31
        table.accessor.downpour_accessor_param.show_click_decay_rate = 0.999
        table.accessor.downpour_accessor_param.delete_threshold = 0.8

    def add_dense_table(self, table_id, learning_rate, param_var, grad_var):
        """
        Args:
            table_id(int): id of sparse params table
            learning_rate(float): the learning rate used to update parameters. \
                Can be a float value
            param_var(list): all dense param. it is a list.
            grad_var(list): all dense grad parm it is a list.
        Returns:
            return None
        """
        table = self.server_.downpour_server_param.downpour_table_param.add()
        table.table_id = table_id
        table.table_class = "DownpourDenseTable"
        table.type = pslib.PS_DENSE_TABLE
        table.accessor.accessor_class = "DownpourDenseValueAccessor"
        table.accessor.dense_sgd_param.name = "adam"
        table.accessor.dense_sgd_param.adam.learning_rate = learning_rate
        table.accessor.dense_sgd_param.adam.avg_decay_rate = 0.999993
        table.accessor.dense_sgd_param.adam.ada_decay_rate = 0.9999
        table.accessor.dense_sgd_param.adam.ada_epsilon = 1e-8
        table.accessor.dense_sgd_param.adam.mom_decay_rate = 0.99
        table.accessor.dense_sgd_param.naive.learning_rate = 0.0002
        fea_dim = 0
        for param in filter(lambda x: x.name.find("embedding") == -1,
                            param_var):
            fea_dim += reduce(lambda x, y: x * y, param.shape, 1)
        table.accessor.fea_dim = fea_dim

    def add_data_norm_table(self, table_id, learning_rate, param_var,
                            grad_var):
        """
        Args:
            table_id(int): id of sparse params table
            learning_rate(float): the learning rate used to update parameters. \
                Can be a float value
            param_var(list): all dense param. it is a list.
            grad_var(list): all dense grad parm it is a list.
        Returns:
            return None
        """
        table = self.server_.downpour_server_param.downpour_table_param.add()
        table.table_id = table_id
        table.table_class = "DownpourDenseTable"
        table.type = pslib.PS_DENSE_TABLE
        table.accessor.accessor_class = "DownpourDenseValueAccessor"
        table.accessor.dense_sgd_param.name = "summary"
        table.accessor.dense_sgd_param.summary.summary_decay_rate = 0.999999
        fea_dim = 0
        for param in filter(lambda x: x.name.find("embedding") == -1,
                            param_var):
            fea_dim += reduce(lambda x, y: x * y, param.shape, 1)
        table.accessor.fea_dim = fea_dim

    def get_desc(self):
        """
        Return downpour server program_desc
        """
        return self.server_


class DownpourWorker(Worker):
    """
        DownpourWorker class is used to generate worker program_desc
        Args:
            window (int): push params frequency
            worker: it is pslib.DownpourTrainerParameter
        Examples:
            worker = DownpourWorker(1)
    """

    def __init__(self, window):
        self.window = window
        self.worker_ = pslib.DownpourTrainerParameter()

    def add_sparse_table(self, table_id, learning_rate, slot_key_vars,
                         slot_value_vars):
        """
        Args:
            table_id(int): id of sparse params table
            learning_rate(float): the learning rate used to update parameters. \
                Can be a float value
            slot_key_vars(string): slot key id
            slot_value_var(string): slot key value after embedding
        Returns:
            return None
        """
        table = self.worker_.sparse_table.add()
        table.table_id = table_id
        table.slot_key.extend([var.name for var in slot_key_vars])
        table.slot_value.extend([var.name for var in slot_value_vars])
        table.slot_gradient.extend(
            [var.name + "@GRAD" for var in slot_value_vars])

    def add_dense_table(self, table_id, learning_rate, param_vars, grad_vars):
        """
        Args:
            table_id(int): id of sparse params table
            learning_rate(float): the learning rate used to update parameters. \
                Can be a float value
            param_var(list): all dense param. it is a list.
            grad_var(list): all dense grad parm it is a list.
        Returns:
            return None
        """
        table = self.worker_.dense_table.add()
        table.table_id = table_id
        table.dense_variable_name.extend(
            filter(lambda x: x.find("embedding") == -1,
                   [p.name for p in param_vars]))
        table.dense_gradient_variable_name.extend(
            filter(lambda x: x.find("embedding") == -1,
                   [g.name for g in grad_vars]))

    def get_desc(self):
        """
        Return downpour worker program_desc
        """
        return self.worker_
```
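Following the Examples lines in the docstrings, DownpourServer and DownpourWorker are plain builders over the pslib protobuf messages. The sketch below is not from the commit; the Var stand-in only supplies the .name and .shape attributes that the add_*_table() methods read, the table ids and learning rate are illustrative, and like the file itself it assumes the Python 2 builtins (reduce, list-returning filter) in use at the time.

```python
# Illustration only (not in the commit) of driving the builders above.
from collections import namedtuple

Var = namedtuple("Var", ["name", "shape"])

slot_keys = [Var("click_slot_0", (1,))]
slot_embs = [Var("embedding_0.tmp_0", (1, 8))]
dense_params = [Var("fc_0.w_0", (128, 64)), Var("fc_0.b_0", (64,))]
dense_grads = [Var("fc_0.w_0@GRAD", (128, 64)), Var("fc_0.b_0@GRAD", (64,))]

server = DownpourServer()   # as in the DownpourServer docstring
worker = DownpourWorker(1)  # as in the DownpourWorker docstring

server.add_sparse_table(0, 0.05, slot_keys, slot_embs)
worker.add_sparse_table(0, 0.05, slot_keys, slot_embs)
server.add_dense_table(1, 0.05, dense_params, dense_grads)
worker.add_dense_table(1, 0.05, dense_params, dense_grads)

# get_desc() returns the filled pslib messages that optimizer_factory.py copies
# into the PSParameter ("fleet_desc") handed to the C++ side.
print(server.get_desc())
print(worker.get_desc())
```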
python/paddle/fluid/incubate/fleet/parameter_server/optimizer_factory.py (new file, mode 100644)
```python
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__all__ = ["DistributedAdam"]
import ps_pb2 as pslib
from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table
from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table_inputs
from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table_outputs
from google.protobuf import text_format


class DistributedOptimizerImplBase(object):
    def __init__(self):
        pass

    def minimize(self,
                 optimizer,
                 losses,
                 startup_program=None,
                 parameter_list=None,
                 no_grad_set=None):
        pass


class DistributedAdam(DistributedOptimizerImplBase):
    def __init__(self):
        # todo(guru4elephant): add more optimizers here as argument
        # todo(guru4elephant): make learning_rate as a variable
        self.learning_rate_ = learning_rate
        self.window_ = window
        self.type = "downpour"
        self.data_norm_name = [
            ".batch_size", ".batch_square_sum", ".batch_sum",
            ".batch_size@GRAD", ".batch_square_sum@GRAD", ".batch_sum@GRAD"
        ]

    def minimize(self,
                 optimizer,
                 loss,
                 startup_program=None,
                 parameter_list=None,
                 no_grad_set=None):
        """
        DownpounSGD is a distributed optimizer so
        that user can call minimize to generate backward
        operators and optimization operators within minmize function
        Args:
            loss(Variable): loss variable defined by user
            startup_program(Program): startup program that defined by user
            parameter_list(str list): parameter names defined by users
            no_grad_set(set): a set of variables that is defined by users
            so that these variables do not need gradient computation
        Returns:
            [optimize_ops, grads_and_weights]
        """
        if not isinstance(loss, list):
            loss = [loss]

        table_name = find_distributed_lookup_table(losses[0].block.program)
        prefetch_slots = find_distributed_lookup_table_inputs(
            losses[0].block.program, table_name)
        prefetch_slots_emb = find_distributed_lookup_table_outputs(
            losses[0].block.program, table_name)

        ps_param = pslib.PSParameter()
        server = DownpourServer()
        worker = DownpourWorker(self.window_)
        sparse_table_index = 0
        server.add_sparse_table(sparse_table_index, self.learning_rate_,
                                prefetch_slots, prefetch_slots_emb)
        worker.add_sparse_table(sparse_table_index, self.learning_rate_,
                                prefetch_slots, prefetch_slots_emb)
        dense_table_index = 1
        program_configs = []
        param_grads_list = []

        for loss_index in range(len(losses)):
            program_config = ps_param.trainer_param.program_config.add()
            program_config.program_id = str(
                id(losses[loss_index].block.program))
            program_config.pull_sparse_table_id.extend([sparse_table_index])
            program_config.push_sparse_table_id.extend([sparse_table_index])
            params_grads = sorted(
                append_backward(losses[loss_index], parameter_list,
                                no_grad_set),
                key=lambda x: x[0].name)
            param_grads_list.append(params_grads)
            params = []
            grads = []
            data_norm_params = []
            data_norm_grads = []
            for i in params_grads:
                is_data_norm_data = False
                for data_norm_name in self.data_norm_name:
                    if i[0].name.endswith(data_norm_name):
                        is_data_norm_data = True
                        data_norm_params.append(i[0])
                if not is_data_norm_data:
                    params.append(i[0])
            for i in params_grads:
                is_data_norm_data = False
                for data_norm_grad in self.data_norm_name:
                    if i[0].name.endswith(data_norm_grad):
                        is_data_norm_data = True
                        data_norm_grads.append(i[1])
                if not is_data_norm_data:
                    grads.append(i[1])
            server.add_dense_table(dense_table_index, self.learning_rate_,
                                   params, grads)
            worker.add_dense_table(dense_table_index, self.learning_rate_,
                                   params, grads)
            program_config.pull_dense_table_id.extend([dense_table_index])
            program_config.push_dense_table_id.extend([dense_table_index])
            if len(data_norm_params) != 0 and len(data_norm_grads) != 0:
                dense_table_index += 1
                server.add_data_norm_table(dense_table_index,
                                           self.learning_rate_,
                                           data_norm_params, data_norm_grads)
                worker.add_dense_table(dense_table_index, self.learning_rate_,
                                       data_norm_params, data_norm_grads)
                program_config.pull_dense_table_id.extend([dense_table_index])
                program_config.push_dense_table_id.extend([dense_table_index])
            dense_table_index += 1
            program_configs.append(program_config)

        ps_param.server_param.CopyFrom(server.get_desc())
        ps_param.trainer_param.CopyFrom(worker.get_desc())
        for program_config in program_configs:
            ps_param.trainer_param.program_config.extend([program_config])
        # Todo(guru4elephant): figure out how to support more sparse parameters
        # currently only support lookup_table
        worker_skipped_ops = ["lookup_table", "lookup_table_grad"]
        ps_param.trainer_param.skip_op.extend(worker_skipped_ops)

        opt_info = {}
        opt_info["trainer"] = "DistMultiTrainer"
        opt_info["device_worker"] = "DownpourSGD"
        opt_info["optimizer"] = "DownpourSGD"
        opt_info["fleet_desc"] = ps_param
        opt_info["worker_skipped_ops"] = worker_skipped_ops

        for loss in losses:
            loss.block.program._fleet_opt = opt_info

        return None, param_grads_list[0], opt_info
```
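The minimize() above effectively partitions every trainable parameter across pslib tables: the distributed lookup_table embedding feeds sparse table 0, data-norm statistics (names ending in the suffixes listed in __init__) get their own summary-rule dense table, and the remaining dense parameters go to the adam-rule dense table. A rough, hypothetical summary of that routing, not part of the commit:

```python
# Rough illustration (not in the commit) of how parameters are routed to tables.
DATA_NORM_SUFFIXES = (".batch_size", ".batch_square_sum", ".batch_sum")

def table_for(param_name):
    if "embedding" in param_name:
        # handled via the distributed lookup_table / sparse table 0 path
        return "sparse table 0 (DownpourSparseTable, DownpourFeatureValueAccessor)"
    if param_name.endswith(DATA_NORM_SUFFIXES):
        return "data-norm dense table (dense_sgd_param.name = 'summary')"
    return "dense table 1 (dense_sgd_param.name = 'adam')"

for name in ["embedding_0.w_0", "fc_0.w_0", "bn_6048.batch_sum", "fc_1.b_0"]:
    print(name, "->", table_for(name))
```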
python/paddle/fluid/incubate/fleet/parameter_server/ps_pb2.py (new file, mode 100644)

This diff is collapsed (+2426 lines).
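ps_pb2.py is the protoc-generated Python module for the pslib parameter-server messages; node.py and optimizer_factory.py import it as `pslib`. The sketch below is not from the commit and is inferred only from the attribute paths those files touch; it shows how the generated messages appear to nest.

```python
# Hypothetical sketch (not in the commit), inferred from usage in node.py and
# optimizer_factory.py: how the generated pslib messages fit together.
import ps_pb2 as pslib

ps_param = pslib.PSParameter()

# server side: service settings plus one entry per parameter table
service = ps_param.server_param.downpour_server_param.service_param
service.server_class = "DownpourBrpcPsServer"
table = ps_param.server_param.downpour_server_param.downpour_table_param.add()
table.type = pslib.PS_SPARSE_TABLE

# trainer side: sparse/dense table bindings, per-program configs, skipped ops
trainer = ps_param.trainer_param
trainer.sparse_table.add()
trainer.dense_table.add()
trainer.program_config.add()
trainer.skip_op.extend(["lookup_table", "lookup_table_grad"])

# Fleet serializes the whole PSParameter with google.protobuf.text_format
# before passing it to the C++ FleetWrapper (see parameter_server/__init__.py).
from google.protobuf import text_format
print(text_format.MessageToString(ps_param))
```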