# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import numpy as np

import paddle
from paddle.device import cuda
from paddle.fluid import core
from paddle.fluid.framework import _in_legacy_dygraph, _test_eager_guard


class TestAsyncRead(unittest.TestCase):
    def func_setUp(self):
        self.empty = paddle.to_tensor(
            np.array([], dtype="int64"), place=paddle.CPUPlace()
        )
        data = np.random.randn(100, 50, 50).astype("float32")
        self.src = paddle.to_tensor(data, place=paddle.CUDAPinnedPlace())
        self.dst = paddle.empty(shape=[100, 50, 50], dtype="float32")
        self.index = paddle.to_tensor(
            np.array([1, 3, 5, 7, 9], dtype="int64")
        ).cpu()
        self.buffer = paddle.empty(
            shape=[50, 50, 50], dtype="float32"
        ).pin_memory()
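        # side CUDA stream on which the async copies below are issued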
        self.stream = cuda.Stream()

    def func_test_async_read_empty_offset_and_count(self):
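        # With empty offset/count tensors, only the rows selected by
        # self.index should be copied (staged through the pinned buffer)
        # into the front of self.dst.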
        with cuda.stream_guard(self.stream):
            if _in_legacy_dygraph():
                core.async_read(
                    self.src,
                    self.dst,
                    self.index,
                    self.buffer,
                    self.empty,
                    self.empty,
                )
            else:
                core.eager.async_read(
                    self.src,
                    self.dst,
                    self.index,
                    self.buffer,
                    self.empty,
                    self.empty,
                )
        array1 = paddle.gather(self.src, self.index)
        array2 = self.dst[: len(self.index)]

        np.testing.assert_allclose(array1.numpy(), array2.numpy(), rtol=1e-05)

    def func_test_async_read_success(self):
        offset = paddle.to_tensor(
            np.array([10, 20], dtype="int64"), place=paddle.CPUPlace()
        )
        count = paddle.to_tensor(
            np.array([5, 10], dtype="int64"), place=paddle.CPUPlace()
        )
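        # offset/count describe two contiguous chunks of src (rows 10:15
        # and 20:30); they land at the front of dst, followed by the rows
        # selected by self.index, as checked by the assertions below.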
        with cuda.stream_guard(self.stream):
            if _in_legacy_dygraph():
                core.async_read(
                    self.src, self.dst, self.index, self.buffer, offset, count
                )
            else:
                core.eager.async_read(
                    self.src, self.dst, self.index, self.buffer, offset, count
                )
        # index data
        index_array1 = paddle.gather(self.src, self.index)
        count_numel = paddle.sum(count).numpy()[0]
        index_array2 = self.dst[count_numel : count_numel + len(self.index)]
        np.testing.assert_allclose(
            index_array1.numpy(), index_array2.numpy(), rtol=1e-05
        )

        # offset, count
        offset_a = paddle.gather(self.src, paddle.to_tensor(np.arange(10, 15)))
        offset_b = paddle.gather(self.src, paddle.to_tensor(np.arange(20, 30)))
        offset_array1 = paddle.concat([offset_a, offset_b], axis=0)
        offset_array2 = self.dst[:count_numel]
        np.testing.assert_allclose(
            offset_array1.numpy(), offset_array2.numpy(), rtol=1e-05
        )

    def func_test_async_read_only_1dim(self):
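        # the same read path should also work when src/dst/buffer are 1-D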
        src = paddle.rand([40], dtype="float32").pin_memory()
        dst = paddle.empty([40], dtype="float32")
        buffer_ = paddle.empty([20]).pin_memory()
        with cuda.stream_guard(self.stream):
            if _in_legacy_dygraph():
                core.async_read(
                    src, dst, self.index, buffer_, self.empty, self.empty
                )
            else:
                core.eager.async_read(
                    src, dst, self.index, buffer_, self.empty, self.empty
                )
        array1 = paddle.gather(src, self.index)
        array2 = dst[: len(self.index)]
        np.testing.assert_allclose(array1.numpy(), array2.numpy(), rtol=1e-05)

    def test_main(self):
        with _test_eager_guard():
            self.func_setUp()
            self.func_test_async_read_empty_offset_and_count()
            self.func_setUp()
            self.func_test_async_read_success()
            self.func_setUp()
            self.func_test_async_read_only_1dim()
        self.func_setUp()
        self.func_test_async_read_empty_offset_and_count()
        self.func_setUp()
        self.func_test_async_read_success()
        self.func_setUp()
        self.func_test_async_read_only_1dim()


class TestAsyncWrite(unittest.TestCase):
    def func_setUp(self):
        self.src = paddle.rand(shape=[100, 50, 50, 5], dtype="float32")
        self.dst = paddle.empty(
            shape=[200, 50, 50, 5], dtype="float32"
        ).pin_memory()
        self.stream = cuda.Stream()

    def func_test_async_write_success(self):
        offset = paddle.to_tensor(
            np.array([0, 60], dtype="int64"), place=paddle.CPUPlace()
        )
        count = paddle.to_tensor(
            np.array([40, 60], dtype="int64"), place=paddle.CPUPlace()
        )
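        # async_write scatters src into the pinned dst according to
        # offset/count: the first 40 rows of src go to dst[0:40], the
        # remaining 60 rows to dst[60:120].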
        with cuda.stream_guard(self.stream):
            if _in_legacy_dygraph():
                core.async_write(self.src, self.dst, offset, count)
            else:
                core.eager.async_write(self.src, self.dst, offset, count)

        offset_a = paddle.gather(self.dst, paddle.to_tensor(np.arange(0, 40)))
        offset_b = paddle.gather(self.dst, paddle.to_tensor(np.arange(60, 120)))
        offset_array = paddle.concat([offset_a, offset_b], axis=0)
        np.testing.assert_allclose(
            self.src.numpy(), offset_array.numpy(), rtol=1e-05
        )

    def test_async_write_success(self):
        with _test_eager_guard():
            self.func_setUp()
            self.func_test_async_write_success()
        self.func_setUp()
        self.func_test_async_write_success()


if __name__ == "__main__":
    if core.is_compiled_with_cuda():
        unittest.main()