Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
机器未来
Paddle
提交
c6c9c186
P
Paddle
项目概览
机器未来
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
c6c9c186
编写于
9月 20, 2022
作者:
N
Nyakku Shigure
提交者:
GitHub
9月 20, 2022
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[CodeStyle] remove crlf for python files (#46155)
上级
9941ec12
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
1497 addition
and
1497 deletion
+1497
-1497
python/paddle/fluid/tests/unittests/asp/test_asp_utils.py
python/paddle/fluid/tests/unittests/asp/test_asp_utils.py
+224
-224
python/paddle/fluid/tests/unittests/mkldnn/test_elementwise_sub_mkldnn_op.py
.../tests/unittests/mkldnn/test_elementwise_sub_mkldnn_op.py
+242
-242
python/paddle/fluid/tests/unittests/mkldnn/test_fill_constant_mkldnn_op.py
...id/tests/unittests/mkldnn/test_fill_constant_mkldnn_op.py
+126
-126
python/paddle/fluid/tests/unittests/sequence/test_sequence_first_step.py
...luid/tests/unittests/sequence/test_sequence_first_step.py
+51
-51
python/paddle/fluid/tests/unittests/sequence/test_sequence_last_step.py
...fluid/tests/unittests/sequence/test_sequence_last_step.py
+51
-51
python/paddle/fluid/tests/unittests/test_lstm_cudnn_op.py
python/paddle/fluid/tests/unittests/test_lstm_cudnn_op.py
+589
-589
python/paddle/fluid/tests/unittests/test_rnn_op.py
python/paddle/fluid/tests/unittests/test_rnn_op.py
+212
-212
python/paddle/fluid/transpiler/memory_optimization_transpiler.py
...paddle/fluid/transpiler/memory_optimization_transpiler.py
+2
-2
未找到文件。
python/paddle/fluid/tests/unittests/asp/test_asp_utils.py
浏览文件 @
c6c9c186
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
# Copyright (c) 2021 NVIDIA Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
__future__
import
print_function
import
unittest
import
threading
,
time
import
paddle
import
numpy
as
np
class
TestASPUtils
(
unittest
.
TestCase
):
def
test_get_check_method
(
self
):
self
.
assertEqual
(
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
get_checking_method
(
paddle
.
fluid
.
contrib
.
sparsity
.
MaskAlgo
.
MASK_1D
),
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
CHECK_1D
)
self
.
assertEqual
(
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
get_checking_method
(
paddle
.
fluid
.
contrib
.
sparsity
.
MaskAlgo
.
MASK_2D_GREEDY
),
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
CHECK_2D
)
self
.
assertEqual
(
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
get_checking_method
(
paddle
.
fluid
.
contrib
.
sparsity
.
MaskAlgo
.
MASK_2D_BEST
),
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
CHECK_2D
)
def
test_density
(
self
):
x
=
np
.
array
([[
1.0
,
1.0
,
1.0
,
0.0
,
1.0
],
[
1.0
,
1.0
,
0.0
,
0.0
,
1.0
],
[
1.0
,
0.0
,
0.0
,
0.0
,
1.0
],
[
1.0
,
1.0
,
0.0
,
0.0
,
1.0
],
[
0.0
,
1.0
,
0.0
,
0.0
,
1.0
]])
self
.
assertEqual
(
paddle
.
incubate
.
asp
.
calculate_density
(
x
),
0.56
)
x
[:,
0
]
=
0.0
self
.
assertEqual
(
paddle
.
incubate
.
asp
.
calculate_density
(
x
),
0.4
)
def
test_check_mask_1d
(
self
):
x
=
np
.
array
([[
1.0
,
0.0
,
0.0
,
1.0
,
1.0
],
[
1.0
,
1.0
,
0.0
,
0.0
,
1.0
],
[
1.0
,
1.0
,
0.0
,
0.0
,
1.0
],
[
1.0
,
1.0
,
0.0
,
0.0
,
1.0
],
[
0.0
,
1.0
,
0.0
,
0.0
,
1.0
]])
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_1d
(
x
,
2
,
4
))
self
.
assertFalse
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_1d
(
x
,
3
,
4
))
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_1d
(
x
,
2
,
5
))
self
.
assertFalse
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_1d
(
x
,
3
,
5
))
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_1d
(
x
,
3
,
6
))
self
.
assertFalse
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_1d
(
x
,
4
,
6
))
def
test_get_mask_1d
(
self
):
for
_
in
range
(
10
):
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
,
5
))
x
=
paddle
.
fluid
.
contrib
.
sparsity
.
get_mask_1d
(
x
,
2
,
4
)
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_1d
(
x
,
2
,
4
))
x
=
np
.
random
.
randn
(
5
,
4
)
x
=
paddle
.
fluid
.
contrib
.
sparsity
.
get_mask_1d
(
x
,
2
,
4
)
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_1d
(
x
,
2
,
4
))
def
test_check_mask_2d
(
self
):
x
=
np
.
array
([[
1.0
,
0.0
,
0.0
,
1.0
,
1.0
],
[
0.0
,
1.0
,
0.0
,
0.0
,
0.0
],
[
0.0
,
0.0
,
1.0
,
0.0
,
1.0
],
[
1.0
,
1.0
,
0.0
,
0.0
,
0.0
],
[
0.0
,
1.0
,
0.0
,
0.0
,
1.0
]])
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
x
,
2
,
4
))
self
.
assertFalse
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
x
,
3
,
4
))
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
x
,
2
,
5
))
self
.
assertFalse
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
x
,
3
,
5
))
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
x
,
3
,
6
))
self
.
assertFalse
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
x
,
4
,
6
))
def
test_get_mask_2d_greedy
(
self
):
for
_
in
range
(
10
):
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
,
5
))
x
=
paddle
.
fluid
.
contrib
.
sparsity
.
get_mask_2d_greedy
(
x
,
2
,
4
)
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
x
,
2
,
4
))
x
=
np
.
random
.
randn
(
5
,
4
)
x
=
paddle
.
fluid
.
contrib
.
sparsity
.
get_mask_2d_greedy
(
x
,
2
,
4
)
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
x
,
2
,
4
))
def
test_get_mask_2d_best
(
self
):
for
_
in
range
(
10
):
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
,
5
))
x
=
paddle
.
fluid
.
contrib
.
sparsity
.
get_mask_2d_best
(
x
,
2
,
4
)
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
x
,
2
,
4
))
x
=
np
.
random
.
randn
(
5
,
4
)
x
=
paddle
.
fluid
.
contrib
.
sparsity
.
get_mask_2d_best
(
x
,
2
,
4
)
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
x
,
2
,
4
))
def
test_threadsafe_valid_2d_patterns
(
self
):
def
get_reference
(
m
=
4
,
n
=
2
):
from
itertools
import
permutations
patterns
=
np
.
zeros
(
m
)
patterns
[:
n
]
=
1
patterns
=
list
(
set
(
permutations
(
patterns
.
tolist
())))
patterns
=
patterns
+
patterns
patterns
=
np
.
asarray
(
list
(
set
(
permutations
(
patterns
,
m
))))
valid
=
((
patterns
.
sum
(
axis
=
1
)
<=
n
).
sum
(
axis
=
1
)
==
m
).
nonzero
()[
0
].
reshape
(
-
1
)
valid_patterns
=
np
.
empty
((
valid
.
shape
[
0
],
m
,
m
))
valid_patterns
[:]
=
patterns
[
valid
[:]]
return
valid_patterns
for
_
in
range
(
4
):
computing_thread
=
threading
.
Thread
(
target
=
paddle
.
fluid
.
contrib
.
sparsity
.
utils
.
_compute_valid_2d_patterns
,
args
=
(
2
,
4
))
computing_thread
.
start
()
time
.
sleep
(
3
)
patterns_map
=
paddle
.
fluid
.
contrib
.
sparsity
.
utils
.
_valid_2d_patterns
reference_patterns
=
get_reference
()
reference_key
=
'4_2'
self
.
assertTrue
(
reference_key
in
patterns_map
)
self
.
assertTrue
(
len
(
patterns_map
)
==
1
)
self
.
assertTrue
(
(
reference_patterns
==
patterns_map
[
reference_key
]).
all
())
def
test_check_sparsity
(
self
):
for
_
in
range
(
10
):
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
))
x_2d
=
x
.
reshape
(
1
,
x
.
shape
[
0
])
self
.
__test_1D_2D_sparsity_checking_methods
(
x_2d
)
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
,
5
))
x_2d
=
x
self
.
__test_1D_2D_sparsity_checking_methods
(
x_2d
)
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
,
5
,
5
))
x_2d
=
x
.
reshape
(
x
.
shape
[
0
]
*
x
.
shape
[
1
],
x
.
shape
[
2
])
self
.
__test_1D_2D_sparsity_checking_methods
(
x_2d
)
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
,
5
,
5
,
5
))
x_2d
=
x
.
reshape
(
x
.
shape
[
0
],
x
.
shape
[
1
]
*
x
.
shape
[
2
]
*
x
.
shape
[
3
])
self
.
__test_1D_2D_sparsity_checking_methods
(
x_2d
)
def
test_create_mask
(
self
):
for
_
in
range
(
10
):
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
))
self
.
__test_1D_2D_sparse_mask_generation_methods
(
x
)
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
,
5
))
self
.
__test_1D_2D_sparse_mask_generation_methods
(
x
)
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
,
5
,
5
))
self
.
__test_1D_2D_sparse_mask_generation_methods
(
x
)
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
,
5
,
5
,
5
))
self
.
__test_1D_2D_sparse_mask_generation_methods
(
x
)
def
__test_1D_2D_sparsity_checking_methods
(
self
,
x_2d
):
mask
=
paddle
.
fluid
.
contrib
.
sparsity
.
get_mask_1d
(
x_2d
,
2
,
4
)
self
.
assertEqual
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_sparsity
(
mask
,
func_name
=
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
CHECK_1D
,
n
=
2
,
m
=
4
),
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_1d
(
mask
,
2
,
4
))
mask
=
paddle
.
fluid
.
contrib
.
sparsity
.
get_mask_2d_best
(
x_2d
,
2
,
4
)
self
.
assertEqual
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_sparsity
(
mask
,
func_name
=
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
CHECK_2D
,
n
=
2
,
m
=
4
),
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
mask
,
2
,
4
))
def
__test_1D_2D_sparse_mask_generation_methods
(
self
,
x
):
mask
=
paddle
.
fluid
.
contrib
.
sparsity
.
create_mask
(
x
,
func_name
=
paddle
.
fluid
.
contrib
.
sparsity
.
MaskAlgo
.
MASK_1D
,
n
=
2
,
m
=
4
)
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_sparsity
(
mask
,
func_name
=
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
CHECK_1D
,
n
=
2
,
m
=
4
))
mask
=
paddle
.
fluid
.
contrib
.
sparsity
.
create_mask
(
x
,
func_name
=
paddle
.
fluid
.
contrib
.
sparsity
.
MaskAlgo
.
MASK_2D_GREEDY
,
n
=
2
,
m
=
4
)
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_sparsity
(
mask
,
func_name
=
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
CHECK_2D
,
n
=
2
,
m
=
4
))
mask
=
paddle
.
fluid
.
contrib
.
sparsity
.
create_mask
(
x
,
func_name
=
paddle
.
fluid
.
contrib
.
sparsity
.
MaskAlgo
.
MASK_2D_BEST
,
n
=
2
,
m
=
4
)
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_sparsity
(
mask
,
func_name
=
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
CHECK_2D
,
n
=
2
,
m
=
4
))
if
__name__
==
'__main__'
:
unittest
.
main
()
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
# Copyright (c) 2021 NVIDIA Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
__future__
import
print_function
import
unittest
import
threading
,
time
import
paddle
import
numpy
as
np
class
TestASPUtils
(
unittest
.
TestCase
):
def
test_get_check_method
(
self
):
self
.
assertEqual
(
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
get_checking_method
(
paddle
.
fluid
.
contrib
.
sparsity
.
MaskAlgo
.
MASK_1D
),
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
CHECK_1D
)
self
.
assertEqual
(
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
get_checking_method
(
paddle
.
fluid
.
contrib
.
sparsity
.
MaskAlgo
.
MASK_2D_GREEDY
),
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
CHECK_2D
)
self
.
assertEqual
(
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
get_checking_method
(
paddle
.
fluid
.
contrib
.
sparsity
.
MaskAlgo
.
MASK_2D_BEST
),
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
CHECK_2D
)
def
test_density
(
self
):
x
=
np
.
array
([[
1.0
,
1.0
,
1.0
,
0.0
,
1.0
],
[
1.0
,
1.0
,
0.0
,
0.0
,
1.0
],
[
1.0
,
0.0
,
0.0
,
0.0
,
1.0
],
[
1.0
,
1.0
,
0.0
,
0.0
,
1.0
],
[
0.0
,
1.0
,
0.0
,
0.0
,
1.0
]])
self
.
assertEqual
(
paddle
.
incubate
.
asp
.
calculate_density
(
x
),
0.56
)
x
[:,
0
]
=
0.0
self
.
assertEqual
(
paddle
.
incubate
.
asp
.
calculate_density
(
x
),
0.4
)
def
test_check_mask_1d
(
self
):
x
=
np
.
array
([[
1.0
,
0.0
,
0.0
,
1.0
,
1.0
],
[
1.0
,
1.0
,
0.0
,
0.0
,
1.0
],
[
1.0
,
1.0
,
0.0
,
0.0
,
1.0
],
[
1.0
,
1.0
,
0.0
,
0.0
,
1.0
],
[
0.0
,
1.0
,
0.0
,
0.0
,
1.0
]])
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_1d
(
x
,
2
,
4
))
self
.
assertFalse
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_1d
(
x
,
3
,
4
))
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_1d
(
x
,
2
,
5
))
self
.
assertFalse
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_1d
(
x
,
3
,
5
))
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_1d
(
x
,
3
,
6
))
self
.
assertFalse
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_1d
(
x
,
4
,
6
))
def
test_get_mask_1d
(
self
):
for
_
in
range
(
10
):
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
,
5
))
x
=
paddle
.
fluid
.
contrib
.
sparsity
.
get_mask_1d
(
x
,
2
,
4
)
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_1d
(
x
,
2
,
4
))
x
=
np
.
random
.
randn
(
5
,
4
)
x
=
paddle
.
fluid
.
contrib
.
sparsity
.
get_mask_1d
(
x
,
2
,
4
)
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_1d
(
x
,
2
,
4
))
def
test_check_mask_2d
(
self
):
x
=
np
.
array
([[
1.0
,
0.0
,
0.0
,
1.0
,
1.0
],
[
0.0
,
1.0
,
0.0
,
0.0
,
0.0
],
[
0.0
,
0.0
,
1.0
,
0.0
,
1.0
],
[
1.0
,
1.0
,
0.0
,
0.0
,
0.0
],
[
0.0
,
1.0
,
0.0
,
0.0
,
1.0
]])
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
x
,
2
,
4
))
self
.
assertFalse
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
x
,
3
,
4
))
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
x
,
2
,
5
))
self
.
assertFalse
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
x
,
3
,
5
))
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
x
,
3
,
6
))
self
.
assertFalse
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
x
,
4
,
6
))
def
test_get_mask_2d_greedy
(
self
):
for
_
in
range
(
10
):
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
,
5
))
x
=
paddle
.
fluid
.
contrib
.
sparsity
.
get_mask_2d_greedy
(
x
,
2
,
4
)
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
x
,
2
,
4
))
x
=
np
.
random
.
randn
(
5
,
4
)
x
=
paddle
.
fluid
.
contrib
.
sparsity
.
get_mask_2d_greedy
(
x
,
2
,
4
)
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
x
,
2
,
4
))
def
test_get_mask_2d_best
(
self
):
for
_
in
range
(
10
):
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
,
5
))
x
=
paddle
.
fluid
.
contrib
.
sparsity
.
get_mask_2d_best
(
x
,
2
,
4
)
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
x
,
2
,
4
))
x
=
np
.
random
.
randn
(
5
,
4
)
x
=
paddle
.
fluid
.
contrib
.
sparsity
.
get_mask_2d_best
(
x
,
2
,
4
)
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
x
,
2
,
4
))
def
test_threadsafe_valid_2d_patterns
(
self
):
def
get_reference
(
m
=
4
,
n
=
2
):
from
itertools
import
permutations
patterns
=
np
.
zeros
(
m
)
patterns
[:
n
]
=
1
patterns
=
list
(
set
(
permutations
(
patterns
.
tolist
())))
patterns
=
patterns
+
patterns
patterns
=
np
.
asarray
(
list
(
set
(
permutations
(
patterns
,
m
))))
valid
=
((
patterns
.
sum
(
axis
=
1
)
<=
n
).
sum
(
axis
=
1
)
==
m
).
nonzero
()[
0
].
reshape
(
-
1
)
valid_patterns
=
np
.
empty
((
valid
.
shape
[
0
],
m
,
m
))
valid_patterns
[:]
=
patterns
[
valid
[:]]
return
valid_patterns
for
_
in
range
(
4
):
computing_thread
=
threading
.
Thread
(
target
=
paddle
.
fluid
.
contrib
.
sparsity
.
utils
.
_compute_valid_2d_patterns
,
args
=
(
2
,
4
))
computing_thread
.
start
()
time
.
sleep
(
3
)
patterns_map
=
paddle
.
fluid
.
contrib
.
sparsity
.
utils
.
_valid_2d_patterns
reference_patterns
=
get_reference
()
reference_key
=
'4_2'
self
.
assertTrue
(
reference_key
in
patterns_map
)
self
.
assertTrue
(
len
(
patterns_map
)
==
1
)
self
.
assertTrue
(
(
reference_patterns
==
patterns_map
[
reference_key
]).
all
())
def
test_check_sparsity
(
self
):
for
_
in
range
(
10
):
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
))
x_2d
=
x
.
reshape
(
1
,
x
.
shape
[
0
])
self
.
__test_1D_2D_sparsity_checking_methods
(
x_2d
)
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
,
5
))
x_2d
=
x
self
.
__test_1D_2D_sparsity_checking_methods
(
x_2d
)
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
,
5
,
5
))
x_2d
=
x
.
reshape
(
x
.
shape
[
0
]
*
x
.
shape
[
1
],
x
.
shape
[
2
])
self
.
__test_1D_2D_sparsity_checking_methods
(
x_2d
)
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
,
5
,
5
,
5
))
x_2d
=
x
.
reshape
(
x
.
shape
[
0
],
x
.
shape
[
1
]
*
x
.
shape
[
2
]
*
x
.
shape
[
3
])
self
.
__test_1D_2D_sparsity_checking_methods
(
x_2d
)
def
test_create_mask
(
self
):
for
_
in
range
(
10
):
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
))
self
.
__test_1D_2D_sparse_mask_generation_methods
(
x
)
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
,
5
))
self
.
__test_1D_2D_sparse_mask_generation_methods
(
x
)
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
,
5
,
5
))
self
.
__test_1D_2D_sparse_mask_generation_methods
(
x
)
x
=
np
.
random
.
randint
(
10
,
size
=
(
5
,
5
,
5
,
5
))
self
.
__test_1D_2D_sparse_mask_generation_methods
(
x
)
def
__test_1D_2D_sparsity_checking_methods
(
self
,
x_2d
):
mask
=
paddle
.
fluid
.
contrib
.
sparsity
.
get_mask_1d
(
x_2d
,
2
,
4
)
self
.
assertEqual
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_sparsity
(
mask
,
func_name
=
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
CHECK_1D
,
n
=
2
,
m
=
4
),
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_1d
(
mask
,
2
,
4
))
mask
=
paddle
.
fluid
.
contrib
.
sparsity
.
get_mask_2d_best
(
x_2d
,
2
,
4
)
self
.
assertEqual
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_sparsity
(
mask
,
func_name
=
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
CHECK_2D
,
n
=
2
,
m
=
4
),
paddle
.
fluid
.
contrib
.
sparsity
.
check_mask_2d
(
mask
,
2
,
4
))
def
__test_1D_2D_sparse_mask_generation_methods
(
self
,
x
):
mask
=
paddle
.
fluid
.
contrib
.
sparsity
.
create_mask
(
x
,
func_name
=
paddle
.
fluid
.
contrib
.
sparsity
.
MaskAlgo
.
MASK_1D
,
n
=
2
,
m
=
4
)
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_sparsity
(
mask
,
func_name
=
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
CHECK_1D
,
n
=
2
,
m
=
4
))
mask
=
paddle
.
fluid
.
contrib
.
sparsity
.
create_mask
(
x
,
func_name
=
paddle
.
fluid
.
contrib
.
sparsity
.
MaskAlgo
.
MASK_2D_GREEDY
,
n
=
2
,
m
=
4
)
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_sparsity
(
mask
,
func_name
=
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
CHECK_2D
,
n
=
2
,
m
=
4
))
mask
=
paddle
.
fluid
.
contrib
.
sparsity
.
create_mask
(
x
,
func_name
=
paddle
.
fluid
.
contrib
.
sparsity
.
MaskAlgo
.
MASK_2D_BEST
,
n
=
2
,
m
=
4
)
self
.
assertTrue
(
paddle
.
fluid
.
contrib
.
sparsity
.
check_sparsity
(
mask
,
func_name
=
paddle
.
fluid
.
contrib
.
sparsity
.
CheckMethod
.
CHECK_2D
,
n
=
2
,
m
=
4
))
if
__name__
==
'__main__'
:
unittest
.
main
()
python/paddle/fluid/tests/unittests/mkldnn/test_elementwise_sub_mkldnn_op.py
浏览文件 @
c6c9c186
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
__future__
import
print_function
import
unittest
import
numpy
as
np
from
paddle
import
enable_static
from
paddle.fluid.tests.unittests.op_test
import
OpTest
,
OpTestTool
,
convert_float_to_uint16
from
paddle.fluid.framework
import
_current_expected_place
import
paddle.fluid.core
as
core
@
OpTestTool
.
skip_if
(
not
(
isinstance
(
_current_expected_place
(),
core
.
CPUPlace
)),
"GPU is not supported"
)
class
TestMKLDNNElementwiseSubOp
(
OpTest
):
def
setUp
(
self
):
self
.
op_type
=
"elementwise_sub"
self
.
init_dtype
()
self
.
init_input_output
()
self
.
init_kernel_type
()
self
.
init_axis
()
self
.
inputs
=
{
'X'
:
OpTest
.
np_dtype_to_fluid_dtype
(
self
.
x
),
'Y'
:
OpTest
.
np_dtype_to_fluid_dtype
(
self
.
y
)
}
self
.
attrs
=
{
'axis'
:
self
.
axis
,
'use_mkldnn'
:
self
.
use_mkldnn
}
self
.
outputs
=
{
'Out'
:
self
.
out
}
def
init_input_output
(
self
):
self
.
x
=
np
.
random
.
uniform
(
0.1
,
1
,
[
13
,
17
]).
astype
(
self
.
dtype
)
self
.
y
=
np
.
random
.
uniform
(
0.1
,
1
,
[
13
,
17
]).
astype
(
self
.
dtype
)
self
.
out
=
np
.
subtract
(
self
.
x
,
self
.
y
)
def
test_check_grad_normal
(
self
):
self
.
check_grad
([
'X'
,
'Y'
],
'Out'
)
def
test_check_grad_ignore_x
(
self
):
self
.
check_grad
([
'Y'
],
'Out'
,
no_grad_set
=
set
(
"X"
))
def
test_check_grad_ignore_y
(
self
):
self
.
check_grad
([
'X'
],
'Out'
,
no_grad_set
=
set
(
'Y'
))
def
init_axis
(
self
):
self
.
axis
=
-
1
def
init_kernel_type
(
self
):
self
.
use_mkldnn
=
True
def
init_dtype
(
self
):
self
.
dtype
=
np
.
float32
def
test_check_output
(
self
):
self
.
check_output
()
class
TestMKLDNNElementwiseSubOp2
(
TestMKLDNNElementwiseSubOp
):
def
init_input_output
(
self
):
self
.
x
=
np
.
random
.
random
((
100
,
)).
astype
(
self
.
dtype
)
self
.
y
=
np
.
random
.
random
((
100
,
)).
astype
(
self
.
dtype
)
self
.
out
=
np
.
subtract
(
self
.
x
,
self
.
y
)
class
TestMKLDNNElementwiseSubOp3
(
TestMKLDNNElementwiseSubOp
):
def
init_input_output
(
self
):
self
.
x
=
np
.
random
.
uniform
(
0.1
,
1
,
[
2
,
3
,
4
,
5
]).
astype
(
self
.
dtype
)
self
.
y
=
np
.
random
.
uniform
(
0.1
,
1
,
[
2
,
3
,
4
,
5
]).
astype
(
self
.
dtype
)
self
.
out
=
np
.
subtract
(
self
.
x
,
self
.
y
)
class
TestMKLDNNElementwiseSubOp4
(
TestMKLDNNElementwiseSubOp
):
def
init_input_output
(
self
):
self
.
x
=
np
.
random
.
uniform
(
1
,
2
,
[
2
,
3
,
4
,
32
]).
astype
(
self
.
dtype
)
self
.
y
=
np
.
random
.
uniform
(
1
,
2
,
[
4
,
32
]).
astype
(
self
.
dtype
)
self
.
out
=
np
.
subtract
(
self
.
x
,
self
.
y
)
class
TestMKLDNNElementwiseSubOp5
(
TestMKLDNNElementwiseSubOp
):
def
init_input_output
(
self
):
self
.
x
=
np
.
random
.
uniform
(
1
,
2
,
[
2
,
3
,
4
,
100
]).
astype
(
self
.
dtype
)
self
.
y
=
np
.
random
.
uniform
(
1
,
2
,
[
100
]).
astype
(
self
.
dtype
)
self
.
out
=
np
.
subtract
(
self
.
x
,
self
.
y
)
class
TestMKLDNNElementwiseSubOp_broadcast
(
TestMKLDNNElementwiseSubOp
):
def
init_input_output
(
self
):
self
.
x
=
np
.
random
.
rand
(
2
,
10
,
12
,
3
).
astype
(
self
.
dtype
)
self
.
y
=
np
.
random
.
rand
(
10
,
12
).
astype
(
self
.
dtype
)
self
.
out
=
self
.
x
-
self
.
y
.
reshape
(
1
,
10
,
12
,
1
)
def
init_axis
(
self
):
self
.
axis
=
1
class
TestElementwiseSubOp_xsize_lessthan_ysize_sub
(
TestMKLDNNElementwiseSubOp
):
def
init_input_output
(
self
):
self
.
x
=
np
.
random
.
rand
(
10
,
12
).
astype
(
self
.
dtype
)
self
.
y
=
np
.
random
.
rand
(
2
,
2
,
10
,
12
).
astype
(
self
.
dtype
)
self
.
out
=
self
.
x
-
self
.
y
def
init_axis
(
self
):
self
.
axis
=
2
def
test_check_grad_normal
(
self
):
pass
def
test_check_grad_ignore_y
(
self
):
pass
def
test_check_grad_ignore_x
(
self
):
pass
@
OpTestTool
.
skip_if_not_cpu_bf16
()
class
TestBf16
(
TestMKLDNNElementwiseSubOp
):
def
setUp
(
self
):
self
.
op_type
=
"elementwise_sub"
self
.
init_dtype
()
self
.
init_input_output
()
self
.
init_kernel_type
()
self
.
init_axis
()
self
.
x_bf16
=
convert_float_to_uint16
(
self
.
x
)
self
.
y_bf16
=
convert_float_to_uint16
(
self
.
y
)
self
.
inputs
=
{
'X'
:
self
.
x_bf16
,
'Y'
:
self
.
y_bf16
}
self
.
attrs
=
{
'axis'
:
self
.
axis
,
'use_mkldnn'
:
self
.
use_mkldnn
}
self
.
outputs
=
{
'Out'
:
convert_float_to_uint16
(
self
.
out
)}
def
init_dtype
(
self
):
self
.
dtype
=
np
.
float32
self
.
mkldnn_data_type
=
"bfloat16"
def
init_input_output
(
self
):
self
.
x
=
np
.
random
.
random
(
100
,
).
astype
(
self
.
dtype
)
self
.
y
=
np
.
random
.
random
(
100
,
).
astype
(
self
.
dtype
)
self
.
out
=
np
.
subtract
(
self
.
x
,
self
.
y
)
def
test_check_output
(
self
):
self
.
check_output_with_place
(
core
.
CPUPlace
())
def
test_check_grad_normal
(
self
):
self
.
check_grad_with_place
(
core
.
CPUPlace
(),
[
"X"
,
"Y"
],
"Out"
,
user_defined_grads
=
[
self
.
x
,
-
self
.
x
],
user_defined_grad_outputs
=
[
self
.
x_bf16
])
def
test_check_grad_ignore_x
(
self
):
self
.
check_grad_with_place
(
core
.
CPUPlace
(),
[
"Y"
],
"Out"
,
user_defined_grads
=
[
-
self
.
y
],
user_defined_grad_outputs
=
[
self
.
y_bf16
])
def
test_check_grad_ignore_y
(
self
):
self
.
check_grad_with_place
(
core
.
CPUPlace
(),
[
"X"
],
"Out"
,
user_defined_grads
=
[
self
.
x
],
user_defined_grad_outputs
=
[
self
.
x_bf16
])
class
TestBf16Broadcasting
(
TestBf16
):
def
init_input_output
(
self
):
self
.
x
=
np
.
random
.
uniform
(
1
,
2
,
[
2
,
3
,
4
,
100
]).
astype
(
self
.
dtype
)
self
.
y
=
np
.
random
.
uniform
(
1
,
2
,
[
100
]).
astype
(
self
.
dtype
)
self
.
out
=
np
.
subtract
(
self
.
x
,
self
.
y
)
def
compute_reduced_gradients
(
self
,
out_grads
):
part_sum
=
np
.
add
.
reduceat
(
out_grads
,
[
0
],
axis
=
0
)
part_sum
=
np
.
add
.
reduceat
(
part_sum
,
[
0
],
axis
=
1
)
part_sum
=
np
.
add
.
reduceat
(
part_sum
,
[
0
],
axis
=
2
)
return
-
part_sum
.
flatten
()
def
test_check_grad_normal
(
self
):
self
.
check_grad_with_place
(
core
.
CPUPlace
(),
[
"X"
,
"Y"
],
"Out"
,
user_defined_grads
=
[
self
.
x
,
self
.
compute_reduced_gradients
(
self
.
x
)],
user_defined_grad_outputs
=
[
self
.
x_bf16
])
def
test_check_grad_ignore_x
(
self
):
self
.
check_grad_with_place
(
core
.
CPUPlace
(),
[
"Y"
],
"Out"
,
user_defined_grads
=
[
self
.
compute_reduced_gradients
(
self
.
x
)],
user_defined_grad_outputs
=
[
self
.
x_bf16
])
class
TestInt8
(
TestMKLDNNElementwiseSubOp
):
def
init_kernel_type
(
self
):
self
.
use_mkldnn
=
True
self
.
_cpu_only
=
True
def
init_dtype
(
self
):
self
.
dtype
=
np
.
int8
def
init_input_output
(
self
):
self
.
x
=
np
.
random
.
randint
(
0
,
3
,
(
12
,
9
)).
astype
(
"int8"
)
self
.
y
=
np
.
random
.
randint
(
0
,
3
,
(
12
,
9
)).
astype
(
"int8"
)
self
.
out
=
np
.
subtract
(
self
.
x
,
self
.
y
)
def
init_scales
(
self
):
self
.
attrs
[
'Scale_x'
]
=
1.0
self
.
attrs
[
'Scale_y'
]
=
1.0
self
.
attrs
[
'Scale_out'
]
=
1.0
def
test_check_output
(
self
):
self
.
init_scales
()
self
.
check_output
()
def
test_check_grad_normal
(
self
):
pass
def
test_check_grad_ignore_x
(
self
):
pass
def
test_check_grad_ignore_y
(
self
):
pass
if
__name__
==
'__main__'
:
enable_static
()
unittest
.
main
()
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
__future__
import
print_function
import
unittest
import
numpy
as
np
from
paddle
import
enable_static
from
paddle.fluid.tests.unittests.op_test
import
OpTest
,
OpTestTool
,
convert_float_to_uint16
from
paddle.fluid.framework
import
_current_expected_place
import
paddle.fluid.core
as
core
@
OpTestTool
.
skip_if
(
not
(
isinstance
(
_current_expected_place
(),
core
.
CPUPlace
)),
"GPU is not supported"
)
class
TestMKLDNNElementwiseSubOp
(
OpTest
):
def
setUp
(
self
):
self
.
op_type
=
"elementwise_sub"
self
.
init_dtype
()
self
.
init_input_output
()
self
.
init_kernel_type
()
self
.
init_axis
()
self
.
inputs
=
{
'X'
:
OpTest
.
np_dtype_to_fluid_dtype
(
self
.
x
),
'Y'
:
OpTest
.
np_dtype_to_fluid_dtype
(
self
.
y
)
}
self
.
attrs
=
{
'axis'
:
self
.
axis
,
'use_mkldnn'
:
self
.
use_mkldnn
}
self
.
outputs
=
{
'Out'
:
self
.
out
}
def
init_input_output
(
self
):
self
.
x
=
np
.
random
.
uniform
(
0.1
,
1
,
[
13
,
17
]).
astype
(
self
.
dtype
)
self
.
y
=
np
.
random
.
uniform
(
0.1
,
1
,
[
13
,
17
]).
astype
(
self
.
dtype
)
self
.
out
=
np
.
subtract
(
self
.
x
,
self
.
y
)
def
test_check_grad_normal
(
self
):
self
.
check_grad
([
'X'
,
'Y'
],
'Out'
)
def
test_check_grad_ignore_x
(
self
):
self
.
check_grad
([
'Y'
],
'Out'
,
no_grad_set
=
set
(
"X"
))
def
test_check_grad_ignore_y
(
self
):
self
.
check_grad
([
'X'
],
'Out'
,
no_grad_set
=
set
(
'Y'
))
def
init_axis
(
self
):
self
.
axis
=
-
1
def
init_kernel_type
(
self
):
self
.
use_mkldnn
=
True
def
init_dtype
(
self
):
self
.
dtype
=
np
.
float32
def
test_check_output
(
self
):
self
.
check_output
()
class
TestMKLDNNElementwiseSubOp2
(
TestMKLDNNElementwiseSubOp
):
def
init_input_output
(
self
):
self
.
x
=
np
.
random
.
random
((
100
,
)).
astype
(
self
.
dtype
)
self
.
y
=
np
.
random
.
random
((
100
,
)).
astype
(
self
.
dtype
)
self
.
out
=
np
.
subtract
(
self
.
x
,
self
.
y
)
class
TestMKLDNNElementwiseSubOp3
(
TestMKLDNNElementwiseSubOp
):
def
init_input_output
(
self
):
self
.
x
=
np
.
random
.
uniform
(
0.1
,
1
,
[
2
,
3
,
4
,
5
]).
astype
(
self
.
dtype
)
self
.
y
=
np
.
random
.
uniform
(
0.1
,
1
,
[
2
,
3
,
4
,
5
]).
astype
(
self
.
dtype
)
self
.
out
=
np
.
subtract
(
self
.
x
,
self
.
y
)
class
TestMKLDNNElementwiseSubOp4
(
TestMKLDNNElementwiseSubOp
):
def
init_input_output
(
self
):
self
.
x
=
np
.
random
.
uniform
(
1
,
2
,
[
2
,
3
,
4
,
32
]).
astype
(
self
.
dtype
)
self
.
y
=
np
.
random
.
uniform
(
1
,
2
,
[
4
,
32
]).
astype
(
self
.
dtype
)
self
.
out
=
np
.
subtract
(
self
.
x
,
self
.
y
)
class
TestMKLDNNElementwiseSubOp5
(
TestMKLDNNElementwiseSubOp
):
def
init_input_output
(
self
):
self
.
x
=
np
.
random
.
uniform
(
1
,
2
,
[
2
,
3
,
4
,
100
]).
astype
(
self
.
dtype
)
self
.
y
=
np
.
random
.
uniform
(
1
,
2
,
[
100
]).
astype
(
self
.
dtype
)
self
.
out
=
np
.
subtract
(
self
.
x
,
self
.
y
)
class
TestMKLDNNElementwiseSubOp_broadcast
(
TestMKLDNNElementwiseSubOp
):
def
init_input_output
(
self
):
self
.
x
=
np
.
random
.
rand
(
2
,
10
,
12
,
3
).
astype
(
self
.
dtype
)
self
.
y
=
np
.
random
.
rand
(
10
,
12
).
astype
(
self
.
dtype
)
self
.
out
=
self
.
x
-
self
.
y
.
reshape
(
1
,
10
,
12
,
1
)
def
init_axis
(
self
):
self
.
axis
=
1
class
TestElementwiseSubOp_xsize_lessthan_ysize_sub
(
TestMKLDNNElementwiseSubOp
):
def
init_input_output
(
self
):
self
.
x
=
np
.
random
.
rand
(
10
,
12
).
astype
(
self
.
dtype
)
self
.
y
=
np
.
random
.
rand
(
2
,
2
,
10
,
12
).
astype
(
self
.
dtype
)
self
.
out
=
self
.
x
-
self
.
y
def
init_axis
(
self
):
self
.
axis
=
2
def
test_check_grad_normal
(
self
):
pass
def
test_check_grad_ignore_y
(
self
):
pass
def
test_check_grad_ignore_x
(
self
):
pass
@
OpTestTool
.
skip_if_not_cpu_bf16
()
class
TestBf16
(
TestMKLDNNElementwiseSubOp
):
def
setUp
(
self
):
self
.
op_type
=
"elementwise_sub"
self
.
init_dtype
()
self
.
init_input_output
()
self
.
init_kernel_type
()
self
.
init_axis
()
self
.
x_bf16
=
convert_float_to_uint16
(
self
.
x
)
self
.
y_bf16
=
convert_float_to_uint16
(
self
.
y
)
self
.
inputs
=
{
'X'
:
self
.
x_bf16
,
'Y'
:
self
.
y_bf16
}
self
.
attrs
=
{
'axis'
:
self
.
axis
,
'use_mkldnn'
:
self
.
use_mkldnn
}
self
.
outputs
=
{
'Out'
:
convert_float_to_uint16
(
self
.
out
)}
def
init_dtype
(
self
):
self
.
dtype
=
np
.
float32
self
.
mkldnn_data_type
=
"bfloat16"
def
init_input_output
(
self
):
self
.
x
=
np
.
random
.
random
(
100
,
).
astype
(
self
.
dtype
)
self
.
y
=
np
.
random
.
random
(
100
,
).
astype
(
self
.
dtype
)
self
.
out
=
np
.
subtract
(
self
.
x
,
self
.
y
)
def
test_check_output
(
self
):
self
.
check_output_with_place
(
core
.
CPUPlace
())
def
test_check_grad_normal
(
self
):
self
.
check_grad_with_place
(
core
.
CPUPlace
(),
[
"X"
,
"Y"
],
"Out"
,
user_defined_grads
=
[
self
.
x
,
-
self
.
x
],
user_defined_grad_outputs
=
[
self
.
x_bf16
])
def
test_check_grad_ignore_x
(
self
):
self
.
check_grad_with_place
(
core
.
CPUPlace
(),
[
"Y"
],
"Out"
,
user_defined_grads
=
[
-
self
.
y
],
user_defined_grad_outputs
=
[
self
.
y_bf16
])
def
test_check_grad_ignore_y
(
self
):
self
.
check_grad_with_place
(
core
.
CPUPlace
(),
[
"X"
],
"Out"
,
user_defined_grads
=
[
self
.
x
],
user_defined_grad_outputs
=
[
self
.
x_bf16
])
class
TestBf16Broadcasting
(
TestBf16
):
def
init_input_output
(
self
):
self
.
x
=
np
.
random
.
uniform
(
1
,
2
,
[
2
,
3
,
4
,
100
]).
astype
(
self
.
dtype
)
self
.
y
=
np
.
random
.
uniform
(
1
,
2
,
[
100
]).
astype
(
self
.
dtype
)
self
.
out
=
np
.
subtract
(
self
.
x
,
self
.
y
)
def
compute_reduced_gradients
(
self
,
out_grads
):
part_sum
=
np
.
add
.
reduceat
(
out_grads
,
[
0
],
axis
=
0
)
part_sum
=
np
.
add
.
reduceat
(
part_sum
,
[
0
],
axis
=
1
)
part_sum
=
np
.
add
.
reduceat
(
part_sum
,
[
0
],
axis
=
2
)
return
-
part_sum
.
flatten
()
def
test_check_grad_normal
(
self
):
self
.
check_grad_with_place
(
core
.
CPUPlace
(),
[
"X"
,
"Y"
],
"Out"
,
user_defined_grads
=
[
self
.
x
,
self
.
compute_reduced_gradients
(
self
.
x
)],
user_defined_grad_outputs
=
[
self
.
x_bf16
])
def
test_check_grad_ignore_x
(
self
):
self
.
check_grad_with_place
(
core
.
CPUPlace
(),
[
"Y"
],
"Out"
,
user_defined_grads
=
[
self
.
compute_reduced_gradients
(
self
.
x
)],
user_defined_grad_outputs
=
[
self
.
x_bf16
])
class
TestInt8
(
TestMKLDNNElementwiseSubOp
):
def
init_kernel_type
(
self
):
self
.
use_mkldnn
=
True
self
.
_cpu_only
=
True
def
init_dtype
(
self
):
self
.
dtype
=
np
.
int8
def
init_input_output
(
self
):
self
.
x
=
np
.
random
.
randint
(
0
,
3
,
(
12
,
9
)).
astype
(
"int8"
)
self
.
y
=
np
.
random
.
randint
(
0
,
3
,
(
12
,
9
)).
astype
(
"int8"
)
self
.
out
=
np
.
subtract
(
self
.
x
,
self
.
y
)
def
init_scales
(
self
):
self
.
attrs
[
'Scale_x'
]
=
1.0
self
.
attrs
[
'Scale_y'
]
=
1.0
self
.
attrs
[
'Scale_out'
]
=
1.0
def
test_check_output
(
self
):
self
.
init_scales
()
self
.
check_output
()
def
test_check_grad_normal
(
self
):
pass
def
test_check_grad_ignore_x
(
self
):
pass
def
test_check_grad_ignore_y
(
self
):
pass
if
__name__
==
'__main__'
:
enable_static
()
unittest
.
main
()
python/paddle/fluid/tests/unittests/mkldnn/test_fill_constant_mkldnn_op.py
浏览文件 @
c6c9c186
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
__future__
import
print_function
import
unittest
import
numpy
as
np
from
paddle.fluid.tests.unittests.op_test
import
OpTest
,
OpTestTool
import
paddle
@
OpTestTool
.
skip_if_not_cpu_bf16
()
class
TestFillConstant2DOneDNNOp
(
OpTest
):
def
setUp
(
self
):
self
.
op_type
=
"fill_constant"
self
.
dtype
=
np
.
float32
self
.
shape_tensor_list
=
None
self
.
shape_tensor
=
None
self
.
str_value
=
""
real_shape
=
[]
self
.
value
=
0.1
self
.
set_inputs
()
self
.
set_attrs
()
if
'value'
in
self
.
attrs
:
self
.
value
=
self
.
attrs
[
'value'
]
if
self
.
str_value
!=
""
:
self
.
value
=
float
(
self
.
str_value
)
if
'ValueTensor'
in
self
.
inputs
:
self
.
value
=
self
.
inputs
[
'ValueTensor'
]
if
'shape'
in
self
.
attrs
:
real_shape
=
self
.
attrs
[
'shape'
]
if
'ShapeTensor'
in
self
.
inputs
:
real_shape
=
list
(
self
.
inputs
[
'ShapeTensor'
])
if
'ShapeTensorList'
in
self
.
inputs
:
real_shape
=
[]
for
shape_tensor
in
self
.
inputs
[
'ShapeTensorList'
]:
real_shape
.
append
(
shape_tensor
[
1
].
item
())
self
.
outputs
=
{
'Out'
:
np
.
full
(
real_shape
,
self
.
value
)}
def
set_inputs
(
self
):
self
.
inputs
=
{}
def
set_attrs
(
self
):
self
.
attrs
=
{
'shape'
:
(
3
,
5
),
'use_mkldnn'
:
True
,
'value'
:
self
.
value
}
def
test_check_output
(
self
):
self
.
check_output
()
class
TestFillZerosLike4DShapeTensorPriorityOneDNNOp
(
TestFillConstant2DOneDNNOp
):
def
set_inputs
(
self
):
self
.
inputs
=
{
'ShapeTensor'
:
np
.
array
([
5
,
6
,
7
,
8
]).
astype
(
"int32"
)}
class
TestFillZerosLike4DShapeTensorListPriorityOneDNNOp
(
TestFillConstant2DOneDNNOp
):
def
set_inputs
(
self
):
shape
=
(
4
,
5
,
6
,
7
)
self
.
shape_tensor_list
=
[]
for
index
,
elem
in
enumerate
(
shape
):
self
.
shape_tensor_list
.
append
((
"x"
+
str
(
index
),
np
.
ones
(
(
1
)).
astype
(
'int32'
)
*
elem
))
self
.
inputs
=
{
'ShapeTensorList'
:
self
.
shape_tensor_list
}
class
TestFillZerosLike2DStringValueInfOneDNNOp
(
TestFillConstant2DOneDNNOp
):
def
set_attrs
(
self
):
self
.
str_value
=
"inf"
self
.
attrs
=
{
'shape'
:
(
10
,
13
),
'use_mkldnn'
:
True
,
'str_value'
:
"inf"
}
class
TestFillZerosLike2DStringValueMinusInfOneDNNOp
(
TestFillConstant2DOneDNNOp
):
def
set_attrs
(
self
):
self
.
str_value
=
"-inf"
self
.
attrs
=
{
'shape'
:
(
10
,
13
),
'use_mkldnn'
:
True
,
'str_value'
:
"-inf"
}
class
TestFillZerosLike2DStringValueFloatOneDNNOp
(
TestFillConstant2DOneDNNOp
):
def
set_attrs
(
self
):
self
.
str_value
=
"0.123"
self
.
attrs
=
{
'shape'
:
(
10
,
13
),
'use_mkldnn'
:
True
,
'str_value'
:
"0.123"
}
class
TestFillZerosLike2DValueTensorPriorityOneDNNOp
(
TestFillZerosLike2DStringValueFloatOneDNNOp
):
def
set_inputs
(
self
):
self
.
inputs
=
{
'ValueTensor'
:
np
.
atleast_1d
(
2.25
).
astype
(
"float32"
)}
if
__name__
==
"__main__"
:
paddle
.
enable_static
()
unittest
.
main
()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
__future__
import
print_function
import
unittest
import
numpy
as
np
from
paddle.fluid.tests.unittests.op_test
import
OpTest
,
OpTestTool
import
paddle
@
OpTestTool
.
skip_if_not_cpu_bf16
()
class
TestFillConstant2DOneDNNOp
(
OpTest
):
def
setUp
(
self
):
self
.
op_type
=
"fill_constant"
self
.
dtype
=
np
.
float32
self
.
shape_tensor_list
=
None
self
.
shape_tensor
=
None
self
.
str_value
=
""
real_shape
=
[]
self
.
value
=
0.1
self
.
set_inputs
()
self
.
set_attrs
()
if
'value'
in
self
.
attrs
:
self
.
value
=
self
.
attrs
[
'value'
]
if
self
.
str_value
!=
""
:
self
.
value
=
float
(
self
.
str_value
)
if
'ValueTensor'
in
self
.
inputs
:
self
.
value
=
self
.
inputs
[
'ValueTensor'
]
if
'shape'
in
self
.
attrs
:
real_shape
=
self
.
attrs
[
'shape'
]
if
'ShapeTensor'
in
self
.
inputs
:
real_shape
=
list
(
self
.
inputs
[
'ShapeTensor'
])
if
'ShapeTensorList'
in
self
.
inputs
:
real_shape
=
[]
for
shape_tensor
in
self
.
inputs
[
'ShapeTensorList'
]:
real_shape
.
append
(
shape_tensor
[
1
].
item
())
self
.
outputs
=
{
'Out'
:
np
.
full
(
real_shape
,
self
.
value
)}
def
set_inputs
(
self
):
self
.
inputs
=
{}
def
set_attrs
(
self
):
self
.
attrs
=
{
'shape'
:
(
3
,
5
),
'use_mkldnn'
:
True
,
'value'
:
self
.
value
}
def
test_check_output
(
self
):
self
.
check_output
()
class
TestFillZerosLike4DShapeTensorPriorityOneDNNOp
(
TestFillConstant2DOneDNNOp
):
def
set_inputs
(
self
):
self
.
inputs
=
{
'ShapeTensor'
:
np
.
array
([
5
,
6
,
7
,
8
]).
astype
(
"int32"
)}
class
TestFillZerosLike4DShapeTensorListPriorityOneDNNOp
(
TestFillConstant2DOneDNNOp
):
def
set_inputs
(
self
):
shape
=
(
4
,
5
,
6
,
7
)
self
.
shape_tensor_list
=
[]
for
index
,
elem
in
enumerate
(
shape
):
self
.
shape_tensor_list
.
append
((
"x"
+
str
(
index
),
np
.
ones
(
(
1
)).
astype
(
'int32'
)
*
elem
))
self
.
inputs
=
{
'ShapeTensorList'
:
self
.
shape_tensor_list
}
class
TestFillZerosLike2DStringValueInfOneDNNOp
(
TestFillConstant2DOneDNNOp
):
def
set_attrs
(
self
):
self
.
str_value
=
"inf"
self
.
attrs
=
{
'shape'
:
(
10
,
13
),
'use_mkldnn'
:
True
,
'str_value'
:
"inf"
}
class
TestFillZerosLike2DStringValueMinusInfOneDNNOp
(
TestFillConstant2DOneDNNOp
):
def
set_attrs
(
self
):
self
.
str_value
=
"-inf"
self
.
attrs
=
{
'shape'
:
(
10
,
13
),
'use_mkldnn'
:
True
,
'str_value'
:
"-inf"
}
class
TestFillZerosLike2DStringValueFloatOneDNNOp
(
TestFillConstant2DOneDNNOp
):
def
set_attrs
(
self
):
self
.
str_value
=
"0.123"
self
.
attrs
=
{
'shape'
:
(
10
,
13
),
'use_mkldnn'
:
True
,
'str_value'
:
"0.123"
}
class
TestFillZerosLike2DValueTensorPriorityOneDNNOp
(
TestFillZerosLike2DStringValueFloatOneDNNOp
):
def
set_inputs
(
self
):
self
.
inputs
=
{
'ValueTensor'
:
np
.
atleast_1d
(
2.25
).
astype
(
"float32"
)}
if
__name__
==
"__main__"
:
paddle
.
enable_static
()
unittest
.
main
()
python/paddle/fluid/tests/unittests/sequence/test_sequence_first_step.py
浏览文件 @
c6c9c186
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
paddle.fluid
as
fluid
from
paddle.fluid.framework
import
convert_np_dtype_to_dtype_
,
Program
,
program_guard
import
paddle.fluid.core
as
core
import
numpy
as
np
import
copy
import
unittest
import
sys
sys
.
path
.
append
(
"../"
)
from
op_test
import
OpTest
class
TestSequenceFirstStepOpError
(
unittest
.
TestCase
):
def
test_errors
(
self
):
with
program_guard
(
Program
(),
Program
()):
def
test_Variable
():
# the input must be Variable
input_data
=
np
.
random
.
randint
(
1
,
5
,
[
4
]).
astype
(
"int64"
)
fluid
.
layers
.
sequence_last_step
(
input_data
)
self
.
assertRaises
(
TypeError
,
test_Variable
)
def
test_input_dtype
():
# the dtype of input must be int64
type_data
=
fluid
.
layers
.
data
(
name
=
'type_data'
,
shape
=
[
7
,
1
],
append_batch_size
=
False
,
dtype
=
'int64'
,
lod_level
=
1
)
fluid
.
layers
.
sequence_last_step
(
type_data
)
self
.
assertRaises
(
TypeError
,
test_input_dtype
)
if
__name__
==
'__main__'
:
unittest
.
main
()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
paddle.fluid
as
fluid
from
paddle.fluid.framework
import
convert_np_dtype_to_dtype_
,
Program
,
program_guard
import
paddle.fluid.core
as
core
import
numpy
as
np
import
copy
import
unittest
import
sys
sys
.
path
.
append
(
"../"
)
from
op_test
import
OpTest
class
TestSequenceFirstStepOpError
(
unittest
.
TestCase
):
def
test_errors
(
self
):
with
program_guard
(
Program
(),
Program
()):
def
test_Variable
():
# the input must be Variable
input_data
=
np
.
random
.
randint
(
1
,
5
,
[
4
]).
astype
(
"int64"
)
fluid
.
layers
.
sequence_last_step
(
input_data
)
self
.
assertRaises
(
TypeError
,
test_Variable
)
def
test_input_dtype
():
# the dtype of input must be int64
type_data
=
fluid
.
layers
.
data
(
name
=
'type_data'
,
shape
=
[
7
,
1
],
append_batch_size
=
False
,
dtype
=
'int64'
,
lod_level
=
1
)
fluid
.
layers
.
sequence_last_step
(
type_data
)
self
.
assertRaises
(
TypeError
,
test_input_dtype
)
if
__name__
==
'__main__'
:
unittest
.
main
()
python/paddle/fluid/tests/unittests/sequence/test_sequence_last_step.py
浏览文件 @
c6c9c186
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
paddle.fluid
as
fluid
from
paddle.fluid.framework
import
convert_np_dtype_to_dtype_
,
Program
,
program_guard
import
paddle.fluid.core
as
core
import
numpy
as
np
import
copy
import
unittest
import
sys
sys
.
path
.
append
(
"../"
)
from
op_test
import
OpTest
class
TestSequenceLastStepOpError
(
unittest
.
TestCase
):
def
test_errors
(
self
):
with
program_guard
(
Program
(),
Program
()):
def
test_Variable
():
# the input must be Variable
input_data
=
np
.
random
.
randint
(
1
,
5
,
[
4
]).
astype
(
"int64"
)
fluid
.
layers
.
sequence_last_step
(
input_data
)
self
.
assertRaises
(
TypeError
,
test_Variable
)
def
test_input_dtype
():
# the dtype of input must be int64
type_data
=
fluid
.
layers
.
data
(
name
=
'type_data'
,
shape
=
[
7
,
1
],
append_batch_size
=
False
,
dtype
=
'int64'
,
lod_level
=
1
)
fluid
.
layers
.
sequence_last_step
(
type_data
)
self
.
assertRaises
(
TypeError
,
test_input_dtype
)
if
__name__
==
'__main__'
:
unittest
.
main
()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import
paddle.fluid
as
fluid
from
paddle.fluid.framework
import
convert_np_dtype_to_dtype_
,
Program
,
program_guard
import
paddle.fluid.core
as
core
import
numpy
as
np
import
copy
import
unittest
import
sys
sys
.
path
.
append
(
"../"
)
from
op_test
import
OpTest
class
TestSequenceLastStepOpError
(
unittest
.
TestCase
):
def
test_errors
(
self
):
with
program_guard
(
Program
(),
Program
()):
def
test_Variable
():
# the input must be Variable
input_data
=
np
.
random
.
randint
(
1
,
5
,
[
4
]).
astype
(
"int64"
)
fluid
.
layers
.
sequence_last_step
(
input_data
)
self
.
assertRaises
(
TypeError
,
test_Variable
)
def
test_input_dtype
():
# the dtype of input must be int64
type_data
=
fluid
.
layers
.
data
(
name
=
'type_data'
,
shape
=
[
7
,
1
],
append_batch_size
=
False
,
dtype
=
'int64'
,
lod_level
=
1
)
fluid
.
layers
.
sequence_last_step
(
type_data
)
self
.
assertRaises
(
TypeError
,
test_input_dtype
)
if
__name__
==
'__main__'
:
unittest
.
main
()
python/paddle/fluid/tests/unittests/test_lstm_cudnn_op.py
浏览文件 @
c6c9c186
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
__future__
import
print_function
import
unittest
import
numpy
as
np
import
math
import
paddle.fluid.core
as
core
from
op_test
import
OpTest
import
paddle
import
paddle.fluid
as
fluid
import
paddle.fluid.layers
as
layers
import
random
random
.
seed
(
2
)
np
.
set_printoptions
(
threshold
=
np
.
inf
)
paddle
.
enable_static
()
SIGMOID_THRESHOLD_MIN
=
-
40.0
SIGMOID_THRESHOLD_MAX
=
13.0
EXP_MAX_INPUT
=
40.0
class
RandomWeight
:
def
__init__
(
self
):
pass
def
updata_weight
(
self
,
hidden_size
,
input_size
,
dtype
):
std
=
1.0
/
math
.
sqrt
(
hidden_size
)
self
.
hidden_size
=
hidden_size
self
.
input_size
=
input_size
self
.
dtype
=
dtype
self
.
weight_ih
=
np
.
random
.
uniform
(
low
=-
std
,
high
=
std
,
size
=
(
4
*
self
.
hidden_size
,
self
.
input_size
)).
astype
(
dtype
)
self
.
weight_hh
=
np
.
random
.
uniform
(
low
=-
std
,
high
=
std
,
size
=
(
4
*
self
.
hidden_size
,
self
.
hidden_size
)).
astype
(
dtype
)
self
.
bias_ih
=
np
.
random
.
uniform
(
low
=-
std
,
high
=
std
,
size
=
(
4
*
self
.
hidden_size
)).
astype
(
dtype
)
self
.
bias_hh
=
np
.
random
.
uniform
(
low
=-
std
,
high
=
std
,
size
=
(
4
*
self
.
hidden_size
)).
astype
(
dtype
)
weight
=
RandomWeight
()
class
LayerMixin
(
object
):
def
__call__
(
self
,
*
args
,
**
kwargs
):
return
self
.
forward
(
*
args
,
**
kwargs
)
class
LayerListMixin
(
LayerMixin
):
def
__init__
(
self
,
layers
=
None
):
self
.
_layers
=
list
(
layers
)
if
layers
else
[]
def
append
(
self
,
layer
):
self
.
_layers
.
append
(
layer
)
def
__iter__
(
self
):
return
iter
(
self
.
_layers
)
class
LSTMCell
(
LayerMixin
):
def
__init__
(
self
,
input_size
,
hidden_size
,
bias
=
True
):
self
.
input_size
=
input_size
self
.
hidden_size
=
hidden_size
self
.
bias
=
bias
self
.
dtype
=
np
.
float64
self
.
parameters
=
dict
()
self
.
weight_ih
=
weight
.
weight_ih
self
.
weight_hh
=
weight
.
weight_hh
self
.
parameters
[
'weight_ih'
]
=
self
.
weight_ih
self
.
parameters
[
'weight_hh'
]
=
self
.
weight_hh
if
bias
:
self
.
bias_ih
=
weight
.
bias_ih
self
.
bias_hh
=
weight
.
bias_hh
self
.
parameters
[
'bias_ih'
]
=
self
.
bias_ih
self
.
parameters
[
'bias_hh'
]
=
self
.
bias_hh
else
:
self
.
bias_ih
=
None
self
.
bias_hh
=
None
def
init_state
(
self
,
inputs
):
batch_size
=
inputs
.
shape
[
0
]
init_h
=
np
.
zeros
((
batch_size
,
self
.
hidden_size
),
dtype
=
inputs
.
dtype
)
init_c
=
np
.
zeros
((
batch_size
,
self
.
hidden_size
),
dtype
=
inputs
.
dtype
)
return
init_h
,
init_c
def
forward
(
self
,
inputs
,
hx
=
None
):
if
hx
is
None
:
hx
=
self
.
init_state
(
inputs
)
pre_hidden
,
pre_cell
=
hx
gates
=
np
.
matmul
(
inputs
,
self
.
weight_ih
.
T
)
if
self
.
bias_ih
is
not
None
:
gates
=
gates
+
self
.
bias_ih
gates
+=
np
.
matmul
(
pre_hidden
,
self
.
weight_hh
.
T
)
if
self
.
bias_hh
is
not
None
:
gates
=
gates
+
self
.
bias_hh
chunked_gates
=
np
.
split
(
gates
,
4
,
-
1
)
i
=
1.0
/
(
1.0
+
np
.
exp
(
-
chunked_gates
[
0
]))
f
=
1.0
/
(
1.0
+
np
.
exp
(
-
chunked_gates
[
1
]))
o
=
1.0
/
(
1.0
+
np
.
exp
(
-
chunked_gates
[
3
]))
c
=
f
*
pre_cell
+
i
*
np
.
tanh
(
chunked_gates
[
2
])
h
=
o
*
np
.
tanh
(
c
)
return
h
,
(
h
,
c
)
def
sequence_mask
(
lengths
,
max_len
=
None
):
if
max_len
is
None
:
max_len
=
np
.
max
(
lengths
)
else
:
assert
max_len
>=
np
.
max
(
lengths
)
return
np
.
arange
(
max_len
)
<
np
.
expand_dims
(
lengths
,
-
1
)
def
update_state
(
mask
,
new
,
old
):
if
not
isinstance
(
old
,
(
tuple
,
list
)):
return
np
.
where
(
mask
,
new
,
old
)
else
:
return
tuple
(
map
(
lambda
x
,
y
:
np
.
where
(
mask
,
x
,
y
),
new
,
old
))
def
rnn
(
cell
,
inputs
,
initial_states
,
sequence_length
=
None
,
time_major
=
False
,
is_reverse
=
False
):
if
not
time_major
:
inputs
=
np
.
transpose
(
inputs
,
[
1
,
0
,
2
])
if
is_reverse
:
inputs
=
np
.
flip
(
inputs
,
0
)
if
sequence_length
is
None
:
mask
=
None
else
:
mask
=
np
.
transpose
(
sequence_mask
(
sequence_length
),
[
1
,
0
])
mask
=
np
.
expand_dims
(
mask
,
-
1
)
if
is_reverse
:
mask
=
np
.
flip
(
mask
,
0
)
time_steps
=
inputs
.
shape
[
0
]
state
=
initial_states
outputs
=
[]
for
t
in
range
(
time_steps
):
x_t
=
inputs
[
t
]
if
mask
is
not
None
:
m_t
=
mask
[
t
]
y
,
new_state
=
cell
(
x_t
,
state
)
y
=
np
.
where
(
m_t
,
y
,
0.
)
outputs
.
append
(
y
)
state
=
update_state
(
m_t
,
new_state
,
state
)
else
:
y
,
new_state
=
cell
(
x_t
,
state
)
outputs
.
append
(
y
)
state
=
new_state
outputs
=
np
.
stack
(
outputs
)
final_state
=
state
if
is_reverse
:
outputs
=
np
.
flip
(
outputs
,
0
)
if
not
time_major
:
outputs
=
np
.
transpose
(
outputs
,
[
1
,
0
,
2
])
return
outputs
,
final_state
def
birnn
(
cell_fw
,
cell_bw
,
inputs
,
initial_states
,
sequence_length
=
None
,
time_major
=
False
):
states_fw
,
states_bw
=
initial_states
outputs_fw
,
states_fw
=
rnn
(
cell_fw
,
inputs
,
states_fw
,
sequence_length
,
time_major
=
time_major
)
outputs_bw
,
states_bw
=
rnn
(
cell_bw
,
inputs
,
states_bw
,
sequence_length
,
time_major
=
time_major
,
is_reverse
=
True
)
outputs
=
np
.
concatenate
((
outputs_fw
,
outputs_bw
),
-
1
)
final_states
=
(
states_fw
,
states_bw
)
return
outputs
,
final_states
def
flatten
(
nested
):
return
list
(
_flatten
(
nested
))
def
_flatten
(
nested
):
for
item
in
nested
:
if
isinstance
(
item
,
(
list
,
tuple
)):
for
subitem
in
_flatten
(
item
):
yield
subitem
else
:
yield
item
def
unstack
(
array
,
axis
=
0
):
num
=
array
.
shape
[
axis
]
sub_arrays
=
np
.
split
(
array
,
num
,
axis
)
return
[
np
.
squeeze
(
sub_array
,
axis
)
for
sub_array
in
sub_arrays
]
def
dropout
(
array
,
p
=
0.0
):
if
p
==
0.0
:
return
array
mask
=
(
np
.
random
.
uniform
(
size
=
array
.
shape
)
<
(
1
-
p
)).
astype
(
array
.
dtype
)
return
array
*
(
mask
/
(
1
-
p
))
def
split_states
(
states
,
bidirectional
=
False
,
state_components
=
1
):
if
state_components
==
1
:
states
=
unstack
(
states
)
if
not
bidirectional
:
return
states
else
:
return
list
(
zip
(
states
[::
2
],
states
[
1
::
2
]))
else
:
assert
len
(
states
)
==
state_components
states
=
tuple
([
unstack
(
item
)
for
item
in
states
])
if
not
bidirectional
:
return
list
(
zip
(
*
states
))
else
:
states
=
list
(
zip
(
*
states
))
return
list
(
zip
(
states
[::
2
],
states
[
1
::
2
]))
def
concat_states
(
states
,
bidirectional
=
False
,
state_components
=
1
):
if
state_components
==
1
:
return
np
.
stack
(
flatten
(
states
))
else
:
states
=
flatten
(
states
)
componnets
=
[]
for
i
in
range
(
state_components
):
componnets
.
append
(
states
[
i
::
state_components
])
return
[
np
.
stack
(
item
)
for
item
in
componnets
]
class
RNN
(
LayerMixin
):
def
__init__
(
self
,
cell
,
is_reverse
=
False
,
time_major
=
False
):
super
(
RNN
,
self
).
__init__
()
self
.
cell
=
cell
if
not
hasattr
(
self
.
cell
,
"call"
):
# for non-dygraph mode, `rnn` api uses cell.call
self
.
cell
.
call
=
self
.
cell
.
forward
self
.
is_reverse
=
is_reverse
self
.
time_major
=
time_major
def
forward
(
self
,
inputs
,
initial_states
=
None
,
sequence_length
=
None
):
final_outputs
,
final_states
=
rnn
(
self
.
cell
,
inputs
,
initial_states
=
initial_states
,
sequence_length
=
sequence_length
,
time_major
=
self
.
time_major
,
is_reverse
=
self
.
is_reverse
)
return
final_outputs
,
final_states
class
BiRNN
(
LayerMixin
):
def
__init__
(
self
,
cell_fw
,
cell_bw
,
time_major
=
False
):
super
(
BiRNN
,
self
).
__init__
()
self
.
cell_fw
=
cell_fw
self
.
cell_bw
=
cell_bw
self
.
time_major
=
time_major
def
forward
(
self
,
inputs
,
initial_states
=
None
,
sequence_length
=
None
,
**
kwargs
):
if
isinstance
(
initial_states
,
(
list
,
tuple
)):
assert
len
(
initial_states
)
==
2
,
\
"length of initial_states should be 2 when it is a list/tuple"
else
:
initial_states
=
[
initial_states
,
initial_states
]
outputs
,
final_states
=
birnn
(
self
.
cell_fw
,
self
.
cell_bw
,
inputs
,
initial_states
,
sequence_length
,
self
.
time_major
)
return
outputs
,
final_states
class
RNNMixin
(
LayerListMixin
):
def
forward
(
self
,
inputs
,
initial_states
=
None
,
sequence_length
=
None
):
batch_index
=
1
if
self
.
time_major
else
0
batch_size
=
inputs
.
shape
[
batch_index
]
dtype
=
inputs
.
dtype
if
initial_states
is
None
:
state_shape
=
(
self
.
num_layers
*
self
.
num_directions
,
batch_size
,
self
.
hidden_size
)
if
self
.
state_components
==
1
:
initial_states
=
np
.
zeros
(
state_shape
,
dtype
)
else
:
initial_states
=
tuple
([
np
.
zeros
(
state_shape
,
dtype
)
for
_
in
range
(
self
.
state_components
)
])
states
=
split_states
(
initial_states
,
self
.
num_directions
==
2
,
self
.
state_components
)
final_states
=
[]
for
i
,
rnn_layer
in
enumerate
(
self
):
if
i
>
0
:
inputs
=
dropout
(
inputs
,
self
.
dropout
)
outputs
,
final_state
=
rnn_layer
(
inputs
,
states
[
i
],
sequence_length
)
final_states
.
append
(
final_state
)
inputs
=
outputs
final_states
=
concat_states
(
final_states
,
self
.
num_directions
==
2
,
self
.
state_components
)
return
outputs
,
final_states
class
LSTM
(
RNNMixin
):
def
__init__
(
self
,
input_size
,
hidden_size
,
num_layers
=
1
,
direction
=
"forward"
,
dropout
=
0.
,
time_major
=
False
):
super
(
LSTM
,
self
).
__init__
()
if
direction
in
[
"forward"
,
"backward"
]:
is_reverse
=
direction
==
"backward"
cell
=
LSTMCell
(
input_size
,
hidden_size
)
self
.
append
(
RNN
(
cell
,
is_reverse
,
time_major
))
for
i
in
range
(
1
,
num_layers
):
cell
=
LSTMCell
(
hidden_size
,
hidden_size
)
self
.
append
(
RNN
(
cell
,
is_reverse
,
time_major
))
elif
direction
==
"bidirectional"
:
cell_fw
=
LSTMCell
(
input_size
,
hidden_size
)
cell_bw
=
LSTMCell
(
input_size
,
hidden_size
)
self
.
append
(
BiRNN
(
cell_fw
,
cell_bw
,
time_major
))
for
i
in
range
(
1
,
num_layers
):
cell_fw
=
LSTMCell
(
2
*
hidden_size
,
hidden_size
)
cell_bw
=
LSTMCell
(
2
*
hidden_size
,
hidden_size
)
self
.
append
(
BiRNN
(
cell_fw
,
cell_bw
,
time_major
))
else
:
raise
ValueError
(
"direction should be forward, backward or bidirectional, "
"received direction = {}"
.
format
(
direction
))
self
.
input_size
=
input_size
self
.
hidden_size
=
hidden_size
self
.
dropout
=
dropout
self
.
num_directions
=
2
if
direction
==
"bidirectional"
else
1
self
.
time_major
=
time_major
self
.
num_layers
=
num_layers
self
.
state_components
=
2
@
unittest
.
skipIf
(
not
core
.
is_compiled_with_cuda
(),
"core is not compiled with CUDA"
)
class
TestCUDNNLstmOp
(
OpTest
):
def
get_weight_names
(
self
):
weight_names
=
[]
for
i
in
range
(
2
*
self
.
num_layers
):
weight_names
.
append
(
'weight{}'
.
format
(
i
))
for
i
in
range
(
2
*
self
.
num_layers
):
weight_names
.
append
(
'bias{}'
.
format
(
i
))
return
weight_names
def
setUp
(
self
):
self
.
op_type
=
"cudnn_lstm"
self
.
dtype
=
np
.
float32
if
core
.
is_compiled_with_rocm
()
else
np
.
float64
self
.
sequence_length
=
None
if
core
.
is_compiled_with_rocm
(
)
else
np
.
array
([
12
,
11
,
10
,
9
,
8
],
dtype
=
np
.
int32
)
self
.
num_layers
=
1
self
.
set_attrs
()
seq_length
=
12
batch_size
=
5
input_size
=
21
hidden_size
=
21
input
=
np
.
random
.
uniform
(
low
=-
0.1
,
high
=
0.1
,
size
=
(
seq_length
,
batch_size
,
input_size
)).
astype
(
self
.
dtype
)
input
[
11
][
1
:][:]
=
0
input
[
10
][
2
:][:]
=
0
input
[
9
][
3
:][:]
=
0
input
[
8
][
4
:][:]
=
0
weight
.
updata_weight
(
hidden_size
,
input_size
,
self
.
dtype
)
rnn1
=
LSTM
(
input_size
,
hidden_size
,
num_layers
=
self
.
num_layers
,
time_major
=
True
,
direction
=
"forward"
)
output
,
(
last_hidden
,
last_cell
)
=
rnn1
(
input
,
sequence_length
=
self
.
sequence_length
)
flat_w
=
[]
num
=
0
for
i
in
range
(
self
.
num_layers
):
if
i
==
0
:
weight_ih
=
weight
.
weight_ih
else
:
weight_ih
=
weight
.
weight_hh
flat_w
.
append
((
"weight"
+
str
(
num
),
weight_ih
))
num
+=
1
for
i
in
range
(
self
.
num_layers
):
weight_hh
=
weight
.
weight_hh
flat_w
.
append
((
"weight"
+
str
(
num
),
weight_hh
))
num
+=
1
num
=
0
for
i
in
range
(
self
.
num_layers
):
bias_ih
=
weight
.
bias_ih
flat_w
.
append
((
"bias"
+
str
(
num
),
bias_ih
))
num
+=
1
for
i
in
range
(
self
.
num_layers
):
bias_hh
=
weight
.
bias_hh
flat_w
.
append
((
"bias"
+
str
(
num
),
bias_hh
))
num
+=
1
init_h
=
np
.
zeros
(
(
self
.
num_layers
,
batch_size
,
hidden_size
)).
astype
(
self
.
dtype
)
init_c
=
np
.
zeros
(
(
self
.
num_layers
,
batch_size
,
hidden_size
)).
astype
(
self
.
dtype
)
state_out
=
np
.
ndarray
((
300
)).
astype
(
"uint8"
)
if
core
.
is_compiled_with_rocm
():
for
i
in
range
(
len
(
flat_w
)):
w
=
np
.
split
(
flat_w
[
i
][
1
],
4
,
0
)
w
=
[
w
[
0
],
w
[
1
],
w
[
3
],
w
[
2
]]
w
=
np
.
concatenate
(
w
)
flat_w
[
i
]
=
(
flat_w
[
i
][
0
],
w
)
self
.
inputs
=
{
'Input'
:
input
,
'WeightList'
:
flat_w
,
'InitH'
:
init_h
,
'InitC'
:
init_c
,
'SequenceLength'
:
self
.
sequence_length
}
if
self
.
sequence_length
is
None
:
self
.
inputs
=
{
'Input'
:
input
,
'WeightList'
:
flat_w
,
'InitH'
:
init_h
,
'InitC'
:
init_c
,
}
self
.
attrs
=
{
'dropout_prob'
:
0.0
,
'is_bidirec'
:
False
,
'input_size'
:
input_size
,
'hidden_size'
:
hidden_size
,
'num_layers'
:
self
.
num_layers
,
}
self
.
outputs
=
{
'Out'
:
output
,
"LastH"
:
last_hidden
,
'LastC'
:
last_cell
,
'Reserve'
:
np
.
ndarray
((
400
)).
astype
(
"uint8"
),
'StateOut'
:
state_out
}
def
set_attrs
(
self
):
pass
def
test_output_with_place
(
self
):
place
=
core
.
CUDAPlace
(
0
)
if
core
.
is_compiled_with_rocm
():
self
.
check_output_with_place
(
place
,
atol
=
1e-5
,
no_check_set
=
[
'Reserve'
,
'StateOut'
])
else
:
self
.
check_output_with_place
(
place
,
no_check_set
=
[
'Reserve'
,
'StateOut'
])
def
test_grad_with_place
(
self
):
place
=
core
.
CUDAPlace
(
0
)
var_name_list
=
self
.
get_weight_names
()
for
var_name
in
var_name_list
:
self
.
check_grad_with_place
(
place
,
set
([
'Input'
,
var_name
,
'InitH'
,
'InitC'
]),
[
'Out'
,
'LastH'
,
'LastC'
])
@
unittest
.
skipIf
(
not
core
.
is_compiled_with_cuda
(),
"core is not compiled with CUDA"
)
class
TestCUDNNlstmAPI
(
unittest
.
TestCase
):
def
test_lstm
(
self
):
seq_len
=
20
batch_size
=
5
hidden_size
=
20
dropout_prob
=
0.0
num_layers
=
1
dtype
=
'float32'
if
core
.
is_compiled_with_rocm
()
else
'float64'
input
=
fluid
.
data
(
name
=
'input'
,
shape
=
[
seq_len
,
batch_size
,
hidden_size
],
dtype
=
dtype
)
init_h
=
layers
.
fill_constant
([
num_layers
,
batch_size
,
hidden_size
],
dtype
,
0.0
)
init_c
=
layers
.
fill_constant
([
num_layers
,
batch_size
,
hidden_size
],
dtype
,
0.0
)
rnn_out
,
last_h
,
last_c
=
layers
.
lstm
(
input
,
init_h
,
init_c
,
seq_len
,
hidden_size
,
num_layers
,
dropout_prob
,
False
)
exe
=
fluid
.
Executor
(
fluid
.
CUDAPlace
(
0
))
exe
.
run
(
fluid
.
default_startup_program
())
input_i
=
np
.
random
.
uniform
(
low
=-
0.1
,
high
=
0.1
,
size
=
(
seq_len
,
batch_size
,
hidden_size
)).
astype
(
"float64"
)
out
=
exe
.
run
(
fluid
.
default_main_program
(),
feed
=
{
'input'
:
input_i
},
fetch_list
=
[
rnn_out
,
last_h
,
last_c
,
'cudnn_lstm_0.w_0'
])
@
unittest
.
skipIf
(
not
core
.
is_compiled_with_cuda
(),
"core is not compiled with CUDA"
)
class
TestCUDNNlstmAPI
(
unittest
.
TestCase
):
def
test_lstm
(
self
):
seq_len
=
20
batch_size
=
5
hidden_size
=
20
dropout_prob
=
0.0
num_layers
=
2
dtype
=
'float32'
if
core
.
is_compiled_with_rocm
()
else
'float64'
input
=
fluid
.
data
(
name
=
'input'
,
shape
=
[
seq_len
,
batch_size
,
hidden_size
],
dtype
=
dtype
)
init_h
=
layers
.
fill_constant
([
num_layers
,
batch_size
,
hidden_size
],
dtype
,
0.0
)
init_c
=
layers
.
fill_constant
([
num_layers
,
batch_size
,
hidden_size
],
dtype
,
0.0
)
rnn_out
,
last_h
,
last_c
=
layers
.
lstm
(
input
,
init_h
,
init_c
,
seq_len
,
hidden_size
,
num_layers
,
dropout_prob
,
False
,
True
)
exe
=
fluid
.
Executor
(
fluid
.
CUDAPlace
(
0
))
exe
.
run
(
fluid
.
default_startup_program
())
input_i
=
np
.
random
.
uniform
(
low
=-
0.1
,
high
=
0.1
,
size
=
(
seq_len
,
batch_size
,
hidden_size
)).
astype
(
dtype
)
out
=
exe
.
run
(
fluid
.
default_main_program
(),
feed
=
{
'input'
:
input_i
},
fetch_list
=
[
rnn_out
,
last_h
,
last_c
,
'cudnn_lstm_0.w_0'
])
if
__name__
==
'__main__'
:
unittest
.
main
()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
__future__
import
print_function
import
unittest
import
numpy
as
np
import
math
import
paddle.fluid.core
as
core
from
op_test
import
OpTest
import
paddle
import
paddle.fluid
as
fluid
import
paddle.fluid.layers
as
layers
import
random
random
.
seed
(
2
)
np
.
set_printoptions
(
threshold
=
np
.
inf
)
paddle
.
enable_static
()
SIGMOID_THRESHOLD_MIN
=
-
40.0
SIGMOID_THRESHOLD_MAX
=
13.0
EXP_MAX_INPUT
=
40.0
class
RandomWeight
:
def
__init__
(
self
):
pass
def
updata_weight
(
self
,
hidden_size
,
input_size
,
dtype
):
std
=
1.0
/
math
.
sqrt
(
hidden_size
)
self
.
hidden_size
=
hidden_size
self
.
input_size
=
input_size
self
.
dtype
=
dtype
self
.
weight_ih
=
np
.
random
.
uniform
(
low
=-
std
,
high
=
std
,
size
=
(
4
*
self
.
hidden_size
,
self
.
input_size
)).
astype
(
dtype
)
self
.
weight_hh
=
np
.
random
.
uniform
(
low
=-
std
,
high
=
std
,
size
=
(
4
*
self
.
hidden_size
,
self
.
hidden_size
)).
astype
(
dtype
)
self
.
bias_ih
=
np
.
random
.
uniform
(
low
=-
std
,
high
=
std
,
size
=
(
4
*
self
.
hidden_size
)).
astype
(
dtype
)
self
.
bias_hh
=
np
.
random
.
uniform
(
low
=-
std
,
high
=
std
,
size
=
(
4
*
self
.
hidden_size
)).
astype
(
dtype
)
weight
=
RandomWeight
()
class
LayerMixin
(
object
):
def
__call__
(
self
,
*
args
,
**
kwargs
):
return
self
.
forward
(
*
args
,
**
kwargs
)
class
LayerListMixin
(
LayerMixin
):
def
__init__
(
self
,
layers
=
None
):
self
.
_layers
=
list
(
layers
)
if
layers
else
[]
def
append
(
self
,
layer
):
self
.
_layers
.
append
(
layer
)
def
__iter__
(
self
):
return
iter
(
self
.
_layers
)
class
LSTMCell
(
LayerMixin
):
def
__init__
(
self
,
input_size
,
hidden_size
,
bias
=
True
):
self
.
input_size
=
input_size
self
.
hidden_size
=
hidden_size
self
.
bias
=
bias
self
.
dtype
=
np
.
float64
self
.
parameters
=
dict
()
self
.
weight_ih
=
weight
.
weight_ih
self
.
weight_hh
=
weight
.
weight_hh
self
.
parameters
[
'weight_ih'
]
=
self
.
weight_ih
self
.
parameters
[
'weight_hh'
]
=
self
.
weight_hh
if
bias
:
self
.
bias_ih
=
weight
.
bias_ih
self
.
bias_hh
=
weight
.
bias_hh
self
.
parameters
[
'bias_ih'
]
=
self
.
bias_ih
self
.
parameters
[
'bias_hh'
]
=
self
.
bias_hh
else
:
self
.
bias_ih
=
None
self
.
bias_hh
=
None
def
init_state
(
self
,
inputs
):
batch_size
=
inputs
.
shape
[
0
]
init_h
=
np
.
zeros
((
batch_size
,
self
.
hidden_size
),
dtype
=
inputs
.
dtype
)
init_c
=
np
.
zeros
((
batch_size
,
self
.
hidden_size
),
dtype
=
inputs
.
dtype
)
return
init_h
,
init_c
def
forward
(
self
,
inputs
,
hx
=
None
):
if
hx
is
None
:
hx
=
self
.
init_state
(
inputs
)
pre_hidden
,
pre_cell
=
hx
gates
=
np
.
matmul
(
inputs
,
self
.
weight_ih
.
T
)
if
self
.
bias_ih
is
not
None
:
gates
=
gates
+
self
.
bias_ih
gates
+=
np
.
matmul
(
pre_hidden
,
self
.
weight_hh
.
T
)
if
self
.
bias_hh
is
not
None
:
gates
=
gates
+
self
.
bias_hh
chunked_gates
=
np
.
split
(
gates
,
4
,
-
1
)
i
=
1.0
/
(
1.0
+
np
.
exp
(
-
chunked_gates
[
0
]))
f
=
1.0
/
(
1.0
+
np
.
exp
(
-
chunked_gates
[
1
]))
o
=
1.0
/
(
1.0
+
np
.
exp
(
-
chunked_gates
[
3
]))
c
=
f
*
pre_cell
+
i
*
np
.
tanh
(
chunked_gates
[
2
])
h
=
o
*
np
.
tanh
(
c
)
return
h
,
(
h
,
c
)
def
sequence_mask
(
lengths
,
max_len
=
None
):
if
max_len
is
None
:
max_len
=
np
.
max
(
lengths
)
else
:
assert
max_len
>=
np
.
max
(
lengths
)
return
np
.
arange
(
max_len
)
<
np
.
expand_dims
(
lengths
,
-
1
)
def
update_state
(
mask
,
new
,
old
):
if
not
isinstance
(
old
,
(
tuple
,
list
)):
return
np
.
where
(
mask
,
new
,
old
)
else
:
return
tuple
(
map
(
lambda
x
,
y
:
np
.
where
(
mask
,
x
,
y
),
new
,
old
))
def
rnn
(
cell
,
inputs
,
initial_states
,
sequence_length
=
None
,
time_major
=
False
,
is_reverse
=
False
):
if
not
time_major
:
inputs
=
np
.
transpose
(
inputs
,
[
1
,
0
,
2
])
if
is_reverse
:
inputs
=
np
.
flip
(
inputs
,
0
)
if
sequence_length
is
None
:
mask
=
None
else
:
mask
=
np
.
transpose
(
sequence_mask
(
sequence_length
),
[
1
,
0
])
mask
=
np
.
expand_dims
(
mask
,
-
1
)
if
is_reverse
:
mask
=
np
.
flip
(
mask
,
0
)
time_steps
=
inputs
.
shape
[
0
]
state
=
initial_states
outputs
=
[]
for
t
in
range
(
time_steps
):
x_t
=
inputs
[
t
]
if
mask
is
not
None
:
m_t
=
mask
[
t
]
y
,
new_state
=
cell
(
x_t
,
state
)
y
=
np
.
where
(
m_t
,
y
,
0.
)
outputs
.
append
(
y
)
state
=
update_state
(
m_t
,
new_state
,
state
)
else
:
y
,
new_state
=
cell
(
x_t
,
state
)
outputs
.
append
(
y
)
state
=
new_state
outputs
=
np
.
stack
(
outputs
)
final_state
=
state
if
is_reverse
:
outputs
=
np
.
flip
(
outputs
,
0
)
if
not
time_major
:
outputs
=
np
.
transpose
(
outputs
,
[
1
,
0
,
2
])
return
outputs
,
final_state
def
birnn
(
cell_fw
,
cell_bw
,
inputs
,
initial_states
,
sequence_length
=
None
,
time_major
=
False
):
states_fw
,
states_bw
=
initial_states
outputs_fw
,
states_fw
=
rnn
(
cell_fw
,
inputs
,
states_fw
,
sequence_length
,
time_major
=
time_major
)
outputs_bw
,
states_bw
=
rnn
(
cell_bw
,
inputs
,
states_bw
,
sequence_length
,
time_major
=
time_major
,
is_reverse
=
True
)
outputs
=
np
.
concatenate
((
outputs_fw
,
outputs_bw
),
-
1
)
final_states
=
(
states_fw
,
states_bw
)
return
outputs
,
final_states
def
flatten
(
nested
):
return
list
(
_flatten
(
nested
))
def
_flatten
(
nested
):
for
item
in
nested
:
if
isinstance
(
item
,
(
list
,
tuple
)):
for
subitem
in
_flatten
(
item
):
yield
subitem
else
:
yield
item
def
unstack
(
array
,
axis
=
0
):
num
=
array
.
shape
[
axis
]
sub_arrays
=
np
.
split
(
array
,
num
,
axis
)
return
[
np
.
squeeze
(
sub_array
,
axis
)
for
sub_array
in
sub_arrays
]
def
dropout
(
array
,
p
=
0.0
):
if
p
==
0.0
:
return
array
mask
=
(
np
.
random
.
uniform
(
size
=
array
.
shape
)
<
(
1
-
p
)).
astype
(
array
.
dtype
)
return
array
*
(
mask
/
(
1
-
p
))
def
split_states
(
states
,
bidirectional
=
False
,
state_components
=
1
):
if
state_components
==
1
:
states
=
unstack
(
states
)
if
not
bidirectional
:
return
states
else
:
return
list
(
zip
(
states
[::
2
],
states
[
1
::
2
]))
else
:
assert
len
(
states
)
==
state_components
states
=
tuple
([
unstack
(
item
)
for
item
in
states
])
if
not
bidirectional
:
return
list
(
zip
(
*
states
))
else
:
states
=
list
(
zip
(
*
states
))
return
list
(
zip
(
states
[::
2
],
states
[
1
::
2
]))
def
concat_states
(
states
,
bidirectional
=
False
,
state_components
=
1
):
if
state_components
==
1
:
return
np
.
stack
(
flatten
(
states
))
else
:
states
=
flatten
(
states
)
componnets
=
[]
for
i
in
range
(
state_components
):
componnets
.
append
(
states
[
i
::
state_components
])
return
[
np
.
stack
(
item
)
for
item
in
componnets
]
class
RNN
(
LayerMixin
):
def
__init__
(
self
,
cell
,
is_reverse
=
False
,
time_major
=
False
):
super
(
RNN
,
self
).
__init__
()
self
.
cell
=
cell
if
not
hasattr
(
self
.
cell
,
"call"
):
# for non-dygraph mode, `rnn` api uses cell.call
self
.
cell
.
call
=
self
.
cell
.
forward
self
.
is_reverse
=
is_reverse
self
.
time_major
=
time_major
def
forward
(
self
,
inputs
,
initial_states
=
None
,
sequence_length
=
None
):
final_outputs
,
final_states
=
rnn
(
self
.
cell
,
inputs
,
initial_states
=
initial_states
,
sequence_length
=
sequence_length
,
time_major
=
self
.
time_major
,
is_reverse
=
self
.
is_reverse
)
return
final_outputs
,
final_states
class
BiRNN
(
LayerMixin
):
def
__init__
(
self
,
cell_fw
,
cell_bw
,
time_major
=
False
):
super
(
BiRNN
,
self
).
__init__
()
self
.
cell_fw
=
cell_fw
self
.
cell_bw
=
cell_bw
self
.
time_major
=
time_major
def
forward
(
self
,
inputs
,
initial_states
=
None
,
sequence_length
=
None
,
**
kwargs
):
if
isinstance
(
initial_states
,
(
list
,
tuple
)):
assert
len
(
initial_states
)
==
2
,
\
"length of initial_states should be 2 when it is a list/tuple"
else
:
initial_states
=
[
initial_states
,
initial_states
]
outputs
,
final_states
=
birnn
(
self
.
cell_fw
,
self
.
cell_bw
,
inputs
,
initial_states
,
sequence_length
,
self
.
time_major
)
return
outputs
,
final_states
class
RNNMixin
(
LayerListMixin
):
def
forward
(
self
,
inputs
,
initial_states
=
None
,
sequence_length
=
None
):
batch_index
=
1
if
self
.
time_major
else
0
batch_size
=
inputs
.
shape
[
batch_index
]
dtype
=
inputs
.
dtype
if
initial_states
is
None
:
state_shape
=
(
self
.
num_layers
*
self
.
num_directions
,
batch_size
,
self
.
hidden_size
)
if
self
.
state_components
==
1
:
initial_states
=
np
.
zeros
(
state_shape
,
dtype
)
else
:
initial_states
=
tuple
([
np
.
zeros
(
state_shape
,
dtype
)
for
_
in
range
(
self
.
state_components
)
])
states
=
split_states
(
initial_states
,
self
.
num_directions
==
2
,
self
.
state_components
)
final_states
=
[]
for
i
,
rnn_layer
in
enumerate
(
self
):
if
i
>
0
:
inputs
=
dropout
(
inputs
,
self
.
dropout
)
outputs
,
final_state
=
rnn_layer
(
inputs
,
states
[
i
],
sequence_length
)
final_states
.
append
(
final_state
)
inputs
=
outputs
final_states
=
concat_states
(
final_states
,
self
.
num_directions
==
2
,
self
.
state_components
)
return
outputs
,
final_states
class
LSTM
(
RNNMixin
):
def
__init__
(
self
,
input_size
,
hidden_size
,
num_layers
=
1
,
direction
=
"forward"
,
dropout
=
0.
,
time_major
=
False
):
super
(
LSTM
,
self
).
__init__
()
if
direction
in
[
"forward"
,
"backward"
]:
is_reverse
=
direction
==
"backward"
cell
=
LSTMCell
(
input_size
,
hidden_size
)
self
.
append
(
RNN
(
cell
,
is_reverse
,
time_major
))
for
i
in
range
(
1
,
num_layers
):
cell
=
LSTMCell
(
hidden_size
,
hidden_size
)
self
.
append
(
RNN
(
cell
,
is_reverse
,
time_major
))
elif
direction
==
"bidirectional"
:
cell_fw
=
LSTMCell
(
input_size
,
hidden_size
)
cell_bw
=
LSTMCell
(
input_size
,
hidden_size
)
self
.
append
(
BiRNN
(
cell_fw
,
cell_bw
,
time_major
))
for
i
in
range
(
1
,
num_layers
):
cell_fw
=
LSTMCell
(
2
*
hidden_size
,
hidden_size
)
cell_bw
=
LSTMCell
(
2
*
hidden_size
,
hidden_size
)
self
.
append
(
BiRNN
(
cell_fw
,
cell_bw
,
time_major
))
else
:
raise
ValueError
(
"direction should be forward, backward or bidirectional, "
"received direction = {}"
.
format
(
direction
))
self
.
input_size
=
input_size
self
.
hidden_size
=
hidden_size
self
.
dropout
=
dropout
self
.
num_directions
=
2
if
direction
==
"bidirectional"
else
1
self
.
time_major
=
time_major
self
.
num_layers
=
num_layers
self
.
state_components
=
2
@
unittest
.
skipIf
(
not
core
.
is_compiled_with_cuda
(),
"core is not compiled with CUDA"
)
class
TestCUDNNLstmOp
(
OpTest
):
def
get_weight_names
(
self
):
weight_names
=
[]
for
i
in
range
(
2
*
self
.
num_layers
):
weight_names
.
append
(
'weight{}'
.
format
(
i
))
for
i
in
range
(
2
*
self
.
num_layers
):
weight_names
.
append
(
'bias{}'
.
format
(
i
))
return
weight_names
def
setUp
(
self
):
self
.
op_type
=
"cudnn_lstm"
self
.
dtype
=
np
.
float32
if
core
.
is_compiled_with_rocm
()
else
np
.
float64
self
.
sequence_length
=
None
if
core
.
is_compiled_with_rocm
(
)
else
np
.
array
([
12
,
11
,
10
,
9
,
8
],
dtype
=
np
.
int32
)
self
.
num_layers
=
1
self
.
set_attrs
()
seq_length
=
12
batch_size
=
5
input_size
=
21
hidden_size
=
21
input
=
np
.
random
.
uniform
(
low
=-
0.1
,
high
=
0.1
,
size
=
(
seq_length
,
batch_size
,
input_size
)).
astype
(
self
.
dtype
)
input
[
11
][
1
:][:]
=
0
input
[
10
][
2
:][:]
=
0
input
[
9
][
3
:][:]
=
0
input
[
8
][
4
:][:]
=
0
weight
.
updata_weight
(
hidden_size
,
input_size
,
self
.
dtype
)
rnn1
=
LSTM
(
input_size
,
hidden_size
,
num_layers
=
self
.
num_layers
,
time_major
=
True
,
direction
=
"forward"
)
output
,
(
last_hidden
,
last_cell
)
=
rnn1
(
input
,
sequence_length
=
self
.
sequence_length
)
flat_w
=
[]
num
=
0
for
i
in
range
(
self
.
num_layers
):
if
i
==
0
:
weight_ih
=
weight
.
weight_ih
else
:
weight_ih
=
weight
.
weight_hh
flat_w
.
append
((
"weight"
+
str
(
num
),
weight_ih
))
num
+=
1
for
i
in
range
(
self
.
num_layers
):
weight_hh
=
weight
.
weight_hh
flat_w
.
append
((
"weight"
+
str
(
num
),
weight_hh
))
num
+=
1
num
=
0
for
i
in
range
(
self
.
num_layers
):
bias_ih
=
weight
.
bias_ih
flat_w
.
append
((
"bias"
+
str
(
num
),
bias_ih
))
num
+=
1
for
i
in
range
(
self
.
num_layers
):
bias_hh
=
weight
.
bias_hh
flat_w
.
append
((
"bias"
+
str
(
num
),
bias_hh
))
num
+=
1
init_h
=
np
.
zeros
(
(
self
.
num_layers
,
batch_size
,
hidden_size
)).
astype
(
self
.
dtype
)
init_c
=
np
.
zeros
(
(
self
.
num_layers
,
batch_size
,
hidden_size
)).
astype
(
self
.
dtype
)
state_out
=
np
.
ndarray
((
300
)).
astype
(
"uint8"
)
if
core
.
is_compiled_with_rocm
():
for
i
in
range
(
len
(
flat_w
)):
w
=
np
.
split
(
flat_w
[
i
][
1
],
4
,
0
)
w
=
[
w
[
0
],
w
[
1
],
w
[
3
],
w
[
2
]]
w
=
np
.
concatenate
(
w
)
flat_w
[
i
]
=
(
flat_w
[
i
][
0
],
w
)
self
.
inputs
=
{
'Input'
:
input
,
'WeightList'
:
flat_w
,
'InitH'
:
init_h
,
'InitC'
:
init_c
,
'SequenceLength'
:
self
.
sequence_length
}
if
self
.
sequence_length
is
None
:
self
.
inputs
=
{
'Input'
:
input
,
'WeightList'
:
flat_w
,
'InitH'
:
init_h
,
'InitC'
:
init_c
,
}
self
.
attrs
=
{
'dropout_prob'
:
0.0
,
'is_bidirec'
:
False
,
'input_size'
:
input_size
,
'hidden_size'
:
hidden_size
,
'num_layers'
:
self
.
num_layers
,
}
self
.
outputs
=
{
'Out'
:
output
,
"LastH"
:
last_hidden
,
'LastC'
:
last_cell
,
'Reserve'
:
np
.
ndarray
((
400
)).
astype
(
"uint8"
),
'StateOut'
:
state_out
}
def
set_attrs
(
self
):
pass
def
test_output_with_place
(
self
):
place
=
core
.
CUDAPlace
(
0
)
if
core
.
is_compiled_with_rocm
():
self
.
check_output_with_place
(
place
,
atol
=
1e-5
,
no_check_set
=
[
'Reserve'
,
'StateOut'
])
else
:
self
.
check_output_with_place
(
place
,
no_check_set
=
[
'Reserve'
,
'StateOut'
])
def
test_grad_with_place
(
self
):
place
=
core
.
CUDAPlace
(
0
)
var_name_list
=
self
.
get_weight_names
()
for
var_name
in
var_name_list
:
self
.
check_grad_with_place
(
place
,
set
([
'Input'
,
var_name
,
'InitH'
,
'InitC'
]),
[
'Out'
,
'LastH'
,
'LastC'
])
@
unittest
.
skipIf
(
not
core
.
is_compiled_with_cuda
(),
"core is not compiled with CUDA"
)
class
TestCUDNNlstmAPI
(
unittest
.
TestCase
):
def
test_lstm
(
self
):
seq_len
=
20
batch_size
=
5
hidden_size
=
20
dropout_prob
=
0.0
num_layers
=
1
dtype
=
'float32'
if
core
.
is_compiled_with_rocm
()
else
'float64'
input
=
fluid
.
data
(
name
=
'input'
,
shape
=
[
seq_len
,
batch_size
,
hidden_size
],
dtype
=
dtype
)
init_h
=
layers
.
fill_constant
([
num_layers
,
batch_size
,
hidden_size
],
dtype
,
0.0
)
init_c
=
layers
.
fill_constant
([
num_layers
,
batch_size
,
hidden_size
],
dtype
,
0.0
)
rnn_out
,
last_h
,
last_c
=
layers
.
lstm
(
input
,
init_h
,
init_c
,
seq_len
,
hidden_size
,
num_layers
,
dropout_prob
,
False
)
exe
=
fluid
.
Executor
(
fluid
.
CUDAPlace
(
0
))
exe
.
run
(
fluid
.
default_startup_program
())
input_i
=
np
.
random
.
uniform
(
low
=-
0.1
,
high
=
0.1
,
size
=
(
seq_len
,
batch_size
,
hidden_size
)).
astype
(
"float64"
)
out
=
exe
.
run
(
fluid
.
default_main_program
(),
feed
=
{
'input'
:
input_i
},
fetch_list
=
[
rnn_out
,
last_h
,
last_c
,
'cudnn_lstm_0.w_0'
])
@
unittest
.
skipIf
(
not
core
.
is_compiled_with_cuda
(),
"core is not compiled with CUDA"
)
class
TestCUDNNlstmAPI
(
unittest
.
TestCase
):
def
test_lstm
(
self
):
seq_len
=
20
batch_size
=
5
hidden_size
=
20
dropout_prob
=
0.0
num_layers
=
2
dtype
=
'float32'
if
core
.
is_compiled_with_rocm
()
else
'float64'
input
=
fluid
.
data
(
name
=
'input'
,
shape
=
[
seq_len
,
batch_size
,
hidden_size
],
dtype
=
dtype
)
init_h
=
layers
.
fill_constant
([
num_layers
,
batch_size
,
hidden_size
],
dtype
,
0.0
)
init_c
=
layers
.
fill_constant
([
num_layers
,
batch_size
,
hidden_size
],
dtype
,
0.0
)
rnn_out
,
last_h
,
last_c
=
layers
.
lstm
(
input
,
init_h
,
init_c
,
seq_len
,
hidden_size
,
num_layers
,
dropout_prob
,
False
,
True
)
exe
=
fluid
.
Executor
(
fluid
.
CUDAPlace
(
0
))
exe
.
run
(
fluid
.
default_startup_program
())
input_i
=
np
.
random
.
uniform
(
low
=-
0.1
,
high
=
0.1
,
size
=
(
seq_len
,
batch_size
,
hidden_size
)).
astype
(
dtype
)
out
=
exe
.
run
(
fluid
.
default_main_program
(),
feed
=
{
'input'
:
input_i
},
fetch_list
=
[
rnn_out
,
last_h
,
last_c
,
'cudnn_lstm_0.w_0'
])
if
__name__
==
'__main__'
:
unittest
.
main
()
python/paddle/fluid/tests/unittests/test_rnn_op.py
浏览文件 @
c6c9c186
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
__future__
import
print_function
import
unittest
import
numpy
as
np
import
math
import
paddle.fluid.core
as
core
import
paddle
import
paddle.fluid
as
fluid
import
paddle.fluid.layers
as
layers
import
random
import
sys
from
op_test
import
OpTest
sys
.
path
.
append
(
"./rnn"
)
from
rnn_numpy
import
SimpleRNN
,
LSTM
,
GRU
from
convert
import
get_params_for_net
random
.
seed
(
2
)
np
.
set_printoptions
(
threshold
=
np
.
inf
)
paddle
.
enable_static
()
class
TestRNNOp
(
OpTest
):
def
get_weight_names
(
self
):
weight_names
=
[]
for
i
in
range
(
self
.
num_layers
):
for
j
in
range
(
0
,
2
*
self
.
direction_num
):
weight_names
.
append
(
"{}.weight_{}"
.
format
(
i
,
j
))
for
i
in
range
(
self
.
num_layers
):
for
j
in
range
(
0
,
2
*
self
.
direction_num
):
weight_names
.
append
(
"{}.bias_{}"
.
format
(
i
,
j
))
return
weight_names
def
setUp
(
self
):
self
.
op_type
=
"rnn"
self
.
dtype
=
np
.
float32
if
core
.
is_compiled_with_rocm
()
else
np
.
float64
self
.
sequence_length
=
None
if
core
.
is_compiled_with_rocm
(
)
else
np
.
array
([
12
,
11
,
10
,
9
,
8
],
dtype
=
np
.
int32
)
self
.
num_layers
=
1
self
.
is_bidirec
=
False
self
.
mode
=
"LSTM"
self
.
is_test
=
False
self
.
dropout
=
0.0
self
.
set_attrs
()
self
.
direction_num
=
2
if
self
.
is_bidirec
else
1
direction
=
"bidirectional"
if
self
.
is_bidirec
else
"forward"
seq_length
=
12
batch_size
=
5
input_size
=
3
hidden_size
=
2
input
=
np
.
random
.
uniform
(
low
=-
0.1
,
high
=
0.1
,
size
=
(
seq_length
,
batch_size
,
input_size
)).
astype
(
self
.
dtype
)
if
self
.
sequence_length
is
not
None
:
input
[
11
][
1
:][:]
=
0
input
[
10
][
2
:][:]
=
0
input
[
9
][
3
:][:]
=
0
input
[
8
][
4
:][:]
=
0
rnn1
=
LSTM
(
input_size
,
hidden_size
,
num_layers
=
self
.
num_layers
,
time_major
=
True
,
direction
=
direction
,
dropout
=
self
.
dropout
,
dtype
=
self
.
dtype
)
flat_w
=
get_params_for_net
(
rnn1
)
output
,
(
last_hidden
,
last_cell
)
=
rnn1
(
input
,
sequence_length
=
self
.
sequence_length
)
if
core
.
is_compiled_with_rocm
():
def
rocm_rnn_get_place
():
places
=
[
core
.
CUDAPlace
(
0
)]
return
places
self
.
_get_places
=
rocm_rnn_get_place
init_h
=
np
.
zeros
((
self
.
num_layers
*
self
.
direction_num
,
batch_size
,
hidden_size
)).
astype
(
self
.
dtype
)
init_c
=
np
.
zeros
((
self
.
num_layers
*
self
.
direction_num
,
batch_size
,
hidden_size
)).
astype
(
self
.
dtype
)
state_out
=
np
.
ndarray
((
300
)).
astype
(
"uint8"
)
self
.
inputs
=
{
'Input'
:
input
,
'WeightList'
:
flat_w
,
'PreState'
:
[(
'init_h'
,
init_h
),
(
'init_c'
,
init_c
)],
'SequenceLength'
:
self
.
sequence_length
}
if
self
.
sequence_length
is
None
:
self
.
inputs
=
{
'Input'
:
input
,
'WeightList'
:
flat_w
,
'PreState'
:
[(
'init_h'
,
init_h
),
(
'init_c'
,
init_c
)],
}
self
.
attrs
=
{
'dropout_prob'
:
self
.
dropout
,
'is_bidirec'
:
self
.
is_bidirec
,
'input_size'
:
input_size
,
'hidden_size'
:
hidden_size
,
'num_layers'
:
self
.
num_layers
,
'mode'
:
self
.
mode
,
'is_test'
:
self
.
is_test
}
self
.
outputs
=
{
'Out'
:
output
,
"State"
:
[(
'last_hidden'
,
last_hidden
),
(
'last_cell'
,
last_cell
)],
'Reserve'
:
np
.
ndarray
((
400
)).
astype
(
"uint8"
),
'DropoutState'
:
state_out
}
def
test_output
(
self
):
self
.
check_output
(
no_check_set
=
[
'Reserve'
,
'DropoutState'
])
def
set_attrs
(
self
):
pass
def
test_grad
(
self
):
if
not
self
.
is_test
:
var_name_list
=
self
.
get_weight_names
()
grad_check_list
=
[
'Input'
,
'init_h'
,
'init_c'
]
grad_check_list
.
extend
(
var_name_list
)
self
.
check_grad
(
set
(
grad_check_list
),
[
'Out'
,
'last_hidden'
,
'last_cell'
])
class
TestRNNOp1
(
TestRNNOp
):
def
set_attrs
(
self
):
self
.
sequence_length
=
None
class
TestRNNOp2
(
TestRNNOp
):
def
set_attrs
(
self
):
self
.
sequence_length
=
None
self
.
is_bidirec
=
True
class
TestRNNOp3
(
TestRNNOp
):
def
set_attrs
(
self
):
self
.
is_test
=
True
self
.
sequence_length
=
None
class
TestRNNOp4
(
TestRNNOp
):
def
set_attrs
(
self
):
self
.
is_test
=
True
self
.
sequence_length
=
None
self
.
is_bidirec
=
True
class
TestRNNOp5
(
TestRNNOp
):
def
set_attrs
(
self
):
self
.
num_layers
=
2
class
TestRNNOp6
(
TestRNNOp
):
def
set_attrs
(
self
):
self
.
num_layers
=
2
self
.
is_bidirec
=
True
class
TestRNNOp7
(
TestRNNOp
):
def
set_attrs
(
self
):
self
.
num_layers
=
2
self
.
is_bidirec
=
True
self
.
is_test
=
True
class
TestRNNOp8
(
TestRNNOp
):
def
set_attrs
(
self
):
self
.
num_layers
=
2
self
.
is_bidirec
=
True
self
.
sequence_length
=
None
class
TestRNNOp9
(
TestRNNOp
):
def
set_attrs
(
self
):
self
.
num_layers
=
3
if
__name__
==
'__main__'
:
unittest
.
main
()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from
__future__
import
print_function
import
unittest
import
numpy
as
np
import
math
import
paddle.fluid.core
as
core
import
paddle
import
paddle.fluid
as
fluid
import
paddle.fluid.layers
as
layers
import
random
import
sys
from
op_test
import
OpTest
sys
.
path
.
append
(
"./rnn"
)
from
rnn_numpy
import
SimpleRNN
,
LSTM
,
GRU
from
convert
import
get_params_for_net
random
.
seed
(
2
)
np
.
set_printoptions
(
threshold
=
np
.
inf
)
paddle
.
enable_static
()
class
TestRNNOp
(
OpTest
):
def
get_weight_names
(
self
):
weight_names
=
[]
for
i
in
range
(
self
.
num_layers
):
for
j
in
range
(
0
,
2
*
self
.
direction_num
):
weight_names
.
append
(
"{}.weight_{}"
.
format
(
i
,
j
))
for
i
in
range
(
self
.
num_layers
):
for
j
in
range
(
0
,
2
*
self
.
direction_num
):
weight_names
.
append
(
"{}.bias_{}"
.
format
(
i
,
j
))
return
weight_names
def
setUp
(
self
):
self
.
op_type
=
"rnn"
self
.
dtype
=
np
.
float32
if
core
.
is_compiled_with_rocm
()
else
np
.
float64
self
.
sequence_length
=
None
if
core
.
is_compiled_with_rocm
(
)
else
np
.
array
([
12
,
11
,
10
,
9
,
8
],
dtype
=
np
.
int32
)
self
.
num_layers
=
1
self
.
is_bidirec
=
False
self
.
mode
=
"LSTM"
self
.
is_test
=
False
self
.
dropout
=
0.0
self
.
set_attrs
()
self
.
direction_num
=
2
if
self
.
is_bidirec
else
1
direction
=
"bidirectional"
if
self
.
is_bidirec
else
"forward"
seq_length
=
12
batch_size
=
5
input_size
=
3
hidden_size
=
2
input
=
np
.
random
.
uniform
(
low
=-
0.1
,
high
=
0.1
,
size
=
(
seq_length
,
batch_size
,
input_size
)).
astype
(
self
.
dtype
)
if
self
.
sequence_length
is
not
None
:
input
[
11
][
1
:][:]
=
0
input
[
10
][
2
:][:]
=
0
input
[
9
][
3
:][:]
=
0
input
[
8
][
4
:][:]
=
0
rnn1
=
LSTM
(
input_size
,
hidden_size
,
num_layers
=
self
.
num_layers
,
time_major
=
True
,
direction
=
direction
,
dropout
=
self
.
dropout
,
dtype
=
self
.
dtype
)
flat_w
=
get_params_for_net
(
rnn1
)
output
,
(
last_hidden
,
last_cell
)
=
rnn1
(
input
,
sequence_length
=
self
.
sequence_length
)
if
core
.
is_compiled_with_rocm
():
def
rocm_rnn_get_place
():
places
=
[
core
.
CUDAPlace
(
0
)]
return
places
self
.
_get_places
=
rocm_rnn_get_place
init_h
=
np
.
zeros
((
self
.
num_layers
*
self
.
direction_num
,
batch_size
,
hidden_size
)).
astype
(
self
.
dtype
)
init_c
=
np
.
zeros
((
self
.
num_layers
*
self
.
direction_num
,
batch_size
,
hidden_size
)).
astype
(
self
.
dtype
)
state_out
=
np
.
ndarray
((
300
)).
astype
(
"uint8"
)
self
.
inputs
=
{
'Input'
:
input
,
'WeightList'
:
flat_w
,
'PreState'
:
[(
'init_h'
,
init_h
),
(
'init_c'
,
init_c
)],
'SequenceLength'
:
self
.
sequence_length
}
if
self
.
sequence_length
is
None
:
self
.
inputs
=
{
'Input'
:
input
,
'WeightList'
:
flat_w
,
'PreState'
:
[(
'init_h'
,
init_h
),
(
'init_c'
,
init_c
)],
}
self
.
attrs
=
{
'dropout_prob'
:
self
.
dropout
,
'is_bidirec'
:
self
.
is_bidirec
,
'input_size'
:
input_size
,
'hidden_size'
:
hidden_size
,
'num_layers'
:
self
.
num_layers
,
'mode'
:
self
.
mode
,
'is_test'
:
self
.
is_test
}
self
.
outputs
=
{
'Out'
:
output
,
"State"
:
[(
'last_hidden'
,
last_hidden
),
(
'last_cell'
,
last_cell
)],
'Reserve'
:
np
.
ndarray
((
400
)).
astype
(
"uint8"
),
'DropoutState'
:
state_out
}
def
test_output
(
self
):
self
.
check_output
(
no_check_set
=
[
'Reserve'
,
'DropoutState'
])
def
set_attrs
(
self
):
pass
def
test_grad
(
self
):
if
not
self
.
is_test
:
var_name_list
=
self
.
get_weight_names
()
grad_check_list
=
[
'Input'
,
'init_h'
,
'init_c'
]
grad_check_list
.
extend
(
var_name_list
)
self
.
check_grad
(
set
(
grad_check_list
),
[
'Out'
,
'last_hidden'
,
'last_cell'
])
class
TestRNNOp1
(
TestRNNOp
):
def
set_attrs
(
self
):
self
.
sequence_length
=
None
class
TestRNNOp2
(
TestRNNOp
):
def
set_attrs
(
self
):
self
.
sequence_length
=
None
self
.
is_bidirec
=
True
class
TestRNNOp3
(
TestRNNOp
):
def
set_attrs
(
self
):
self
.
is_test
=
True
self
.
sequence_length
=
None
class
TestRNNOp4
(
TestRNNOp
):
def
set_attrs
(
self
):
self
.
is_test
=
True
self
.
sequence_length
=
None
self
.
is_bidirec
=
True
class
TestRNNOp5
(
TestRNNOp
):
def
set_attrs
(
self
):
self
.
num_layers
=
2
class
TestRNNOp6
(
TestRNNOp
):
def
set_attrs
(
self
):
self
.
num_layers
=
2
self
.
is_bidirec
=
True
class
TestRNNOp7
(
TestRNNOp
):
def
set_attrs
(
self
):
self
.
num_layers
=
2
self
.
is_bidirec
=
True
self
.
is_test
=
True
class
TestRNNOp8
(
TestRNNOp
):
def
set_attrs
(
self
):
self
.
num_layers
=
2
self
.
is_bidirec
=
True
self
.
sequence_length
=
None
class
TestRNNOp9
(
TestRNNOp
):
def
set_attrs
(
self
):
self
.
num_layers
=
3
if
__name__
==
'__main__'
:
unittest
.
main
()
python/paddle/fluid/transpiler/memory_optimization_transpiler.py
浏览文件 @
c6c9c186
...
...
@@ -21,7 +21,7 @@ def memory_optimize(input_program,
level
=
0
,
skip_grads
=
True
):
"""
:api_attr: Static Graph
:api_attr: Static Graph
This API is deprecated since 1.6. Please do not use it. The better
memory optimization strategies are enabled by default.
...
...
@@ -43,7 +43,7 @@ def memory_optimize(input_program,
def
release_memory
(
input_program
,
skip_opt_set
=
None
):
"""
:api_attr: Static Graph
:api_attr: Static Graph
This API is deprecated since 1.6. Please do not use it. The better
memory optimization strategies are enabled by default.
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录