Unverified commit 51e2933d
Authored by qipengh on Jul 12, 2022; committed by GitHub on Jul 12, 2022.
[MLU]add sync_batch_norm op 2/2 (#44178)
Parent: f1be9cf1
Showing 5 changed files with 737 additions and 14 deletions (+737, -14)
paddle/fluid/operators/scale_op_mlu.cc  (+1, -1)
paddle/fluid/platform/device/mlu/device_context.cc  (+10, -13)
python/paddle/fluid/tests/unittests/mlu/test_sync_batch_norm_base_mlu.py  (+506, -0)
python/paddle/fluid/tests/unittests/mlu/test_sync_batch_norm_op_mlu_baseline.py  (+43, -0)
python/paddle/fluid/tests/unittests/mlu/test_sync_batch_norm_op_mlu_extra.py  (+177, -0)
paddle/fluid/operators/scale_op_mlu.cc

@@ -21,7 +21,7 @@ namespace operators {
 template <typename T>
 class ScaleMLUKernel : public framework::OpKernel<T> {
  public:
-  virtual void Compute(const framework::ExecutionContext& ctx) const {
+  void Compute(const framework::ExecutionContext& ctx) const {
     auto& dev_ctx = GetDevCtxFromCTX(ctx);
     auto* in_var = ctx.InputVar("X");
     auto* in = framework::GetLoDTensorOrSelectedRowsValueFromVar(*in_var);
paddle/fluid/platform/device/mlu/device_context.cc

@@ -42,19 +42,16 @@ MLUDeviceContext::MLUDeviceContext(MLUPlace place) : place_(place) {
   runtime_version_ = GetMLURuntimeVersion(place_.device);
   cnnl_version_ = GetMLUCnnlVersion(place_.device);
 
-  LOG_FIRST_N(WARNING, 1) << "Please NOTE: device: " << place_.device
-                          << ", MLU Compute Capability: "
-                          << compute_capability_ / 10 << "."
-                          << compute_capability_ % 10
-                          << ", Driver API Version: " << driver_version_ / 10000
-                          << "." << (driver_version_ / 100) % 100 << "."
-                          << driver_version_ % 100
-                          << ", Runtime API Version: " << runtime_version_ / 10000
-                          << "." << (runtime_version_ / 100) % 100 << "."
-                          << runtime_version_ % 100
-                          << ", Cnnl API Version: " << cnnl_version_ / 10000
-                          << "." << (cnnl_version_ / 100) % 100 << "."
-                          << cnnl_version_ % 100;
+  LOG_FIRST_N(WARNING, 1)
+      << "Please NOTE: device: " << static_cast<int>(place_.device)
+      << ", MLU Compute Capability: " << compute_capability_ / 10 << "."
+      << compute_capability_ % 10
+      << ", Driver API Version: " << driver_version_ / 10000 << "."
+      << (driver_version_ / 100) % 100 << "." << driver_version_ % 100
+      << ", Runtime API Version: " << runtime_version_ / 10000 << "."
+      << (runtime_version_ / 100) % 100 << "." << runtime_version_ % 100
+      << ", Cnnl API Version: " << cnnl_version_ / 10000 << "."
+      << (cnnl_version_ / 100) % 100 << "." << cnnl_version_ % 100;
 
   default_ctx_.reset(new MLUContext(place_));
 }
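The removed and added log statements above differ only in the static_cast<int> around place_.device; the version fields themselves are packed integers that the streaming arithmetic unpacks as major.minor.patch (and the compute capability as major.minor). A small Python illustration of that decoding, added here only to make the integer arithmetic explicit (the example values are made up):

def decode_api_version(v):
    # driver/runtime/CNNL versions are packed as major * 10000 + minor * 100 + patch
    return v // 10000, (v // 100) % 100, v % 100

def decode_compute_capability(c):
    # compute capability is packed as major * 10 + minor
    return c // 10, c % 10

print(decode_api_version(40803))      # (4, 8, 3)
print(decode_compute_capability(37))  # (3, 7)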
python/paddle/fluid/tests/unittests/mlu/test_sync_batch_norm_base_mlu.py  (new file, mode 0 → 100644)

# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import numpy as np
import unittest
import time
import argparse
import os
import six
import sys

sys.path.append("..")

import subprocess
import traceback
import functools
import pickle
from contextlib import closing

import paddle.fluid as fluid
import paddle.fluid.unique_name as nameGen
from paddle.fluid import core
from six import string_types
import paddle
from paddle.fluid.tests.unittests.op_test import OpTest, _set_use_system_allocator
from paddle.fluid.tests.unittests.test_sync_batch_norm_op import create_or_get_tensor

_set_use_system_allocator(False)
paddle.enable_static()

SEED = 10


class TestSyncBatchNormRunnerBase(object):

    def get_model(self,
                  main,
                  startup,
                  place,
                  layout,
                  seed,
                  sync_bn=False,
                  only_forward=False):
        raise NotImplementedError(
            "get model should be implemented by child class.")

    def wait_server_ready(self, endpoints):
        assert not isinstance(endpoints, string_types)
        while True:
            all_ok = True
            not_ready_endpoints = []
            for ep in endpoints:
                ip_port = ep.split(":")
                with closing(socket.socket(socket.AF_INET,
                                           socket.SOCK_STREAM)) as sock:
                    sock.settimeout(2)
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                    if hasattr(socket, 'SO_REUSEPORT'):
                        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT,
                                        1)
                    result = sock.connect_ex((ip_port[0], int(ip_port[1])))
                    if result != 0:
                        all_ok = False
                        not_ready_endpoints.append(ep)
            if not all_ok:
                sys.stderr.write("server not ready, wait 3 sec to retry... \n")
                sys.stderr.write("not ready endpoints:" +
                                 str(not_ready_endpoints) + "\n")
                sys.stderr.flush()
                time.sleep(3)
            else:
                break

    def initCommunicator(self, program, rank, nranks, wait_port,
                         current_endpoint, endpoints):
        other_endpoints = endpoints[:]
        other_endpoints.remove(current_endpoint)
        if rank == 0 and wait_port:
            self.wait_server_ready(other_endpoints)
        block = program.global_block()
        cncl_id_var = block.create_var(name=nameGen.generate('cncl_id'),
                                       persistable=True,
                                       type=core.VarDesc.VarType.RAW)
        block.append_op(type='c_gen_cncl_id',
                        inputs={},
                        outputs={'Out': cncl_id_var},
                        attrs={
                            'rank': rank,
                            'endpoint': current_endpoint,
                            'other_endpoints': other_endpoints
                        })
        block.append_op(type='c_comm_init',
                        inputs={'X': cncl_id_var},
                        outputs={},
                        attrs={
                            'nranks': nranks,
                            'rank': rank,
                            'ring_id': self.global_ring_id
                        })

    def run_trainer(self, args):
        device_id = int(os.getenv("FLAGS_selected_mlus", "0"))
        place = fluid.MLUPlace(device_id)
        places = [place]

        # Test training
        for place in places:
            for layout in ["NCHW", "NHWC"]:
                self._compare(args, place, layout, False)

        # Test inference
        for place in places:
            for layout in ["NCHW", "NHWC"]:
                self._compare(args, place, layout, True)

        # # Test FP16 - @TODO
        # self.dtype = np.float16
        # self.atol = 1e-2

        # # Test training
        # for place in places:
        #     for layout in ["NCHW", "NHWC"]:
        #         self._compare(args, place, layout, False)

        # # Test inference
        # for place in places:
        #     for layout in ["NCHW", "NHWC"]:
        #         self._compare(args, place, layout, True)

        sys.stdout.buffer.write(
            pickle.dumps(
                'training, inference, fp32, fp16, NCHW, NHWC all passed'))

    def _compare(self, args, place, layout, only_forward):
        scope = core.Scope()
        np.random.seed(SEED)
        data = np.random.random(size=self.dshape).astype(self.dtype) * 4. - 2
        sys.stderr.write("data: " + str(data) + "\n")
        data = create_or_get_tensor(scope, "input",
                                    OpTest.np_dtype_to_fluid_dtype(data),
                                    place)

        bn_fetches = self._cal_single_card(args, data, place, layout,
                                           only_forward)
        fetch_names, sync_bn_fetches = self._cal_multiple_cards(
            args, data, place, layout, only_forward)

        sys.stderr.write("len(sync_bn_fetches): " + str(len(sync_bn_fetches)) +
                         "\n")
        for i in six.moves.xrange(0, len(sync_bn_fetches)):
            sys.stderr.write("i: " + str(i) + "\n")
            sys.stderr.write("fetch_names[i]): " + fetch_names[i] + "\n")

            bn_val = bn_fetches[i]
            sync_bn_val = sync_bn_fetches[i]
            if sync_bn_val.shape != bn_val.shape:
                sync_bn_val = sync_bn_val[:bn_val.shape[0]]

            # i = 0
            if fetch_names[i] == 'reduce_sum_0.tmp_0':
                # sys.stderr.write("skip reduce_sum_0.tmp_0 (Out of reduce_sum op)" + "\n")
                sys.stderr.write("reduce_sum_0.tmp_0 (Out of reduce_sum op)" +
                                 "\n")
                sys.stderr.write("bn_val: " + str(bn_val) + "\n")
                sys.stderr.write("sync_bn_val: " + str(sync_bn_val) + "\n")
                # continue

            # i = 1
            if fetch_names[i] == 'conv2d_0.tmp_0':
                # sys.stderr.write("skip conv2d_0.tmp_0 (X)" + "\n")
                sys.stderr.write("conv2d_0.tmp_0 (X)" + "\n")
                sys.stderr.write("bn_val: " + str(bn_val) + "\n")
                sys.stderr.write("sync_bn_val: " + str(sync_bn_val) + "\n")
                # continue

            # i = 2
            if fetch_names[i] == 'batch_norm_0.tmp_3':
                # sys.stderr.write("skip batch_norm_0.tmp_3 (Y)" + "\n")
                sys.stderr.write("batch_norm_0.tmp_3 (Y)" + "\n")
                sys.stderr.write("bn_val: " + str(bn_val) + "\n")
                sys.stderr.write("sync_bn_val: " + str(sync_bn_val) + "\n")
                # continue

            # i = 2
            if fetch_names[i] == 'batch_norm_0.tmp_2':
                # sys.stderr.write("skip batch_norm_0.tmp_2 (ReserveSpace of batch_norm)" + "\n")
                sys.stderr.write(
                    "batch_norm_0.tmp_2 (ReserveSpace of batch_norm)" + "\n")
                sys.stderr.write("bn_val: " + str(bn_val) + "\n")
                sys.stderr.write("sync_bn_val: " + str(sync_bn_val) + "\n")
                # continue

            # i = 3
            if fetch_names[i] == 'bn_moving_mean':
                sys.stderr.write("skip bn_moving_mean (MeanOut)" + "\n")
                sys.stderr.write("bn_val: " + str(bn_val) + "\n")
                sys.stderr.write("sync_bn_val: " + str(sync_bn_val) + "\n")
                continue

            # i = 4
            if fetch_names[i] == 'bn_moving_variance':
                sys.stderr.write("skip bn_moving_variance (VarianceOut)" +
                                 "\n")
                sys.stderr.write("bn_val: " + str(bn_val) + "\n")
                sys.stderr.write("sync_bn_val: " + str(sync_bn_val) + "\n")
                continue

            # i = 7
            if fetch_names[i] == 'batch_norm_0.tmp_0':
                # sys.stderr.write("skip batch_norm_0.tmp_0 (SavedMean)" + "\n")
                sys.stderr.write("batch_norm_0.tmp_0 (SavedMean)" + "\n")
                sys.stderr.write("bn_val: " + str(bn_val) + "\n")
                sys.stderr.write("sync_bn_val: " + str(sync_bn_val) + "\n")
                # continue

            # i = 8
            if fetch_names[i] == 'batch_norm_0.tmp_1':
                sys.stderr.write("skip batch_norm_0.tmp_1 (SavedVariance)" +
                                 "\n")
                sys.stderr.write("bn_val: " + str(bn_val) + "\n")
                sys.stderr.write("sync_bn_val: " + str(sync_bn_val) + "\n")
                continue

            # i = 9
            if fetch_names[i] == 'bn_scale@GRAD':
                # sys.stderr.write("skip bn_scale@GRAD (Scale@GRAD)" + "\n")
                sys.stderr.write("bn_scale@GRAD (Scale@GRAD)" + "\n")
                sys.stderr.write("bn_val: " + str(bn_val) + "\n")
                sys.stderr.write("sync_bn_val: " + str(sync_bn_val) + "\n")
                # continue

            # i = 10
            if fetch_names[i] == 'bn_bias@GRAD':
                # sys.stderr.write("skip bn_bias@GRAD (Bias@GRAD)" + "\n")
                sys.stderr.write("bn_bias@GRAD (Bias@GRAD)" + "\n")
                sys.stderr.write("bn_val: " + str(bn_val) + "\n")
                sys.stderr.write("sync_bn_val: " + str(sync_bn_val) + "\n")
                # continue

            # i = 11
            if fetch_names[i] == 'batch_norm_0.tmp_3@GRAD':
                # sys.stderr.write("skip batch_norm_0.tmp_3@GRAD (Y@GRAD)" + "\n")
                sys.stderr.write("batch_norm_0.tmp_3@GRAD (Y@GRAD)" + "\n")
                sys.stderr.write("bn_val: " + str(bn_val) + "\n")
                sys.stderr.write("sync_bn_val: " + str(sync_bn_val) + "\n")
                # continue

            # i = 12
            if fetch_names[i] == 'conv2d_0.tmp_0@GRAD':
                # sys.stderr.write("skip conv2d_0.tmp_0@GRAD (X@GRAD)" + "\n")
                sys.stderr.write("conv2d_0.tmp_0@GRAD (X@GRAD)" + "\n")
                sys.stderr.write("bn_val: " + str(bn_val) + "\n")
                sys.stderr.write("sync_bn_val: " + str(sync_bn_val) + "\n")
                # continue

            atol = self.atol
            if fetch_names[i] == 'conv2d_0.tmp_0@GRAD':
                atol = 1e-2
            assert np.allclose(bn_val, sync_bn_val, atol=atol), (
                "Output (" + fetch_names[i] + ") has diff. \n" + "\nBN " +
                str(bn_val) + "\n" + "Sync BN " + str(sync_bn_val))

    def _cal_single_card(self, args, data, place, layout, only_forward):
        # Single-MLU, N = 32 per MLU
        train_prog = fluid.Program()
        startup_prog = fluid.Program()
        train_prog.global_seed(SEED)
        startup_prog.global_seed(SEED)
        paddle.seed(SEED)

        outs = self.get_model(train_prog, startup_prog, place, layout, SEED,
                              False, only_forward)

        exe = fluid.Executor(place)
        exe.run(startup_prog)
        fetch_names = [v.name for v in outs] + [
            'bn_moving_mean', 'bn_moving_variance', 'bn_scale', 'bn_bias'
        ]
        if not only_forward:
            others = [
                'batch_norm_0.tmp_0', 'batch_norm_0.tmp_1', 'bn_scale@GRAD',
                'bn_bias@GRAD', 'batch_norm_0.tmp_3@GRAD', 'conv2d_0.tmp_0@GRAD'
            ]
            fetch_names += others
        bn_fetches = exe.run(program=train_prog,
                             feed={'input': data},
                             fetch_list=fetch_names)

        return bn_fetches

    def _cal_multiple_cards(self, args, data, place, layout, only_forward):
        # Multi-MLUs, self.N per MLU
        assert core.get_mlu_device_count() > 1

        train_prog = fluid.Program()
        startup_prog = fluid.Program()
        train_prog.global_seed(SEED)
        startup_prog.global_seed(SEED)
        paddle.seed(SEED)
        sys.stderr.write("train_prog: " + train_prog.to_string(True) + "\n")
        sys.stderr.write("startup_prog: " + startup_prog.to_string(True) +
                         "\n")

        endpoints = args["endpoints"].split(",")
        rank = args["trainerid"]
        current_endpoint = args["currentendpoint"]
        nranks = 2

        self.initCommunicator(startup_prog, rank, nranks, True,
                              current_endpoint, endpoints)
        sys.stderr.write("after init, startup_prog: " +
                         startup_prog.to_string(True) + "\n")
        train_prog.global_seed(SEED)
        train_prog._sync_with_cpp()
        startup_prog.global_seed(SEED)
        startup_prog._sync_with_cpp()
        paddle.seed(SEED)

        self.rank = rank
        outs = self.get_model(train_prog, startup_prog, place, layout, SEED,
                              True, only_forward)
        sys.stderr.write("after get_model, train_prog: " +
                         train_prog.to_string(True) + "\n")
        sys.stderr.write("after get_model, startup_prog: " +
                         startup_prog.to_string(True) + "\n")

        ops = train_prog.blocks[0].ops
        for i, op in enumerate(ops):
            if op.type == 'batch_norm':
                sys.stderr.write("i: " + str(i) + "\n")
                sys.stderr.write("op type: " + op.type + "\n")
                op.desc.set_type('sync_batch_norm')
            if op.type == 'batch_norm_grad':
                sys.stderr.write("i: " + str(i) + "\n")
                sys.stderr.write("op type: " + op.type + "\n")
                op.desc.set_type('sync_batch_norm_grad')

        sys.stderr.write("after update sync_batch_norm, train_prog: " +
                         train_prog.to_string(True) + "\n")

        exe = fluid.Executor(place)
        exe.run(startup_prog)
        fetch_names = [v.name for v in outs] + [
            'bn_moving_mean', 'bn_moving_variance', 'bn_scale', 'bn_bias'
        ]
        if not only_forward:
            others = [
                'batch_norm_0.tmp_0', 'batch_norm_0.tmp_1', 'bn_scale@GRAD',
                'bn_bias@GRAD', 'batch_norm_0.tmp_3@GRAD', 'conv2d_0.tmp_0@GRAD'
            ]
            fetch_names += others
        sync_bn_fetches = exe.run(program=train_prog,
                                  feed={'input': data},
                                  fetch_list=fetch_names)

        return fetch_names, sync_bn_fetches


def runtime_main(test_class, col_type, sub_type):
    args = {}
    model = test_class()
    args["deviceid"] = os.getenv("FLAGS_selected_mlus")
    args["trainerid"] = int(os.getenv("PADDLE_TRAINER_ID"))
    args["trainernum"] = int(os.getenv("PADDLE_TRAINERS_NUM"))
    args["endpoints"] = os.getenv('PADDLE_TRAINER_ENDPOINTS')
    args["currentendpoint"] = os.getenv("PADDLE_CURRENT_ENDPOINT")
    args["col_type"] = col_type
    model.run_trainer(args)


import paddle.compat as cpt
import socket
from contextlib import closing


class TestDistBase(unittest.TestCase):

    def setUp(self):
        self._port_set = set()
        self._trainers = 2
        self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
            self._find_free_port(), self._find_free_port())
        self._python_interp = sys.executable

    def _find_free_port(self):

        def __free_port():
            with closing(socket.socket(socket.AF_INET,
                                       socket.SOCK_STREAM)) as s:
                s.bind(('', 0))
                return s.getsockname()[1]

        while True:
            port = __free_port()
            if port not in self._port_set:
                self._port_set.add(port)
                return port

    def _run_cluster(self, model_file, envs):
        worker_endpoints = self._ps_endpoints.split(",")
        w0_ep, w1_ep = worker_endpoints
        # print("w0_ep:", w0_ep, " w1_ep:", w1_ep)
        env0 = {
            "FLAGS_selected_mlus": "0",
            "PADDLE_TRAINER_ID": "0",
            "PADDLE_TRAINERS_NUM": "2",
            "PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
            "PADDLE_CURRENT_ENDPOINT": w0_ep,
        }

        env1 = {
            "FLAGS_selected_mlus": "1",
            "PADDLE_TRAINER_ID": "1",
            "PADDLE_TRAINERS_NUM": "2",
            "PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
            "PADDLE_CURRENT_ENDPOINT": w1_ep,
        }

        # update environment
        env0.update(envs)
        env1.update(envs)

        tr_cmd = "%s %s"
        tr0_cmd = tr_cmd % (self._python_interp, model_file)
        tr1_cmd = tr_cmd % (self._python_interp, model_file)

        tr0_pipe = open("/tmp/tr0_err_%d.log" % os.getpid(), "w")
        tr1_pipe = open("/tmp/tr1_err_%d.log" % os.getpid(), "w")
        print("tr0_cmd: {}, env: {}\n".format(tr0_cmd, env0))
        print("tr1_cmd: {}, env: {}\n".format(tr1_cmd, env1))
        tr0_proc = subprocess.Popen(tr0_cmd.strip().split(),
                                    stdout=subprocess.PIPE,
                                    stderr=tr0_pipe,
                                    env=env0)

        tr1_proc = subprocess.Popen(tr0_cmd.strip().split(),
                                    stdout=subprocess.PIPE,
                                    stderr=tr1_pipe,
                                    env=env1)

        tr0_out, tr0_err = tr0_proc.communicate()
        tr1_out, tr1_err = tr1_proc.communicate()
        sys.stderr.write('trainer 0 stderr: %s\n' % tr0_err)
        sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err)
        # close trainer file
        tr0_pipe.close()
        tr1_pipe.close()

        with open("/tmp/tr0_err_%d.log" % os.getpid(), "r") as f:
            sys.stderr.write('trainer 0 stderr file: %s\n' % f.read())
        with open("/tmp/tr1_err_%d.log" % os.getpid(), "r") as f:
            sys.stderr.write('trainer 1 stderr file: %s\n' % f.read())

        return pickle.loads(tr0_out), pickle.loads(
            tr1_out), tr0_proc.pid, tr1_proc.pid

    def check_with_place(self,
                         model_file,
                         col_type,
                         check_error_log=False,
                         need_envs={}):
        required_envs = {
            "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
            "FLAGS_eager_delete_tensor_gb": "0.0",
            "PATH": os.getenv("PATH"),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            "LD_PRELOAD": os.getenv("LD_PRELOAD", ""),
            "FLAGS_call_stack_level": "2",
            "GLOG_v": "3",
            "PADDLE_WITH_GLOO": '0',
            "BACKEND": "cncl"
        }
        required_envs.update(need_envs)
        if check_error_log:
            required_envs["GLOG_v"] = "3"
            required_envs["GLOG_logtostderr"] = "1"
            required_envs["GLOO_LOG_LEVEL"] = "TRACE"
        tr0_out, tr1_out, pid0, pid1 = self._run_cluster(
            model_file, required_envs)
        self.assertEqual(
            tr0_out, 'training, inference, fp32, fp16, NCHW, NHWC all passed')
        self.assertEqual(
            tr1_out, 'training, inference, fp32, fp16, NCHW, NHWC all passed')
python/paddle/fluid/tests/unittests/mlu/test_sync_batch_norm_op_mlu_baseline.py  (new file, mode 0 → 100644)

# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import unittest
import numpy as np
import paddle
import os
import sys

sys.path.append("..")
from paddle.fluid.tests.unittests.op_test import OpTest, _set_use_system_allocator
from test_sync_batch_norm_base_mlu import TestDistBase

_set_use_system_allocator(False)
paddle.enable_static()


class TestSyncBatchNormOp(TestDistBase):

    def _setup_config(self):
        pass

    def test_identity(self, col_type="identity"):
        self.check_with_place("sync_batch_norm_op_mlu.py",
                              col_type,
                              check_error_log=True)


if __name__ == '__main__':
    unittest.main()
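The model file launched by check_with_place above, sync_batch_norm_op_mlu.py, is not included in this diff. As a rough, hedged sketch only — the class name, shapes, and layer stack below are assumptions modeled on Paddle's existing GPU sync_batch_norm test, not the actual script — such a launcher subclasses TestSyncBatchNormRunnerBase, builds a conv2d + batch_norm program in get_model, and hands itself to runtime_main:

# Hypothetical launcher sketch (NOT part of this commit): wires a runner
# subclass into runtime_main from test_sync_batch_norm_base_mlu.py.
import numpy as np
import paddle
import paddle.fluid as fluid

from test_sync_batch_norm_base_mlu import (TestSyncBatchNormRunnerBase,
                                           runtime_main)

paddle.enable_static()


class TestSyncBatchNormOpTraining(TestSyncBatchNormRunnerBase):

    def __init__(self):
        self.global_ring_id = 0        # ring id used by initCommunicator
        self.dtype = np.float32        # consumed by _compare when building data
        self.dshape = [8, 16, 32, 32]  # illustrative NCHW input shape
        self.atol = 1e-3

    def get_model(self,
                  main,
                  startup,
                  place,
                  layout,
                  seed,
                  sync_bn=False,
                  only_forward=False):
        # conv2d + batch_norm; the base class rewrites batch_norm ops to
        # sync_batch_norm before the multi-card run.
        with fluid.unique_name.guard():
            with fluid.program_guard(main, startup):
                data = fluid.layers.data(name='input',
                                         shape=self.dshape,
                                         dtype=self.dtype,
                                         append_batch_size=False)
                conv = fluid.layers.conv2d(
                    input=data,
                    num_filters=32,
                    filter_size=1,
                    param_attr=fluid.ParamAttr(name='conv2d_weight'),
                    bias_attr=False,
                    use_cudnn=False)
                bn = fluid.layers.batch_norm(
                    conv,
                    param_attr=fluid.ParamAttr(name='bn_scale'),
                    bias_attr=fluid.ParamAttr(name='bn_bias'),
                    moving_mean_name='bn_moving_mean',
                    moving_variance_name='bn_moving_variance',
                    data_layout=layout,
                    is_test=only_forward)
                sigmoid = fluid.layers.sigmoid(bn)
                out = fluid.layers.reduce_sum(sigmoid)
                if not only_forward:
                    sgd_opt = fluid.optimizer.SGD(learning_rate=0.0)
                    sgd_opt.backward(out)
        return [out, conv, bn]


if __name__ == "__main__":
    runtime_main(TestSyncBatchNormOpTraining, "identity", 0)

The fetch names checked in _compare (reduce_sum_0.tmp_0, conv2d_0.tmp_0, batch_norm_0.tmp_3, bn_scale, bn_bias, and so on) correspond to the outputs such a program produces.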
python/paddle/fluid/tests/unittests/mlu/test_sync_batch_norm_op_mlu_extra.py  (new file, mode 0 → 100644)

# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
test for sync batchnorm op.
for both FP32 and FP16 input.
"""

from __future__ import print_function

import unittest
import numpy as np
import os
import sys
import six
import paddle
import paddle.fluid.core as core
import paddle.fluid as fluid
import paddle.nn as nn
from paddle.fluid import Program, program_guard
from paddle.fluid.tests.unittests.op_test import OpTest, _set_use_system_allocator
from paddle.fluid.tests.unittests.test_dist_base import TestDistBase

paddle.enable_static()


class TestDygraphSyncBatchNormAPIError(unittest.TestCase):

    def test_errors(self):
        if not core.is_compiled_with_mlu():
            return

        with program_guard(Program(), Program()):
            my_sync_batch_norm = paddle.nn.SyncBatchNorm(10)
            x1 = fluid.create_lod_tensor(np.array([-1, 3, 5, 5]),
                                         [[1, 1, 1, 1]], fluid.MLUPlace(0))
            self.assertRaises(TypeError, my_sync_batch_norm, x1)

            # the input dtype of SyncBatchNorm must be float16 or float32
            x2 = fluid.layers.data(name='x2',
                                   shape=[3, 4, 5, 6],
                                   dtype="int32")
            self.assertRaises(TypeError, my_sync_batch_norm, x2)


class TestConvertSyncBatchNorm(unittest.TestCase):

    def test_convert(self):
        if not core.is_compiled_with_mlu():
            return

        with program_guard(Program(), Program()):
            compare_model = paddle.nn.Sequential(paddle.nn.Conv2D(3, 5, 3),
                                                 paddle.nn.BatchNorm2D(5),
                                                 paddle.nn.BatchNorm2D(5))
            model = paddle.nn.Sequential(
                paddle.nn.Conv2D(3, 5, 3), paddle.nn.BatchNorm2D(5),
                paddle.nn.BatchNorm2D(
                    5,
                    weight_attr=fluid.ParamAttr(name='bn.scale'),
                    bias_attr=fluid.ParamAttr(name='bn.bias')))
            model = paddle.nn.SyncBatchNorm.convert_sync_batchnorm(model)
            for idx, sublayer in enumerate(compare_model.sublayers()):
                if isinstance(sublayer, paddle.nn.BatchNorm2D):
                    self.assertEqual(
                        isinstance(model[idx], paddle.nn.SyncBatchNorm), True)


class TestConvertSyncBatchNormCast1(unittest.TestCase):

    def test_convert(self):
        if not core.is_compiled_with_mlu():
            return

        class Net(nn.Layer):

            def __init__(self):
                super(Net, self).__init__()
                self.conv1 = nn.Conv2D(3, 5, 3)
                self.bn = []
                bn = self.add_sublayer('bn', nn.BatchNorm2D(5))
                self.bn.append(bn)

            def forward(self, x):
                x = self.conv1(x)
                for bn in self.bn:
                    x = bn(x)
                return x

        model = nn.Sequential()
        model.add_sublayer('net1', Net())
        model.add_sublayer('net2', Net())
        compare_model = nn.Sequential()
        compare_model.add_sublayer('net1', Net())
        compare_model.add_sublayer('net2', Net())
        model = nn.SyncBatchNorm.convert_sync_batchnorm(model)
        self.assertEqual(len(compare_model.sublayers()),
                         len(model.sublayers()))


class TestConvertSyncBatchNormCase2(unittest.TestCase):

    def test_convert(self):
        if not core.is_compiled_with_mlu():
            return

        with fluid.dygraph.guard(fluid.MLUPlace(0)):

            class SyBNNet(paddle.nn.Layer):

                def __init__(self, in_ch=3, out_ch=3, dirate=1):
                    super(SyBNNet, self).__init__()
                    self.bn_s1 = paddle.nn.SyncBatchNorm.convert_sync_batchnorm(
                        paddle.nn.BatchNorm3D(
                            out_ch,
                            weight_attr=paddle.ParamAttr(
                                regularizer=paddle.regularizer.L2Decay(0.))))
                    self.bn_s2 = paddle.nn.SyncBatchNorm.convert_sync_batchnorm(
                        paddle.nn.BatchNorm3D(out_ch, data_format='NDHWC'))

                def forward(self, x):
                    x = self.bn_s1(x)
                    out = paddle.sum(paddle.abs(self.bn_s2(x)))
                    return out

            class BNNet(paddle.nn.Layer):

                def __init__(self, in_ch=3, out_ch=3, dirate=1):
                    super(BNNet, self).__init__()
                    self.bn_s1 = paddle.nn.BatchNorm3D(
                        out_ch,
                        weight_attr=paddle.ParamAttr(
                            regularizer=paddle.regularizer.L2Decay(0.)))
                    self.bn_s2 = paddle.nn.SyncBatchNorm.convert_sync_batchnorm(
                        paddle.nn.BatchNorm3D(out_ch, data_format='NDHWC'))

                def forward(self, x):
                    x = self.bn_s1(x)
                    out = paddle.sum(paddle.abs(self.bn_s2(x)))
                    return out

            bn_model = BNNet()
            sybn_model = SyBNNet()
            np.random.seed(10)
            data = np.random.random([3, 3, 3, 3, 3]).astype('float32')
            x = paddle.to_tensor(data)
            bn_out = bn_model(x)
            sybn_out = sybn_model(x)
            self.assertTrue(
                np.allclose(bn_out.numpy(), sybn_out.numpy()),
                "Output has diff. \n" + "\nBN " + str(bn_out.numpy()) + "\n" +
                "Sync BN " + str(sybn_out.numpy()))


class TestDygraphSyncBatchNormDataFormatError(unittest.TestCase):

    def test_errors(self):
        if not core.is_compiled_with_mlu():
            return

        with fluid.dygraph.guard(fluid.MLUPlace(0)):
            my_sync_batch_norm = paddle.nn.SyncBatchNorm(10, data_format='CN')
            data = np.random.random([3, 3, 3]).astype('float32')
            x = paddle.to_tensor(data)
            self.assertRaises(ValueError, my_sync_batch_norm, x)


if __name__ == '__main__':
    unittest.main()
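For context, convert_sync_batchnorm — the API these conversion tests exercise — is normally applied once to a whole model before launching data-parallel training. A minimal usage sketch (not part of this commit, assuming a dygraph-capable Paddle build):

# Minimal usage sketch: swap every BatchNorm*D sublayer for SyncBatchNorm
# before distributing the model across cards.
import paddle
import paddle.nn as nn

model = nn.Sequential(nn.Conv2D(3, 16, 3), nn.BatchNorm2D(16), nn.ReLU())
model = nn.SyncBatchNorm.convert_sync_batchnorm(model)
print(type(model[1]))  # the BatchNorm2D layer is now a SyncBatchNorm layer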