Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
Paddle
提交
61162497
P
Paddle
项目概览
BaiXuePrincess
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
61162497
编写于
10月 20, 2020
作者:
D
danleifeng
提交者:
GitHub
10月 20, 2020
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
raise error if use multi-cards in fleet non_distributed mode;test=develop (#28093)
上级
d4160941
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
176 addition
and
106 deletion
+176
-106
python/paddle/distributed/fleet/base/fleet_base.py
python/paddle/distributed/fleet/base/fleet_base.py
+9
-2
python/paddle/fluid/tests/unittests/CMakeLists.txt
python/paddle/fluid/tests/unittests/CMakeLists.txt
+1
-0
python/paddle/fluid/tests/unittests/test_fleet_base.py
python/paddle/fluid/tests/unittests/test_fleet_base.py
+18
-104
python/paddle/fluid/tests/unittests/test_fleet_base_single.py
...on/paddle/fluid/tests/unittests/test_fleet_base_single.py
+148
-0
未找到文件。
python/paddle/distributed/fleet/base/fleet_base.py
浏览文件 @
61162497
...
...
@@ -186,6 +186,15 @@ class Fleet(object):
fleet
.
util
.
_set_role_maker
(
self
.
_role_maker
)
self
.
strategy_compiler
=
StrategyCompiler
()
if
self
.
_role_maker
.
_is_non_distributed
()
and
self
.
_is_collective
:
if
paddle
.
fluid
.
core
.
is_compiled_with_cuda
():
gpus_num
=
paddle
.
fluid
.
core
.
get_cuda_device_count
()
if
gpus_num
!=
1
:
raise
ValueError
(
"CUDA_VISIBLE_DEVICES shoule be set only 1 card if you use `python` to launch fleet program."
)
if
paddle
.
fluid
.
framework
.
in_dygraph_mode
():
if
self
.
worker_num
()
==
1
:
return
...
...
@@ -569,8 +578,6 @@ class Fleet(object):
"""
self
.
user_defined_optimizer
=
optimizer
if
paddle
.
fluid
.
framework
.
in_dygraph_mode
():
return
self
if
strategy
==
None
:
strategy
=
DistributedStrategy
()
...
...
python/paddle/fluid/tests/unittests/CMakeLists.txt
浏览文件 @
61162497
...
...
@@ -129,6 +129,7 @@ if (NOT ${WITH_GPU})
LIST
(
REMOVE_ITEM TEST_OPS test_parallel_dygraph_transformer
)
LIST
(
REMOVE_ITEM TEST_OPS test_parallel_dygraph_sync_batch_norm
)
LIST
(
REMOVE_ITEM TEST_OPS test_imperative_auto_mixed_precision
)
LIST
(
REMOVE_ITEM TEST_OPS test_fleet_base_single
)
elseif
(
${
CUDNN_VERSION
}
VERSION_LESS 7100
)
LIST
(
REMOVE_ITEM TEST_OPS test_conv2d_fusion_op
)
endif
()
...
...
python/paddle/fluid/tests/unittests/test_fleet_base.py
浏览文件 @
61162497
...
...
@@ -171,45 +171,7 @@ class TestFleetDygraph(unittest.TestCase):
final_strategy
=
fleet
.
_final_strategy
()
class
LinearNet
(
nn
.
Layer
):
def
__init__
(
self
):
super
(
LinearNet
,
self
).
__init__
()
self
.
_linear1
=
nn
.
Linear
(
10
,
10
)
self
.
_linear2
=
nn
.
Linear
(
10
,
1
)
def
forward
(
self
,
x
):
return
self
.
_linear2
(
self
.
_linear1
(
x
))
class
TestFleetDygraphSingle
(
unittest
.
TestCase
):
def
setUp
(
self
):
os
.
environ
[
"PADDLE_TRAINER_ENDPOINTS"
]
=
"127.0.0.1:36213"
os
.
environ
[
"PADDLE_CURRENT_ENDPOINTS"
]
=
"127.0.0.1:36213"
os
.
environ
[
"PADDLE_TRAINERS_NUM"
]
=
"1"
os
.
environ
[
"PADDLE_TRAINER_ID"
]
=
"0"
def
test_dygraph_single
(
self
):
paddle
.
disable_static
()
fleet
.
init
(
is_collective
=
True
)
layer
=
LinearNet
()
loss_fn
=
nn
.
MSELoss
()
adam
=
paddle
.
optimizer
.
Adam
(
learning_rate
=
0.001
,
parameters
=
layer
.
parameters
())
adam
=
fleet
.
distributed_optimizer
(
adam
)
dp_layer
=
fleet
.
distributed_model
(
layer
)
for
step
in
range
(
2
):
inputs
=
paddle
.
randn
([
10
,
10
],
'float32'
)
outputs
=
dp_layer
(
inputs
)
labels
=
paddle
.
randn
([
10
,
1
],
'float32'
)
loss
=
loss_fn
(
outputs
,
labels
)
loss
.
backward
()
adam
.
step
()
adam
.
clear_grad
()
class
TestFleetBaseSingleRunCollective
(
unittest
.
TestCase
):
class
TestFleetBaseSingleError
(
unittest
.
TestCase
):
def
setUp
(
self
):
os
.
environ
.
pop
(
"PADDLE_TRAINER_ENDPOINTS"
)
...
...
@@ -221,71 +183,23 @@ class TestFleetBaseSingleRunCollective(unittest.TestCase):
}
def
test_single_run_collective_minimize
(
self
):
input_x
=
paddle
.
static
.
data
(
name
=
"x"
,
shape
=
[
-
1
,
32
],
dtype
=
'float32'
)
input_y
=
paddle
.
static
.
data
(
name
=
"y"
,
shape
=
[
-
1
,
1
],
dtype
=
'int64'
)
fc_1
=
fluid
.
layers
.
fc
(
input
=
input_x
,
size
=
64
,
act
=
'tanh'
)
prediction
=
fluid
.
layers
.
fc
(
input
=
fc_1
,
size
=
2
,
act
=
'softmax'
)
cost
=
fluid
.
layers
.
cross_entropy
(
input
=
prediction
,
label
=
input_y
)
avg_cost
=
paddle
.
mean
(
x
=
cost
)
fleet
.
init
(
is_collective
=
True
)
optimizer
=
fluid
.
optimizer
.
SGD
(
learning_rate
=
0.001
)
optimizer
=
fleet
.
distributed_optimizer
(
optimizer
)
optimizer
.
minimize
(
avg_cost
)
place
=
fluid
.
CUDAPlace
(
0
)
if
paddle
.
fluid
.
is_compiled_with_cuda
(
)
else
fluid
.
CPUPlace
()
exe
=
fluid
.
Executor
(
place
)
exe
.
run
(
paddle
.
static
.
default_startup_program
())
for
i
in
range
(
10
):
cost_val
=
exe
.
run
(
feed
=
self
.
gen_data
(),
fetch_list
=
[
avg_cost
.
name
])
print
(
"cost of step[{}] = {}"
.
format
(
i
,
cost_val
))
class
TestFleetBaseSingleRunPS
(
unittest
.
TestCase
):
def
setUp
(
self
):
os
.
environ
.
pop
(
"PADDLE_PSERVERS_IP_PORT_LIST"
)
def
gen_data
(
self
):
return
{
"x"
:
np
.
random
.
random
(
size
=
(
128
,
32
)).
astype
(
'float32'
),
"y"
:
np
.
random
.
randint
(
2
,
size
=
(
128
,
1
)).
astype
(
'int64'
)
}
def
test_single_run_ps_minimize
(
self
):
input_x
=
paddle
.
static
.
data
(
name
=
"x"
,
shape
=
[
-
1
,
32
],
dtype
=
'float32'
)
input_y
=
paddle
.
static
.
data
(
name
=
"y"
,
shape
=
[
-
1
,
1
],
dtype
=
'int64'
)
fc_1
=
fluid
.
layers
.
fc
(
input
=
input_x
,
size
=
64
,
act
=
'tanh'
)
prediction
=
fluid
.
layers
.
fc
(
input
=
fc_1
,
size
=
2
,
act
=
'softmax'
)
cost
=
fluid
.
layers
.
cross_entropy
(
input
=
prediction
,
label
=
input_y
)
avg_cost
=
paddle
.
mean
(
x
=
cost
)
fleet
.
init
()
strategy
=
paddle
.
distributed
.
fleet
.
DistributedStrategy
()
optimizer
=
fluid
.
optimizer
.
SGD
(
learning_rate
=
0.01
)
optimizer
=
fleet
.
distributed_optimizer
(
optimizer
,
strategy
=
strategy
)
optimizer
.
minimize
(
avg_cost
)
if
fleet
.
is_server
():
fleet
.
init_server
()
fleet
.
run_server
()
elif
fleet
.
is_worker
():
place
=
fluid
.
CPUPlace
()
exe
=
fluid
.
Executor
(
place
)
exe
.
run
(
paddle
.
static
.
default_startup_program
())
step
=
100
for
i
in
range
(
step
):
cost_val
=
exe
.
run
(
program
=
fluid
.
default_main_program
(),
feed
=
self
.
gen_data
(),
fetch_list
=
[
avg_cost
.
name
])
print
(
"worker_index: %d, step%d cost = %f"
%
(
fleet
.
worker_index
(),
i
,
cost_val
[
0
]))
fleet
.
save_persistables
(
exe
,
"fleet_single_model/"
)
print
(
"save fleet models done."
)
def
test_single_error
():
input_x
=
paddle
.
static
.
data
(
name
=
"x"
,
shape
=
[
-
1
,
32
],
dtype
=
'float32'
)
input_y
=
paddle
.
static
.
data
(
name
=
"y"
,
shape
=
[
-
1
,
1
],
dtype
=
'int64'
)
fc_1
=
fluid
.
layers
.
fc
(
input
=
input_x
,
size
=
64
,
act
=
'tanh'
)
prediction
=
fluid
.
layers
.
fc
(
input
=
fc_1
,
size
=
2
,
act
=
'softmax'
)
cost
=
fluid
.
layers
.
cross_entropy
(
input
=
prediction
,
label
=
input_y
)
avg_cost
=
paddle
.
mean
(
x
=
cost
)
fleet
.
init
(
is_collective
=
True
)
# in non_distributed mode(use `python` to launch), raise error if has multi cards
if
fluid
.
core
.
is_compiled_with_cuda
(
)
and
fluid
.
core
.
get_cuda_device_count
()
>
1
:
self
.
assertRaises
(
ValueError
,
test_single_error
)
else
:
test_single_error
()
if
__name__
==
"__main__"
:
...
...
python/paddle/fluid/tests/unittests/test_fleet_base_single.py
0 → 100644
浏览文件 @
61162497
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
numpy
as
np
import
os
cuda_visible_devices
=
os
.
getenv
(
'CUDA_VISIBLE_DEVICES'
)
if
cuda_visible_devices
is
None
or
cuda_visible_devices
==
""
:
os
.
environ
[
'CUDA_VISIBLE_DEVICES'
]
=
'0'
else
:
os
.
environ
[
'CUDA_VISIBLE_DEVICES'
]
=
cuda_visible_devices
.
split
(
','
)[
0
]
import
paddle
import
paddle.distributed.fleet
as
fleet
import
paddle.distributed.fleet.base.role_maker
as
role_maker
import
paddle.fluid
as
fluid
import
unittest
import
paddle.nn
as
nn
class
LinearNet
(
nn
.
Layer
):
def
__init__
(
self
):
super
(
LinearNet
,
self
).
__init__
()
self
.
_linear1
=
nn
.
Linear
(
10
,
10
)
self
.
_linear2
=
nn
.
Linear
(
10
,
1
)
def
forward
(
self
,
x
):
return
self
.
_linear2
(
self
.
_linear1
(
x
))
class
TestFleetDygraphSingle
(
unittest
.
TestCase
):
def
setUp
(
self
):
os
.
environ
[
"PADDLE_TRAINER_ENDPOINTS"
]
=
"127.0.0.1:36213"
os
.
environ
[
"PADDLE_CURRENT_ENDPOINTS"
]
=
"127.0.0.1:36213"
os
.
environ
[
"PADDLE_TRAINERS_NUM"
]
=
"1"
os
.
environ
[
"PADDLE_TRAINER_ID"
]
=
"0"
def
test_dygraph_single
(
self
):
paddle
.
disable_static
()
fleet
.
init
(
is_collective
=
True
)
layer
=
LinearNet
()
loss_fn
=
nn
.
MSELoss
()
adam
=
paddle
.
optimizer
.
Adam
(
learning_rate
=
0.001
,
parameters
=
layer
.
parameters
())
adam
=
fleet
.
distributed_optimizer
(
adam
)
dp_layer
=
fleet
.
distributed_model
(
layer
)
for
step
in
range
(
2
):
inputs
=
paddle
.
randn
([
10
,
10
],
'float32'
)
outputs
=
dp_layer
(
inputs
)
labels
=
paddle
.
randn
([
10
,
1
],
'float32'
)
loss
=
loss_fn
(
outputs
,
labels
)
loss
=
dp_layer
.
scale_loss
(
loss
)
loss
.
backward
()
dp_layer
.
apply_collective_grads
()
adam
.
step
()
adam
.
clear_grad
()
class
TestFleetBaseSingleRunCollective
(
unittest
.
TestCase
):
def
setUp
(
self
):
pass
def
gen_data
(
self
):
return
{
"x"
:
np
.
random
.
random
(
size
=
(
128
,
32
)).
astype
(
'float32'
),
"y"
:
np
.
random
.
randint
(
2
,
size
=
(
128
,
1
)).
astype
(
'int64'
)
}
def
test_single_run_collective_minimize
(
self
):
input_x
=
paddle
.
static
.
data
(
name
=
"x"
,
shape
=
[
-
1
,
32
],
dtype
=
'float32'
)
input_y
=
paddle
.
static
.
data
(
name
=
"y"
,
shape
=
[
-
1
,
1
],
dtype
=
'int64'
)
fc_1
=
fluid
.
layers
.
fc
(
input
=
input_x
,
size
=
64
,
act
=
'tanh'
)
prediction
=
fluid
.
layers
.
fc
(
input
=
fc_1
,
size
=
2
,
act
=
'softmax'
)
cost
=
fluid
.
layers
.
cross_entropy
(
input
=
prediction
,
label
=
input_y
)
avg_cost
=
paddle
.
mean
(
x
=
cost
)
fleet
.
init
(
is_collective
=
True
)
optimizer
=
fluid
.
optimizer
.
SGD
(
learning_rate
=
0.001
)
optimizer
=
fleet
.
distributed_optimizer
(
optimizer
)
optimizer
.
minimize
(
avg_cost
)
place
=
fluid
.
CUDAPlace
(
0
)
if
paddle
.
fluid
.
is_compiled_with_cuda
(
)
else
fluid
.
CPUPlace
()
exe
=
fluid
.
Executor
(
place
)
exe
.
run
(
paddle
.
static
.
default_startup_program
())
for
i
in
range
(
10
):
cost_val
=
exe
.
run
(
feed
=
self
.
gen_data
(),
fetch_list
=
[
avg_cost
.
name
])
print
(
"cost of step[{}] = {}"
.
format
(
i
,
cost_val
))
class
TestFleetBaseSingleRunPS
(
unittest
.
TestCase
):
def
setUp
(
self
):
pass
def
gen_data
(
self
):
return
{
"x"
:
np
.
random
.
random
(
size
=
(
128
,
32
)).
astype
(
'float32'
),
"y"
:
np
.
random
.
randint
(
2
,
size
=
(
128
,
1
)).
astype
(
'int64'
)
}
def
test_single_run_ps_minimize
(
self
):
input_x
=
paddle
.
static
.
data
(
name
=
"x"
,
shape
=
[
-
1
,
32
],
dtype
=
'float32'
)
input_y
=
paddle
.
static
.
data
(
name
=
"y"
,
shape
=
[
-
1
,
1
],
dtype
=
'int64'
)
fc_1
=
fluid
.
layers
.
fc
(
input
=
input_x
,
size
=
64
,
act
=
'tanh'
)
prediction
=
fluid
.
layers
.
fc
(
input
=
fc_1
,
size
=
2
,
act
=
'softmax'
)
cost
=
fluid
.
layers
.
cross_entropy
(
input
=
prediction
,
label
=
input_y
)
avg_cost
=
paddle
.
mean
(
x
=
cost
)
fleet
.
init
()
strategy
=
paddle
.
distributed
.
fleet
.
DistributedStrategy
()
optimizer
=
fluid
.
optimizer
.
SGD
(
learning_rate
=
0.01
)
optimizer
=
fleet
.
distributed_optimizer
(
optimizer
,
strategy
=
strategy
)
optimizer
.
minimize
(
avg_cost
)
if
fleet
.
is_server
():
fleet
.
init_server
()
fleet
.
run_server
()
elif
fleet
.
is_worker
():
place
=
fluid
.
CPUPlace
()
exe
=
fluid
.
Executor
(
place
)
exe
.
run
(
paddle
.
static
.
default_startup_program
())
step
=
10
for
i
in
range
(
step
):
cost_val
=
exe
.
run
(
program
=
fluid
.
default_main_program
(),
feed
=
self
.
gen_data
(),
fetch_list
=
[
avg_cost
.
name
])
print
(
"worker_index: %d, step%d cost = %f"
%
(
fleet
.
worker_index
(),
i
,
cost_val
[
0
]))
if
__name__
==
"__main__"
:
unittest
.
main
()
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录