BaiXuePrincess / Paddle (forked from PaddlePaddle / Paddle, in sync with the upstream project)
Commit dcdd18ae
Unverified commit dcdd18ae
Authored on Feb 04, 2020 by tangwei12; committed by GitHub on Feb 04, 2020
fix bug with half (#22378) (#22415)
* fix bug with half communicator
Parent: f0431607
Showing 3 changed files with 64 additions and 22 deletions (+64 -22):

python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py (+15 -7)
python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/distributed_strategy.py (+14 -14)
python/paddle/fluid/tests/unittests/test_distributed_strategy.py (+35 -1)
python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
@@ -271,13 +271,21 @@ class DistributedTranspiler(Fleet):
         elif isinstance(config, DistributeTranspilerConfig):
             if config.sync_mode:
                 self._transpile_config = SyncStrategy()
-            elif config.geo_sgd_mode:
-                self._transpile_config = GeoStrategy(
-                    config.geo_sgd_need_push_nums)
-            elif config.runtime_split_send_recv and config.half_async:
-                self._transpile_config = HalfAsyncStrategy()
             else:
-                self._transpile_config = AsyncStrategy()
+                if config.runtime_split_send_recv:
+                    if config.geo_sgd_mode:
+                        self._transpile_config = GeoStrategy(
+                            config.geo_sgd_need_push_nums)
+                    elif config.half_async:
+                        self._transpile_config = HalfAsyncStrategy()
+                    else:
+                        self._transpile_config = AsyncStrategy()
+                else:
+                    self._transpile_config = HalfAsyncStrategy()
+                    # for half_async compatibility
+                    config.half_async = True
+                    config.runtime_split_send_recv = True
             self._transpile_config.set_program_config(config)
         else:
             raise TypeError(

@@ -359,7 +367,7 @@ class TranspilerOptimizer(DistributedOptimizer):
                 "In {} mode, strategy must be an instance of DistributeTranspilerConfig, SyncStrategy, HalfAsyncStrategy, AsyncStrategy, or GeoStrategy".
                 format(fleet._mode))
         else:
-            self._strategy = DistributedStrategy()
+            self._strategy = StrategyFactory.create_sync_strategy()

     def backward(self,
                  loss,
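The first hunk reorders the strategy dispatch: runtime_split_send_recv is now checked before geo_sgd_mode and half_async, and a config without runtime split no longer silently falls through to AsyncStrategy. Restated as a standalone helper for readability (resolve_strategy is a hypothetical name used only in this sketch; the strategy classes are the ones this module already imports from distributed_strategy.py):

# A minimal sketch of the patched dispatch, for illustration only.
def resolve_strategy(config):
    # config: a DistributeTranspilerConfig
    if config.sync_mode:
        return SyncStrategy()
    if config.runtime_split_send_recv:
        if config.geo_sgd_mode:
            return GeoStrategy(config.geo_sgd_need_push_nums)
        if config.half_async:
            return HalfAsyncStrategy()
        return AsyncStrategy()
    # No runtime split: fall back to the half-async communicator and
    # force the compatibility flags on, as the patch does.
    config.half_async = True
    config.runtime_split_send_recv = True
    return HalfAsyncStrategy()

In practice, a plain DistributeTranspilerConfig with sync_mode=False and runtime_split_send_recv=False now resolves to HalfAsyncStrategy instead of AsyncStrategy, which is exactly what the new TestHalfAsyncStrategy case below exercises.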
python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/distributed_strategy.py
@@ -48,20 +48,20 @@ class TrainerRuntimeConfig(object):
     def get_communicator_flags(self):
         _communicator_flags = dict()
-        _communicator_flags[
-            "communicator_max_merge_var_num"] = self.max_merge_var_num
-        _communicator_flags[
-            "communicator_send_queue_size"] = self.send_queue_size
-        _communicator_flags[
-            "communicator_independent_recv_thread"] = self.independent_recv_thread
-        _communicator_flags[
-            "communicator_min_send_grad_num_before_recv"] = self.min_send_grad_num_before_recv
-        _communicator_flags[
-            "communicator_thread_pool_size"] = self.thread_pool_size
-        _communicator_flags[
-            "communicator_send_wait_times"] = self.send_wait_times
-        _communicator_flags[
-            "communicator_is_sgd_optimizer"] = self.is_sgd_optimizer
+        _communicator_flags["communicator_max_merge_var_num"] = str(
+            self.max_merge_var_num)
+        _communicator_flags["communicator_send_queue_size"] = str(
+            self.send_queue_size)
+        _communicator_flags["communicator_independent_recv_thread"] = str(
+            self.independent_recv_thread)
+        _communicator_flags["communicator_min_send_grad_num_before_recv"] = str(
+            self.min_send_grad_num_before_recv)
+        _communicator_flags["communicator_thread_pool_size"] = str(
+            self.thread_pool_size)
+        _communicator_flags["communicator_send_wait_times"] = str(
+            self.send_wait_times)
+        _communicator_flags["communicator_is_sgd_optimizer"] = str(
+            self.is_sgd_optimizer)
         return _communicator_flags

     def __repr__(self):
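Every communicator flag is now passed through str() before being returned. The underlying attributes (max_merge_var_num, send_queue_size, and so on) are ints or bools on TrainerRuntimeConfig, and the updated unit test below asserts the string form ('100' rather than 100), so callers of get_communicator_flags() are evidently expected to receive a uniform string-to-string mapping. A minimal self-contained sketch of the before/after behavior (Cfg is a stand-in for TrainerRuntimeConfig, invented for this example):

# Stand-in object with the same kinds of attribute values.
class Cfg:
    send_queue_size = 100      # int on the real config
    is_sgd_optimizer = True    # bool on the real config

# After the patch, every value is stringified:
flags = {
    "communicator_send_queue_size": str(Cfg.send_queue_size),
    "communicator_is_sgd_optimizer": str(Cfg.is_sgd_optimizer),
}
assert flags["communicator_send_queue_size"] == "100"    # previously the raw int 100
assert flags["communicator_is_sgd_optimizer"] == "True"  # previously the raw bool True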
python/paddle/fluid/tests/unittests/test_distributed_strategy.py
@@ -16,6 +16,8 @@ import unittest
 import paddle.fluid as fluid
 from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig, ServerRuntimeConfig
 from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import TrainerRuntimeConfig, StrategyFactory
+from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
+import paddle.fluid.incubate.fleet.base.role_maker as role_maker
 import os

@@ -105,7 +107,7 @@ class TestStrategyFactor(unittest.TestCase):
         self.assertIn('communicator_send_queue_size',
                       trainer_communicator_flags)
         self.assertEqual(
-            trainer_communicator_flags['communicator_send_queue_size'], 100)
+            trainer_communicator_flags['communicator_send_queue_size'], '100')

         # test set_trainer_runtime_config exception
         trainer_runtime_config_dict['unknown'] = None

@@ -166,5 +168,37 @@ class TestStrategyFactor(unittest.TestCase):
                           server_runtime_config_illegal)


+class TestCreateDefaultStrategy(unittest.TestCase):
+    def test_default_strategy(self):
+        role = role_maker.UserDefinedRoleMaker(
+            current_id=0,
+            role=role_maker.Role.WORKER,
+            worker_num=2,
+            server_endpoints=["127.0.0.1:6001", "127.0.0.1:6002"])
+        fleet.init(role)
+
+        optimizer = fluid.optimizer.SGD(0.0001)
+        optimizer = fleet.distributed_optimizer(optimizer)
+
+
+class TestHalfAsyncStrategy(unittest.TestCase):
+    def test_half_async_strategy(self):
+        role = role_maker.UserDefinedRoleMaker(
+            current_id=0,
+            role=role_maker.Role.WORKER,
+            worker_num=2,
+            server_endpoints=["127.0.0.1:6001", "127.0.0.1:6002"])
+        fleet.init(role)
+
+        half_async_config = DistributeTranspilerConfig()
+
+        half_async_config.sync_mode = False
+        half_async_config.geo_sgd_mode = False
+        half_async_config.runtime_split_send_recv = False
+
+        optimizer = fluid.optimizer.SGD(0.0001)
+        optimizer = fleet.distributed_optimizer(optimizer, half_async_config)
+
+
 if __name__ == '__main__':
     unittest.main()
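The two new test cases double as usage examples: TestCreateDefaultStrategy checks that calling fleet.distributed_optimizer without an explicit strategy works with the new default (StrategyFactory.create_sync_strategy(), per the first file in this commit), and TestHalfAsyncStrategy checks that a DistributeTranspilerConfig with sync_mode, geo_sgd_mode, and runtime_split_send_recv all False resolves through the half-async path. Assuming a Paddle source checkout with this commit applied, the file can be run directly, since it ends in a unittest.main() guard:

cd python/paddle/fluid/tests/unittests
python test_distributed_strategy.py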